Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Add option to use RIBS as Motion blob store
Browse files Browse the repository at this point in the history
Implement an experimental option to use RIBS as Motion `blob.Store`. The
implementation uses a flat-file directory to store an index of Motion
`blob.ID` to IPFS blocks stored by RIBS.

RIBS would then accumulate the stored information and makes deals in
the background. The SP selection and wallet configuration is hard coded
in RIBS. The FileCoin wallet is instantiated automatically if it does
not exist at `~/.ribswallet`. Making this configurable requires changes
in RIBS.

Retrieval API implements an `io.ReadSeekCloser` which allows range
reads over a list of CIDs.

Offloading onto S3 compatible APIS, like AWS S3 and CloudFlare R2 is
supported by RIBS and activated if specific environment variables are
set. See: https://github.com/FILCAT/ribs/blob/0d5f0580effc112b01fe676f080bdbf1c2c7568a/rbdeal/car_offload_s3.go#L28

The RIBS storage is disabled by default. It is enabled by setting the
option `--experimentalRibsStore`.

This implementation is experimental: it should not be used in
Production environment. It is implemented to facilitate experimentation
and scoping of the Motion project.

Disable 32-bit tests due to RIBS dependencies.
  • Loading branch information
masih committed Jul 20, 2023
1 parent cf06389 commit 08fa06e
Show file tree
Hide file tree
Showing 7 changed files with 1,937 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/go-test-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"skip32bit": true
}
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ COPY go.* ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 go build -o /go/bin/motion ./cmd/motion
RUN CGO_ENABLED=1 go build -o /go/bin/motion ./cmd/motion

FROM gcr.io/distroless/static-debian11
FROM gcr.io/distroless/base-debian11
COPY --from=build /go/bin/motion /usr/bin/

