From 5d4e8e99b8b2b65153d0fc9895b1dd732869e35a Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Thu, 20 Jul 2023 20:15:22 +0100 Subject: [PATCH] Extend blob store to return status and handle /status Change the blob store interface to separate `Describe` from `Get`, and extend the content of `blob.Descriptor` to include `blob.Status`. Reflect the changes on the existing stores. Leave TODOs in place for the actual information fetching of the deals made by the store to be implemented in separate PRs. Change the sever to correctly handle `GET /v0/blob/{id}/status`. --- api/model.go | 12 +++++++++ api/server/error_response.go | 1 - api/server/handler.go | 49 +++++++++++++++++++++++++++++++--- api/server/util.go | 4 ++- blob/blob.go | 13 ++++++++- blob/local_store.go | 29 +++++++++++++------- blob/ribs_store.go | 51 ++++++++++++++++++++++-------------- 7 files changed, 124 insertions(+), 35 deletions(-) diff --git a/api/model.go b/api/model.go index 967d571..3763476 100644 --- a/api/model.go +++ b/api/model.go @@ -1,5 +1,7 @@ package api +import "time" + type ( // PostBlobResponse represents the response to a successful POST request to upload a blob. PostBlobResponse struct { @@ -11,4 +13,14 @@ type ( // Error is the description of the error. Error string `json:"error"` } + GetStatusResponse struct { + ID string `json:"id"` + Replicas []Replica `json:"Replicas,omitempty"` + } + Replica struct { + Provider string `json:"provider"` + Status string `json:"status"` + LastVerified time.Time `json:"lastVerified"` + Expiration time.Time `json:"expiration"` + } ) diff --git a/api/server/error_response.go b/api/server/error_response.go index 6f813e6..5f64e35 100644 --- a/api/server/error_response.go +++ b/api/server/error_response.go @@ -12,7 +12,6 @@ var ( errResponseBlobNotFound = api.ErrorResponse{Error: "No blob is found for the given ID"} errResponseNotStreamContentType = api.ErrorResponse{Error: `Invalid content type, expected "application/octet-stream".`} errResponseInvalidContentLength = api.ErrorResponse{Error: "Invalid content length, expected unsigned numerical value."} - errResponseNotImplemented = api.ErrorResponse{Error: "This functionally is pending implementation."} ) func errResponseInternalError(err error) api.ErrorResponse { diff --git a/api/server/handler.go b/api/server/handler.go index da4c0f7..4b3cf28 100644 --- a/api/server/handler.go +++ b/api/server/handler.go @@ -94,7 +94,17 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i return } logger := logger.With("id", id) - blobReader, blobDesc, err := m.store.Get(r.Context(), id) + blobDesc, err := m.store.Describe(r.Context(), id) + switch err { + case nil: + case blob.ErrBlobNotFound: + respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound) + return + default: + respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError) + return + } + blobReader, err := m.store.Get(r.Context(), id) switch err { case nil: case blob.ErrBlobNotFound: @@ -113,8 +123,41 @@ func (m *HttpServer) handleBlobGetByID(w http.ResponseWriter, r *http.Request, i logger.Debug("Blob fetched successfully") } -func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, _ *http.Request, _ string) { - respondWithJson(w, errResponseNotImplemented, http.StatusNotImplemented) +func (m *HttpServer) handleBlobGetStatusByID(w http.ResponseWriter, r *http.Request, idUriSegment string) { + var id blob.ID + if err := id.Decode(idUriSegment); err != nil { + respondWithJson(w, errResponseInvalidBlobID, http.StatusBadRequest) + return + } + logger := logger.With("id", id) + blobDesc, err := m.store.Describe(r.Context(), id) + switch err { + case nil: + case blob.ErrBlobNotFound: + respondWithJson(w, errResponseBlobNotFound, http.StatusNotFound) + return + default: + logger.Errorw("Failed to get status for ID", "err", err) + respondWithJson(w, errResponseInternalError(err), http.StatusInternalServerError) + return + } + + response := api.GetStatusResponse{ + ID: idUriSegment, + } + + if blobDesc.Status != nil { + response.Replicas = make([]api.Replica, len(blobDesc.Status.Replicas)) + for _, replica := range blobDesc.Status.Replicas { + response.Replicas = append(response.Replicas, api.Replica{ + Provider: replica.Provider, + Status: replica.Status, + LastVerified: replica.LastVerified, + Expiration: replica.Expiration, + }) + } + } + respondWithJson(w, response, http.StatusOK) } func (m *HttpServer) handleRoot(w http.ResponseWriter, r *http.Request) { diff --git a/api/server/util.go b/api/server/util.go index c51e4ad..d6ffada 100644 --- a/api/server/util.go +++ b/api/server/util.go @@ -31,7 +31,9 @@ func httpHeaderAllow(methods ...string) (string, string) { func respondWithJson(w http.ResponseWriter, resp any, code int) { w.Header().Set(httpHeaderContentTypeJson()) w.Header().Set(httpHeaderContentTypeOptionsNoSniff()) - w.WriteHeader(code) + if code != http.StatusOK { + w.WriteHeader(code) + } if err := json.NewEncoder(w).Encode(resp); err != nil { logger.Errorw("Failed to encode response.", "code", code, "resp", resp, "err", err) } diff --git a/blob/blob.go b/blob/blob.go index 565e294..70474a7 100644 --- a/blob/blob.go +++ b/blob/blob.go @@ -25,10 +25,21 @@ type ( Size uint64 // ModificationTime is the latest time at which the blob was modified. ModificationTime time.Time + Status *Status + } + Status struct { + Replicas []Replica + } + Replica struct { + Provider string + Status string + LastVerified time.Time + Expiration time.Time } Store interface { Put(context.Context, io.ReadCloser) (*Descriptor, error) - Get(context.Context, ID) (io.ReadSeekCloser, *Descriptor, error) + Describe(context.Context, ID) (*Descriptor, error) + Get(context.Context, ID) (io.ReadSeekCloser, error) } ) diff --git a/blob/local_store.go b/blob/local_store.go index 3c474fa..488a924 100644 --- a/blob/local_store.go +++ b/blob/local_store.go @@ -59,23 +59,32 @@ func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor, }, nil } -// Get Retrieves the content of blob log with its Descriptor. -// If no file is found for the given id, ErrBlobNotFound is returned. -func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) { +// Get Retrieves the content of blob. +// If no blob is found for the given id, ErrBlobNotFound is returned. +func (l *LocalStore) Get(_ context.Context, id ID) (io.ReadSeekCloser, error) { switch blob, err := os.Open(path.Join(l.dir, id.String()+".bin")); { case err == nil: - stat, err := blob.Stat() - if err != nil { - return nil, nil, err - } - return blob, &Descriptor{ + return blob, nil + case errors.Is(err, os.ErrNotExist): + return nil, ErrBlobNotFound + default: + return nil, err + } +} + +// Describe gets the description of the blob for the given id. +// If no blob is found for the given id, ErrBlobNotFound is returned. +func (l *LocalStore) Describe(ctx context.Context, id ID) (*Descriptor, error) { + switch stat, err := os.Stat(path.Join(l.dir, id.String()+".bin")); { + case err == nil: + return &Descriptor{ ID: id, Size: uint64(stat.Size()), ModificationTime: stat.ModTime(), }, nil case errors.Is(err, os.ErrNotExist): - return nil, nil, ErrBlobNotFound + return nil, ErrBlobNotFound default: - return nil, nil, err + return nil, err } } diff --git a/blob/ribs_store.go b/blob/ribs_store.go index fc3800a..361fb67 100644 --- a/blob/ribs_store.go +++ b/blob/ribs_store.go @@ -32,9 +32,8 @@ type ( // RibsStore is an experimental Store implementation that uses RIBS. // See: https://github.com/filcat/ribs RibsStore struct { - ribs ribs.RIBS - maxSize int - //index map[uuid.UUID]*ribsStoredBlob // TODO persist this on disk + ribs ribs.RIBS + maxSize int indexDir string } ribsStoredBlob struct { @@ -71,9 +70,8 @@ func NewRibsStore(dir string) (*RibsStore, error) { return nil, err } return &RibsStore{ - ribs: rbs, - maxSize: 32 << 30, // 32 GiB - //index: map[uuid.UUID]*ribsStoredBlob{}, + ribs: rbs, + maxSize: 32 << 30, // 32 GiB indexDir: indexDir, }, nil @@ -151,24 +149,39 @@ SplitLoop: return storedBlob.Descriptor, nil } -func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) { - index, err := os.Open(path.Join(r.indexDir, id.String())) +func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, error) { + storedBlob, err := r.describeRibsStoredBlob(ctx, id) if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil, nil, ErrBlobNotFound - } - return nil, nil, err - } - var storedBlob ribsStoredBlob - if err := json.NewDecoder(index).Decode(&storedBlob); err != nil { - return nil, nil, err + return nil, err } session := r.ribs.Session(ctx) - reader, err := newRibsStoredBlobReader(session, &storedBlob) + reader, err := newRibsStoredBlobReader(session, storedBlob) + if err != nil { + return nil, err + } + return reader, nil +} + +func (r *RibsStore) Describe(ctx context.Context, id ID) (*Descriptor, error) { + storedBlob, err := r.describeRibsStoredBlob(ctx, id) if err != nil { - return nil, nil, err + return nil, err + } + return storedBlob.Descriptor, err +} + +func (r *RibsStore) describeRibsStoredBlob(_ context.Context, id ID) (*ribsStoredBlob, error) { + switch index, err := os.Open(path.Join(r.indexDir, id.String())); { + case err == nil: + var storedBlob ribsStoredBlob + err := json.NewDecoder(index).Decode(&storedBlob) + // TODO: populate descriptor status with FileCoin chain data about the stored blob. + return &storedBlob, err + case errors.Is(err, os.ErrNotExist): + return nil, ErrBlobNotFound + default: + return nil, err } - return reader, storedBlob.Descriptor, nil } func (r *RibsStore) Shutdown(_ context.Context) error {