Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Extend blob store to return status and handle /status
Browse files Browse the repository at this point in the history
Change the blob store interface to separate `Describe` from `Get`, and
extend the content of `blob.Descriptor` to include `blob.Status`.

Reflect the changes on the existing stores. Leave TODOs in place for the
actual information fetching of the deals made by the store to be
implemented in separate PRs.

Change the sever to correctly handle `GET /v0/blob/{id}/status`.
  • Loading branch information
masih committed Jul 21, 2023
1 parent 27bcb41 commit a322be5
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 35 deletions.
12 changes: 12 additions & 0 deletions api/model.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "time"

type (
// PostBlobResponse represents the response to a successful POST request to upload a blob.
PostBlobResponse struct {
Expand All @@ -11,4 +13,14 @@ type (
// Error is the description of the error.
Error string `json:"error"`
}
GetStatusResponse struct {
ID string `json:"id"`
Replicas []Replica `json:"Replicas,omitempty"`
}
Replica struct {
Provider string `json:"provider"`
Status string `json:"status"`
LastVerified time.Time `json:"lastVerified"`
Expiration time.Time `json:"expiration"`
}
)
1 change: 0 additions & 1 deletion api/server/error_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var (
errResponseBlobNotFound = api.ErrorResponse{Error: "No blob is found for the given ID"}
errResponseNotStreamContentType = api.ErrorResponse{Error: `Invalid content type, expected "application/octet-stream".`}
errResponseInvalidContentLength = api.ErrorResponse{Error: "Invalid content length, expected unsigned numerical value."}
errResponseNotImplemented = api.ErrorResponse{Error: "This functionally is pending implementation."}
)

func errResponseInternalError(err error) api.ErrorResponse {
Expand Down
49 changes: 46 additions & 3 deletions api/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,17 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i
return
}
logger := logger.With("id", id)
blobReader, blobDesc, err := m.store.Get(r.Context(), id)
blobDesc, err := m.store.Describe(r.Context(), id)
switch err {
case nil:
case blob.ErrBlobNotFound:
respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound)
return
default:
respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError)
return
}
blobReader, err := m.store.Get(r.Context(), id)
switch err {
case nil:
case blob.ErrBlobNotFound:
Expand All @@ -113,8 +123,41 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i
logger.Debug("Blob fetched successfully")
}

func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, _ *http.Request, _ string) {
respondWithJson(w, errResponseNotImplemented, http.StatusNotImplemented)
func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, r *http.Request, idUriSegment string) {
var id blob.ID
if err := id.Decode(idUriSegment); err != nil {
respondWithJson(w, errResponseInvalidBlobID, http.StatusBadRequest)
return
}
logger := logger.With("id", id)
blobDesc, err := m.store.Describe(r.Context(), id)
switch err {
case nil:
case blob.ErrBlobNotFound:
respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound)
return
default:
logger.Errorw("Failed to get status for ID", "err", err)
respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError)
return
}

response := api.GetStatusResponse{
ID: idUriSegment,
}

if blobDesc.Status != nil {
response.Replicas = make([]api.Replica, len(blobDesc.Status.Replicas))
for _, replica := range blobDesc.Status.Replicas {
response.Replicas = append(response.Replicas, api.Replica{
Provider: replica.Provider,
Status: replica.Status,
LastVerified: replica.LastVerified,
Expiration: replica.Expiration,
})
}
}
respondWithJson(w, response, http.StatusOK)
}

func (m *HttpServer) handleRoot(w http.ResponseWriter, r *http.Request) {
Expand Down
4 changes: 3 additions & 1 deletion api/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func httpHeaderAllow(methods ...string) (string, string) {
func respondWithJson(w http.ResponseWriter, resp any, code int) {
w.Header().Set(httpHeaderContentTypeJson())
w.Header().Set(httpHeaderContentTypeOptionsNoSniff())
w.WriteHeader(code)
if code != http.StatusOK {
w.WriteHeader(code)
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
logger.Errorw("Failed to encode response.", "code", code, "resp", resp, "err", err)
}
Expand Down
13 changes: 12 additions & 1 deletion blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,21 @@ type (
Size uint64
// ModificationTime is the latest time at which the blob was modified.
ModificationTime time.Time
Status *Status
}
Status struct {
Replicas []Replica
}
Replica struct {
Provider string
Status string
LastVerified time.Time
Expiration time.Time
}
Store interface {
Put(context.Context, io.ReadCloser) (*Descriptor, error)
Get(context.Context, ID) (io.ReadSeekCloser, *Descriptor, error)
Describe(context.Context, ID) (*Descriptor, error)
Get(context.Context, ID) (io.ReadSeekCloser, error)
}
)

Expand Down
29 changes: 19 additions & 10 deletions blob/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,32 @@ func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor,
}, nil
}

