Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Implement local Motion HTTP API that conforms to the specification
Browse files Browse the repository at this point in the history
Implement a basic motion API with a local flat file blob store that
 conforms to the OpenAPI specification.

 Implement the server with swappable blob storage such that future work
 can change it for other types of storage such as RIPS, S3, R2, etc.

 TODOs are left for async discussion points.
  • Loading branch information
masih committed Jul 5, 2023
1 parent a06961a commit 4717372
Show file tree
Hide file tree
Showing 12 changed files with 621 additions and 2 deletions.
10 changes: 10 additions & 0 deletions api/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

type (
PostBlobResponse struct {
ID string `json:"id"`
}
ErrorResponse struct {
Error string `json:"error"`
}
)
134 changes: 134 additions & 0 deletions api/server/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package server

import (
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/filecoin-project/motion/api"
"github.com/filecoin-project/motion/blob"
)

func (m *HttpServer) handleBlobRoot(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodOptions:
w.Header().Set(httpHeaderAllow(http.MethodPost, http.MethodOptions))
case http.MethodPost:
m.handlePostBlob(w, r)
default:
respondWithNotAllowed(w, http.MethodPost, http.MethodOptions)
}
}

func (m *HttpServer) handlePostBlob(w http.ResponseWriter, r *http.Request) {
// TODO: check Accept header accepts JSON response
if r.Header.Get("Content-Type") != "application/octet-stream" {
respondWithJson(w, api.ErrorResponse{Error: `Invalid content type, expected "application/octet-stream".`}, http.StatusBadRequest)
return
}
var contentLength uint64
if value := r.Header.Get("Content-Length"); value != "" {
var err error
if contentLength, err = strconv.ParseUint(value, 10, 32); err != nil {
respondWithJson(w, api.ErrorResponse{Error: "Invalid content length, expected unsigned numerical value."}, http.StatusBadRequest)
return
}
if contentLength > m.maxBlobLength {
message := fmt.Sprintf(`Content-Length exceeds the maximum accepted content length of %d bytes.`, m.maxBlobLength)
respondWithJson(w, api.ErrorResponse{Error: message}, http.StatusBadRequest)
return
}
}
defer r.Body.Close()
desc, err := m.store.Put(r.Context(), r.Body)
switch err {
case nil:
case blob.ErrBlobTooLarge:
message := fmt.Sprintf(`Blob length exceeds the maximum accepted length of %d bytes.`, m.maxBlobLength)
respondWithJson(w, api.ErrorResponse{Error: message}, http.StatusBadRequest)
return
default:
respondWithJson(w, api.ErrorResponse{Error: "Internal error occurred while uploading data: " + err.Error()}, http.StatusInternalServerError)
return
}
logger := logger.With("id", desc.ID, "size", desc.Size)
if contentLength != 0 && desc.Size != contentLength {
logger.Warnw("Content-Length in request header did not match the data length", "expectedSize", contentLength)
// TODO: add option to reject such requests?
}
respondWithJson(w, api.PostBlobResponse{ID: desc.ID.String()}, http.StatusCreated)
logger.Debugw("Blob crated successfully", "id", desc.ID, "size", desc.Size)
}

func (m *HttpServer) handleBlobSubtree(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodOptions:
w.Header().Set(httpHeaderAllow(http.MethodGet, http.MethodOptions))
case http.MethodGet:
m.handleBlobGet(w, r)
default:
respondWithNotAllowed(w, http.MethodPost, http.MethodOptions)
}
}

func (m *HttpServer) handleBlobGet(w http.ResponseWriter, r *http.Request) {
suffix := strings.TrimPrefix(r.URL.Path, "/v0/blob/")
segments := strings.Split(suffix, "/")
switch len(segments) {
case 1:
m.handleBlobGetByID(w, r, segments[0])
case 2:
if segments[1] == "status" {
m.handleBlobGetStatusByID(w, r, segments[0])
} else {
respondWithJson(w, api.ErrorResponse{Error: "404 Page Not found"}, http.StatusNotFound)
}
default:
respondWithJson(w, api.ErrorResponse{Error: "404 Page Not found"}, http.StatusNotFound)
}
}

