Skip to content

Commit

Permalink
Merge pull request #1 from vulcanize/ian_get_many
Browse files Browse the repository at this point in the history
CBOR GetManyIpldStore
  • Loading branch information
i-norden authored Aug 7, 2023
2 parents 268435e + 10d29ff commit 079030f
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cbornode
import (
"bytes"
"context"
"errors"
"fmt"

block "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -207,3 +208,66 @@ func (mb *mockBlocks) Put(ctx context.Context, b block.Block) error {
mb.data[b.Cid()] = b
return nil
}

type Cursor struct {
CID cid.Cid
Index int
Err error
}

// IpldGetManyStore wraps a GetManyBlockstore and provides an interface for retrieving CBOR encoded data in batches
type IpldGetManyStore interface {
GetMany(ctx context.Context, cs []cid.Cid, outs []interface{}) (<-chan Cursor, []cid.Cid, error)
}

// IpldGetManyBlockStore defines a subset of the go-ipfs-blockstore Blockstore interface providing a method
// for retrieving block-centered data in batches
type IpldGetManyBlockStore interface {
IpldBlockstore
GetMany(ctx context.Context, cs []cid.Cid) ([]block.Block, []cid.Cid, error)
}

// GetManyIpldStore wraps and IpldBlockstore and implements the IpldGetManyStore interface.
type GetManyIpldStore struct {
*BasicIpldStore
GetManyBlocks IpldGetManyBlockStore
}

var _ IpldStore = &GetManyIpldStore{}
var _ IpldGetManyStore = &GetManyIpldStore{}

// NewGetManyCborStore returns an IpldStore implementation backed by the provided IpldGetManyBlockStore.
func NewGetManyCborStore(bs IpldGetManyBlockStore) *GetManyIpldStore {
viewer, _ := bs.(IpldBlockstoreViewer)
bis := &BasicIpldStore{Blocks: bs, Viewer: viewer}
return &GetManyIpldStore{GetManyBlocks: bs, BasicIpldStore: bis}
}

// GetMany reads and unmarshals the content at `cs` into `outs`
// it returns a channel for tracking the position, identify, and any decode errors in output list
// as well as a list of all the CIDs that could not be retrieved from the underlying blockstore
func (g *GetManyIpldStore) GetMany(ctx context.Context, cs []cid.Cid, outs []interface{}) (<-chan Cursor, []cid.Cid, error) {
if len(cs) != len(outs) {
return nil, nil, errors.New("expected list of cids to be the same length as the destination decode list")
}
blks, missingCIDs, err := g.GetManyBlocks.GetMany(ctx, cs)
if err != nil {
return nil, nil, err
}
// tempted to make this all-or-nothing, where if there are any missing CIDs we simply return an error
// because this all feels very hacky...
cursors := make(chan Cursor)
go func() {
for i, blk := range blks {
err := g.decode(blk.RawData(), outs[i])
cursors <- Cursor{
CID: blk.Cid(),
Index: i,
Err: err,
}
}
close(cursors) // this will never be closed if the receiver encounters an error and decides to stop receiving off the channel
// but that's OK, it will eventually be garbage collected in that case
}()
return cursors, missingCIDs, nil
}

0 comments on commit 079030f

Please sign in to comment.