ENTRYPOINT ["/usr/bin/motion"]
1 change: 1 addition & 0 deletions blob/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewLocalStore(dir string) *LocalStore {
// The Descriptor.ModificationTime is set to the modification date of the file that corresponds to the content.
// The Descriptor.ID is randomly generated; see NewID.
func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor, error) {
// TODO: add size limiter here and return ErrBlobTooLarge.
id, err := NewID()
if err != nil {
return nil, err
Expand Down
260 changes: 260 additions & 0 deletions blob/ribs_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package blob

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"time"

"github.com/google/uuid"
"github.com/ipfs/boxo/chunker"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/lotus-web3/ribs"
"github.com/lotus-web3/ribs/rbdeal"
"github.com/multiformats/go-multihash"
)

// TODO parameterize this.
const ribsStoreChunkSize = 1 << 20 // 1 MiB

var (
_ Store = (*RibsStore)(nil)
_ io.ReadSeekCloser = (*ribsStoredBlobReader)(nil)
)

type (
// RibsStore is an experimental Store implementation that uses RIBS.
// See: https://github.com/filcat/ribs
RibsStore struct {
ribs ribs.RIBS
maxSize int
//index map[uuid.UUID]*ribsStoredBlob // TODO persist this on disk
indexDir string
}
ribsStoredBlob struct {
*Descriptor
Chunks []cid.Cid `json:"chunks"`
}
ribsStoredBlobReader struct {
sess ribs.Session
blob *ribsStoredBlob
offset int64

currentChunkIndex int
currentChunkReader *bytes.Reader
currentChunkPendingSeek int64
}
)

// NewRibsStore instantiates a new experimental RIBS store.
func NewRibsStore(dir string) (*RibsStore, error) {
rbdealDir := path.Join(path.Clean(dir), "rbdeal")
if err := os.Mkdir(rbdealDir, 0750); err != nil && !errors.Is(err, os.ErrExist) {
return nil, fmt.Errorf("failed to create internal directories: %w", err)
}
indexDir := path.Join(path.Clean(dir), "index")
if err := os.Mkdir(indexDir, 0750); err != nil && !errors.Is(err, os.ErrExist) {
return nil, fmt.Errorf("failed to create internal directories: %w", err)
}

// TODO Path to wallet is hardcoded in RIBS. Parameterise it and allow user to configure
// See: https://github.com/FILCAT/ribs/blob/7c8766206ec1e5ec30c613cde2b3a49d0ccf25d0/rbdeal/ribs.go#L156

rbs, err := rbdeal.Open(rbdealDir)
if err != nil {
return nil, err
}
return &RibsStore{
ribs: rbs,
maxSize: 32 << 30, // 32 GiB
//index: map[uuid.UUID]*ribsStoredBlob{},
indexDir: indexDir,
}, nil

}

func (r *RibsStore) Start(_ context.Context) error {
// TODO: change RIBS to take context.
return r.ribs.Start()
}
func (r *RibsStore) Put(ctx context.Context, in io.ReadCloser) (*Descriptor, error) {

// Generate ID early to fail early if generation fails.
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
modtime := time.Now().UTC()

// TODO incorporate https://github.com/filecoin-project/data-prep-tools/tree/main/docs/best-practices
// also see: https://github.com/anjor/anelace
// TODO we could do things here to make commp etc. more efficient.
// for now this implementation remains highly experimental and optimised for velocity.
batch := r.ribs.Session(ctx).Batch(ctx)

splitter := chunk.NewSizeSplitter(in, ribsStoreChunkSize)

// TODO: Store the byte ranges for satisfying io.ReadSeaker in case chunk size is not constant across blocks?
var chunkCids []cid.Cid
var size int
SplitLoop:
for {
b, err := splitter.NextBytes()
switch err {
case io.EOF:
break SplitLoop
case nil:
size += len(b)
if size > r.maxSize {
return nil, ErrBlobTooLarge
}
mh, err := multihash.Sum(b, multihash.SHA2_256, -1)
if err != nil {
return nil, err
}
blk, err := blocks.NewBlockWithCid(b, cid.NewCidV1(cid.Raw, mh))
if err != nil {
return nil, err
}
if err := batch.Put(ctx, []blocks.Block{blk}); err != nil {
return nil, err
}
chunkCids = append(chunkCids, blk.Cid())
default:
return nil, err
}
}
if err := batch.Flush(ctx); err != nil {
return nil, err
}
storedBlob := &ribsStoredBlob{
Descriptor: &Descriptor{
ID: ID(id),
Size: uint64(size),
ModificationTime: modtime,
},
Chunks: chunkCids,
}
index, err := os.Create(path.Join(r.indexDir, id.String()))
if err != nil {
return nil, err
}
if err := json.NewEncoder(index).Encode(storedBlob); err != nil {
return nil, err
}
return storedBlob.Descriptor, nil
}

func (r *RibsStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, *Descriptor, error) {
index, err := os.Open(path.Join(r.indexDir, id.String()))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil, ErrBlobNotFound
}
return nil, nil, err
}
var storedBlob ribsStoredBlob
if err := json.NewDecoder(index).Decode(&storedBlob); err != nil {
return nil, nil, err
}
session := r.ribs.Session(ctx)
reader, err := newRibsStoredBlobReader(session, &storedBlob)
if err != nil {
return nil, nil, err
}
return reader, storedBlob.Descriptor, nil
}

func (r *RibsStore) Shutdown(_ context.Context) error {
// TODO: change RIBS to take context.
return r.ribs.Close()
}

func (rsb *ribsStoredBlob) chunkIndexAtOffset(o int64) (int, bool) {
var i int
if o >= ribsStoreChunkSize {
i = int(o / ribsStoreChunkSize)
}
if i >= len(rsb.Chunks) {
return -1, false
}
return i, true
}

func newRibsStoredBlobReader(sess ribs.Session, rsb *ribsStoredBlob) (*ribsStoredBlobReader, error) {
return &ribsStoredBlobReader{
sess: sess,
blob: rsb,
}, nil
}