func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, idUriSegment string) {
var id blob.ID
if err := id.Decode(idUriSegment); err != nil {
respondWithJson(w, api.ErrorResponse{Error: "Invalid blob ID"}, http.StatusBadRequest)
return
}
logger := logger.With("id", id)
blobReader, err := m.store.Get(r.Context(), id)
switch err {
case nil:
case blob.ErrBlobNotFound:
respondWithJson(w, api.ErrorResponse{Error: "No blob is found for the given ID"}, http.StatusNotFound)
return
default:
respondWithJson(w, api.ErrorResponse{Error: "Internal error occurred while getting blob: " + err.Error()}, http.StatusInternalServerError)
return
}
w.Header().Set(httpHeaderContentTypeOctetStream())
w.Header().Set(httpHeaderContentTypeOptionsNoSniff())
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachement; filename="%s.bin"`, id.String()))

// TODO: Use pooled buffers with configurable size for better efficiency
var buf []byte
if written, err := io.CopyBuffer(w, blobReader, buf); err != nil {
logger.Errorw("Failed to write blob", "written", written, "err", err)
} else {
logger.Debugw("Blob fetched successfully", "size", written)
}
}

func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, _ *http.Request, _ string) {
respondWithJson(w, api.ErrorResponse{Error: "This functionally is pending implementation."}, http.StatusNotImplemented)
}

func (m *HttpServer) handleRoot(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodOptions:
w.Header().Set(httpHeaderAllow(http.MethodOptions))
default:
respondWithJson(w, api.ErrorResponse{Error: "404 Page Not found"}, http.StatusNotFound)
}
}
36 changes: 36 additions & 0 deletions api/server/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package server

type (
Option func(*options) error
options struct {
httpListenAddr string
maxBlobLength uint64
}
)

func newOptions(o ...Option) (*options, error) {
opts := &options{
httpListenAddr: "0.0.0.0:40080",
maxBlobLength: 32 << 30, // 32 GiB
}
for _, apply := range o {
if err := apply(opts); err != nil {
return nil, err
}
}
return opts, nil
}

func WithHttpListenAddr(addr string) Option {
return func(o *options) error {
o.httpListenAddr = addr
return nil
}
}

func WithMaxBlobLength(l uint64) Option {
return func(o *options) error {
o.maxBlobLength = l
return nil
}
}
64 changes: 64 additions & 0 deletions api/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server

import (
"context"
"errors"
"net"
"net/http"

"github.com/filecoin-project/motion/blob"
"github.com/ipfs/go-log/v2"
)

var logger = log.Logger("motion/api/server")

type (
HttpServer struct {
*options
httpServer *http.Server
store blob.Store
}
)

func NewHttpServer(store blob.Store, o ...Option) (*HttpServer, error) {
opts, err := newOptions(o...)
if err != nil {
return nil, err
}
server := &HttpServer{
options: opts,
store: store,
}
server.httpServer = &http.Server{
Handler: server.ServeMux(),
}
return server, nil
}

func (m *HttpServer) Start(_ context.Context) error {
listener, err := net.Listen("tcp", m.httpListenAddr)
if err != nil {
return err
}
go func() {
if err := m.httpServer.Serve(listener); errors.Is(err, http.ErrServerClosed) {
logger.Info("HTTP server stopped successfully.")
} else {
logger.Errorw("HTTP server stopped erroneously.", "err", err)
}
}()
logger.Infow("HTTP server started successfully.", "address", listener.Addr())
return nil
}

func (m *HttpServer) ServeMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/v0/blob", m.handleBlobRoot)
mux.HandleFunc("/v0/blob/", m.handleBlobSubtree)
mux.HandleFunc("/", m.handleRoot)
return mux
}

func (m *HttpServer) Shutdown(ctx context.Context) error {
return m.httpServer.Shutdown(ctx)
}
40 changes: 40 additions & 0 deletions api/server/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package server

import (
"encoding/json"
"net/http"
"strings"

"github.com/filecoin-project/motion/api"
)

func httpHeaderContentTypeJson() (string, string) {
return "Content-Type", "application/json; charset=utf-8"
}
func httpHeaderContentTypeOctetStream() (string, string) {
return "Content-Type", "application/octet-stream"
}

func httpHeaderContentTypeOptionsNoSniff() (string, string) {
return "X-Content-Type-Options", "nosniff"
}

func httpHeaderAllow(methods ...string) (string, string) {
return "Allow", strings.Join(methods, ",")
}

func respondWithJson(w http.ResponseWriter, resp any, code int) {
w.Header().Set(httpHeaderContentTypeJson())
w.Header().Set(httpHeaderContentTypeOptionsNoSniff())
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(resp); err != nil {
logger.Errorw("Failed to encode response.", "code", code, "resp", resp, "err", err)
}
}

func respondWithNotAllowed(w http.ResponseWriter, allowedMethods ...string) {
w.Header().Set(httpHeaderAllow(allowedMethods...))
respondWithJson(w, api.ErrorResponse{
Error: `Method not allowed. Please see "Allow" response header for the list of allowed methods.`,
}, http.StatusMethodNotAllowed)
}
39 changes: 39 additions & 0 deletions blob/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package blob

import (
"context"
"errors"
"io"

"github.com/ipfs/go-cid"
)

var (
ErrBlobTooLarge = errors.New("blob size exceeds the maximum allowed")
ErrBlobNotFound = errors.New("no blob is found with given ID")
)

type (
ID cid.Cid // TODO: Discuss if everyone is on board with using CIDs as blob ID.
Descriptor struct {
ID ID // TODO: Discuss whether to use CIDs straight up.
Size uint64
}
Store interface {
Put(context.Context, io.ReadCloser) (*Descriptor, error)
Get(context.Context, ID) (io.ReadCloser, error)
}
)

func (i *ID) String() string {
return cid.Cid(*i).String()
}

func (i *ID) Decode(v string) error {
decode, err := cid.Decode(v)
if err != nil {
return err
}
*i = ID(decode)
return nil
}
65 changes: 65 additions & 0 deletions blob/local_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package blob

import (
"context"
"errors"
"io"
"os"
"path"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)

var _ Store = (*LocalStore)(nil)

type LocalStore struct {
dir string
}

func NewLocalStore(dir string) *LocalStore {
return &LocalStore{
dir: dir,
}
}

func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor, error) {
hasher, err := multihash.GetHasher(multihash.SHA2_256)
if err != nil {
return nil, err
}
teeReader := io.TeeReader(reader, hasher)
dest, err := os.CreateTemp("", "motion_local_store_*.bin")
if err != nil {
return nil, err
}
defer dest.Close()
written, err := io.Copy(dest, teeReader)
if err != nil {
return nil, err
}
sum := hasher.Sum(nil)
mh, err := multihash.Encode(sum, multihash.SHA2_256)
if err != nil {
return nil, err
}
id := cid.NewCidV1(cid.Raw, mh)
if err = os.Rename(dest.Name(), path.Join(l.dir, id.String()+".bin")); err != nil {
return nil, err
}
return &Descriptor{
ID: ID(id),
Size: uint64(written),
}, nil
}

func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadCloser, error) {
switch blob, err := os.Open(path.Join(l.dir, id.String()+".bin")); {
case err == nil:
return blob, nil
case errors.Is(err, os.ErrNotExist):
return nil, ErrBlobNotFound
default:
return nil, err
}
}
Loading

0 comments on commit 4717372

Please sign in to comment.