diff --git a/api/model.go b/api/model.go index 967d571..3763476 100644 --- a/api/model.go +++ b/api/model.go @@ -1,5 +1,7 @@ package api +import "time" + type ( // PostBlobResponse represents the response to a successful POST request to upload a blob. PostBlobResponse struct { @@ -11,4 +13,14 @@ type ( // Error is the description of the error. Error string `json:"error"` } + GetStatusResponse struct { + ID string `json:"id"` + Replicas []Replica `json:"Replicas,omitempty"` + } + Replica struct { + Provider string `json:"provider"` + Status string `json:"status"` + LastVerified time.Time `json:"lastVerified"` + Expiration time.Time `json:"expiration"` + } ) diff --git a/api/server/error_response.go b/api/server/error_response.go index 6f813e6..5f64e35 100644 --- a/api/server/error_response.go +++ b/api/server/error_response.go @@ -12,7 +12,6 @@ var ( errResponseBlobNotFound = api.ErrorResponse{Error: "No blob is found for the given ID"} errResponseNotStreamContentType = api.ErrorResponse{Error: `Invalid content type, expected "application/octet-stream".`} errResponseInvalidContentLength = api.ErrorResponse{Error: "Invalid content length, expected unsigned numerical value."} - errResponseNotImplemented = api.ErrorResponse{Error: "This functionally is pending implementation."} ) func errResponseInternalError(err error) api.ErrorResponse { diff --git a/api/server/handler.go b/api/server/handler.go index da4c0f7..4b3cf28 100644 --- a/api/server/handler.go +++ b/api/server/handler.go @@ -94,7 +94,17 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i return } logger := logger.With("id", id) - blobReader, blobDesc, err := m.store.Get(r.Context(), id) + blobDesc, err := m.store.Describe(r.Context(), id) + switch err { + case nil: + case blob.ErrBlobNotFound: + respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound) + return + default: + respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError) + return + } + blobReader, err := m.store.Get(r.Context(), id) switch err { case nil: case blob.ErrBlobNotFound: @@ -113,8 +123,41 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i logger.Debug("Blob fetched successfully") } -func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, _ *http.Request, _ string) { - respondWithJson(w, errResponseNotImplemented, http.StatusNotImplemented) +func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, r *http.Request, idUriSegment string) { + var id blob.ID + if err := id.Decode(idUriSegment); err != nil { + respondWithJson(w, errResponseInvalidBlobID, http.StatusBadRequest) + return + } + logger := logger.With("id", id) + blobDesc, err := m.store.Describe(r.Context(), id) + switch err { + case nil: + case blob.ErrBlobNotFound: + respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound) + return + default: + logger.Errorw("Failed to get status for ID", "err", err) + respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError) + return + } + + response := api.GetStatusResponse{ + ID: idUriSegment, + } + + if blobDesc.Status != nil { + response.Replicas = make([]api.Replica, len(blobDesc.Status.Replicas)) + for _, replica := range blobDesc.Status.Replicas { + response.Replicas = append(response.Replicas, api.Replica{ + Provider: replica.Provider, + Status: replica.Status, + LastVerified: replica.LastVerified, + Expiration: replica.Expiration, + }) + } + } + respondWithJson(w, response, http.StatusOK) } func (m *HttpServer) handleRoot(w http.ResponseWriter, r *http.Request) { diff --git a/api/server/util.go b/api/server/util.go index c51e4ad..d6ffada 100644 --- a/api/server/util.go +++ b/api/server/util.go @@ -31,7 +31,9 @@ func httpHeaderAllow(methods ...string) (string, string) { func respondWithJson(w http.ResponseWriter, resp any, code int) { w.Header().Set(httpHeaderContentTypeJson()) w.Header().Set(httpHeaderContentTypeOptionsNoSniff()) - w.WriteHeader(code) + if code != http.StatusOK { + w.WriteHeader(code) + } if err := json.NewEncoder(w).Encode(resp); err != nil { logger.Errorw("Failed to encode response.", "code", code, "resp", resp, "err", err) } diff --git a/blob/blob.go b/blob/blob.go index 565e294..70474a7 100644 --- a/blob/blob.go +++ b/blob/blob.go @@ -25,10 +25,21 @@ type ( Size uint64 // ModificationTime is the latest time at which the blob was modified. ModificationTime time.Time + Status *Status + } + Status struct { + Replicas []Replica + } + Replica struct { + Provider string + Status string + LastVerified time.Time + Expiration time.Time } Store interface { Put(context.Context, io.ReadCloser) (*Descriptor, error) - Get(context.Context, ID) (io.ReadSeekCloser, *Descriptor, error) + Describe(context.Context, ID) (*Descriptor, error) + Get(context.Context, ID) (io.ReadSeekCloser, error) } ) diff --git a/blob/local_store.go b/blob/local_store.go index 3c474fa..488a924 100644 --- a/blob/local_store.go +++ b/blob/local_store.go @@ -59,23 +59,32 @@ func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor, }, nil } -// Get Retrieves the content of blob log with its Descriptor. -// If no file is found for the given id, ErrBlobNotFound is returned. -func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) { +// Get Retrieves the content of blob. +// If no blob is found for the given id, ErrBlobNotFound is returned. +func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, error) { switch blob, err := os.Open(path.Join(l.dir, id.String()+".bin")); { case err == nil: - stat, err := blob.Stat() - if err != nil { - return nil, nil, err - } - return blob, &Descriptor{ + return blob, nil + case errors.Is(err, os.ErrNotExist): + return nil, ErrBlobNotFound + default: + return nil, err + } +} + +// Describe gets the description of the blob for the given id. +// If no blob is found for the given id, ErrBlobNotFound is returned. +func (l *LocalStore) Describe(ctx context.Context, id ID) (*Descriptor, error) { + switch stat, err := os.Stat(path.Join(l.dir, id.String()+".bin")); { + case err == nil: + return &Descriptor{ ID: id, Size: uint64(stat.Size()), ModificationTime: stat.ModTime(), }, nil case errors.Is(err, os.ErrNotExist): - return nil, nil, ErrBlobNotFound + return nil, ErrBlobNotFound default: - return nil, nil, err + return nil, err } } diff --git a/blob/ribs_store.go b/blob/ribs_store.go index fc3800a..361fb67 100644 --- a/blob/ribs_store.go +++ b/blob/ribs_store.go @@ -32,9 +32,8 @@ type ( // RibsStore is an experimental Store implementation that uses RIBS. // See: https://github.com/filcat/ribs RibsStore struct { - ribs ribs.RIBS - maxSize int - //index map[uuid.UUID]*ribsStoredBlob // TODO persist this on disk + ribs ribs.RIBS + maxSize int indexDir string } ribsStoredBlob struct { @@ -71,9 +70,8 @@ func NewRibsStore(dir string) (*RibsStore, error) { return nil, err } return &RibsStore{ - ribs: rbs, - maxSize: 32 << 30, // 32 GiB - //index: map[uuid.UUID]*ribsStoredBlob{}, + ribs: rbs, + maxSize: 32 << 30, // 32 GiB indexDir: indexDir, }, nil @@ -151,24 +149,39 @@ SplitLoop: return storedBlob.Descriptor, nil } -func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) { - index, err := os.Open(path.Join(r.indexDir, id.String())) +func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, error) { + storedBlob, err := r.describeRibsStoredBlob(ctx, id) if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil, nil, ErrBlobNotFound - } - return nil, nil, err - } - var storedBlob ribsStoredBlob - if err := json.NewDecoder(index).Decode(&storedBlob); err != nil { - return nil, nil, err + return nil, err } session := r.ribs.Session(ctx) - reader, err := newRibsStoredBlobReader(session, &storedBlob) + reader, err := newRibsStoredBlobReader(session, storedBlob) + if err != nil { + return nil, err + } + return reader, nil +} + +func (r *RibsStore) Describe(ctx context.Context, id ID) (*Descriptor, error) { + storedBlob, err := r.describeRibsStoredBlob(ctx, id) if err != nil { - return nil, nil, err + return nil, err + } + return storedBlob.Descriptor, err +} + +func (r *RibsStore) describeRibsStoredBlob(_ context.Context, id ID) (*ribsStoredBlob, error) { + switch index, err := os.Open(path.Join(r.indexDir, id.String())); { + case err == nil: + var storedBlob ribsStoredBlob + err := json.NewDecoder(index).Decode(&storedBlob) + // TODO: populate descriptor status with FileCoin chain data about the stored blob. + return &storedBlob, err + case errors.Is(err, os.ErrNotExist): + return nil, ErrBlobNotFound + default: + return nil, err } - return reader, storedBlob.Descriptor, nil } func (r *RibsStore) Shutdown(_ context.Context) error {