// Get Retrieves the content of blob log with its Descriptor.
// If no file is found for the given id, ErrBlobNotFound is returned.
func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) {
// Get Retrieves the content of blob.
// If no blob is found for the given id, ErrBlobNotFound is returned.
func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, error) {
switch blob, err := os.Open(path.Join(l.dir, id.String()+".bin")); {
case err == nil:
stat, err := blob.Stat()
if err != nil {
return nil, nil, err
}
return blob, &Descriptor{
return blob, nil
case errors.Is(err, os.ErrNotExist):
return nil, ErrBlobNotFound
default:
return nil, err
}
}

// Describe gets the description of the blob for the given id.
// If no blob is found for the given id, ErrBlobNotFound is returned.
func (l *LocalStore) Describe(ctx context.Context, id ID) (*Descriptor, error) {
switch stat, err := os.Stat(path.Join(l.dir, id.String()+".bin")); {
case err == nil:
return &Descriptor{
ID: id,
Size: uint64(stat.Size()),
ModificationTime: stat.ModTime(),
}, nil
case errors.Is(err, os.ErrNotExist):
return nil, nil, ErrBlobNotFound
return nil, ErrBlobNotFound
default:
return nil, nil, err
return nil, err
}
}
51 changes: 32 additions & 19 deletions blob/ribs_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type (
// RibsStore is an experimental Store implementation that uses RIBS.
// See: https://github.com/filcat/ribs
RibsStore struct {
ribs ribs.RIBS
maxSize int
//index map[uuid.UUID]*ribsStoredBlob // TODO persist this on disk
ribs ribs.RIBS
maxSize int
indexDir string
}
ribsStoredBlob struct {
Expand Down Expand Up @@ -71,9 +70,8 @@ func NewRibsStore(dir string) (*RibsStore, error) {
return nil, err
}
return &RibsStore{
ribs: rbs,
maxSize: 32 << 30, // 32 GiB
//index: map[uuid.UUID]*ribsStoredBlob{},
ribs: rbs,
maxSize: 32 << 30, // 32 GiB
indexDir: indexDir,
}, nil

Expand Down Expand Up @@ -151,24 +149,39 @@ SplitLoop:
return storedBlob.Descriptor, nil
}

func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) {
index, err := os.Open(path.Join(r.indexDir, id.String()))
func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, error) {
storedBlob, err := r.describeRibsStoredBlob(ctx, id)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil, ErrBlobNotFound
}
return nil, nil, err
}
var storedBlob ribsStoredBlob
if err := json.NewDecoder(index).Decode(&storedBlob); err != nil {
return nil, nil, err
return nil, err
}
session := r.ribs.Session(ctx)
reader, err := newRibsStoredBlobReader(session, &storedBlob)
reader, err := newRibsStoredBlobReader(session, storedBlob)
if err != nil {
return nil, err
}
return reader, nil
}

func (r *RibsStore) Describe(ctx context.Context, id ID) (*Descriptor, error) {
storedBlob, err := r.describeRibsStoredBlob(ctx, id)
if err != nil {
return nil, nil, err
return nil, err
}
return storedBlob.Descriptor, err
}

func (r *RibsStore) describeRibsStoredBlob(_ context.Context, id ID) (*ribsStoredBlob, error) {
switch index, err := os.Open(path.Join(r.indexDir, id.String())); {
case err == nil:
var storedBlob ribsStoredBlob
err := json.NewDecoder(index).Decode(&storedBlob)
// TODO: populate descriptor status with FileCoin chain data about the stored blob.
return &storedBlob, err
case errors.Is(err, os.ErrNotExist):
return nil, ErrBlobNotFound
default:
return nil, err
}
return reader, storedBlob.Descriptor, nil
}

func (r *RibsStore) Shutdown(_ context.Context) error {
Expand Down

0 comments on commit a322be5

Please sign in to comment.