func (r *ribsStoredBlobReader) Read(p []byte) (n int, err error) {
if r.currentChunkReader == nil {
if err := r.sess.View(context.TODO(),
[]multihash.Multihash{r.blob.Chunks[r.currentChunkIndex].Hash()},
func(_ int, data []byte) {
//TODO: cache the retrieved bytes?
// but first, check if RIBS is already doing it.
r.currentChunkReader = bytes.NewReader(data)
}); err != nil {
return 0, err
}
}
if r.currentChunkPendingSeek > 0 {
if _, err := r.currentChunkReader.Seek(r.currentChunkPendingSeek, io.SeekStart); err != nil {
return 0, err
}
r.currentChunkPendingSeek = 0
}
read, err := r.currentChunkReader.Read(p)
if err == io.EOF {
if read < len(p) && r.currentChunkIndex+1 < len(r.blob.Chunks) {
r.currentChunkIndex += 1
r.currentChunkReader = nil
readRemaining, err := r.Read(p[read:])
return read + readRemaining, err
}
}
return read, err
}

func (r *ribsStoredBlobReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = int64(r.blob.Size) + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}
if newOffset < 0 {
return 0, fmt.Errorf("offset too small: %d", offset)
}
if newOffset > int64(r.blob.Size) {
return 0, fmt.Errorf("offset beyond size: %d", offset)
}
chunkIndex, found := r.blob.chunkIndexAtOffset(newOffset)
if !found {
return 0, fmt.Errorf("offset beyond size: %d", r.offset)
}
if chunkIndex != r.currentChunkIndex && r.currentChunkReader != nil {
r.currentChunkReader = nil
}
r.offset = newOffset
r.currentChunkIndex = chunkIndex
r.currentChunkPendingSeek = newOffset % ribsStoreChunkSize
return r.offset, nil
}

func (r *ribsStoredBlobReader) Close() error {
return nil
}
34 changes: 29 additions & 5 deletions cmd/motion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,32 @@ func main() {
Value: os.TempDir(),
EnvVars: []string{"MOTION_STORE_DIR"},
},
&cli.BoolFlag{
Name: "experimentalRibsStore",
Usage: "Whether to use experimental RIBS as the storage and deal making",
DefaultText: "Local storage is used",
},
},
Action: func(cctx *cli.Context) error {
storeDir := cctx.String("storeDir")
logger.Infow("Using local blob store", "storeDir", storeDir)
m, err := motion.New(
motion.WithBlobStore(blob.NewLocalStore(storeDir)),
)
var store blob.Store
if cctx.Bool("experimentalRibsStore") {
rbstore, err := blob.NewRibsStore(storeDir)
if err != nil {
return err
}
logger.Infow("Using RIBS blob store", "storeDir", storeDir)
if err := rbstore.Start(cctx.Context); err != nil {
logger.Errorw("Failed to start RIBS blob store", "err", err)
return err
}
store = rbstore
} else {
store = blob.NewLocalStore(storeDir)
logger.Infow("Using local blob store", "storeDir", storeDir)
}

m, err := motion.New(motion.WithBlobStore(store))
if err != nil {
logger.Fatalw("Failed to instantiate Motion", "err", err)
}
Expand All @@ -48,8 +67,13 @@ func main() {
logger.Info("Terminating...")
if err := m.Shutdown(ctx); err != nil {
logger.Warnw("Failure occurred while shutting down Motion.", "err", err)
return err
}
// TODO: Re-enable once the panic is fixed. See: https://github.com/FILCAT/ribs/issues/39
//if rbstore, ok := store.(*blob.RibsStore); ok {
// if err := rbstore.Shutdown(ctx); err != nil {
// logger.Warnw("Failure occurred while shutting down RIBS blob store.", "err", err)
// }
//}
logger.Info("Shut down Motion successfully.")
return nil
},
Expand Down
Loading

0 comments on commit 08fa06e

Please sign in to comment.