diff --git a/dagstore.go b/dagstore.go index d615b26..bb53477 100644 --- a/dagstore.go +++ b/dagstore.go @@ -9,6 +9,7 @@ import ( mh "github.com/multiformats/go-multihash" + "github.com/ipld/go-car/v2" carindex "github.com/ipld/go-car/v2/index" ds "github.com/ipfs/go-datastore" @@ -73,6 +74,7 @@ type DAGStore struct { config Config indices index.FullIndexRepo store ds.Datastore + indexer ShardIndexer // TopLevelIndex is the top level (cid -> []shards) index that maps a cid to all the shards that is present in. TopLevelIndex index.Inverted @@ -138,6 +140,8 @@ type ShardResult struct { Accessor *ShardAccessor } +type ShardIndexer func(context.Context, shard.Key, mount.Reader) (carindex.Index, error) + type Config struct { // TransientsDir is the path to directory where local transient files will // be created for remote mounts. @@ -183,6 +187,10 @@ type Config struct { // RecoverOnStart specifies whether failed shards should be recovered // on start. RecoverOnStart RecoverOnStartPolicy + + // ShardIndexer sets a custom callback for determining the index + // mapping of CID->Offset that should be registered for a shard. + ShardIndexer ShardIndexer } // NewDAGStore constructs a new DAG store with the supplied configuration. @@ -207,6 +215,12 @@ func NewDAGStore(cfg Config) (*DAGStore, error) { log.Info("using in-memory inverted index") cfg.TopLevelIndex = index.NewInverted(dssync.MutexWrap(ds.NewMapDatastore())) } + // default indexer + if cfg.ShardIndexer == nil { + cfg.ShardIndexer = func(_ context.Context, _ shard.Key, r mount.Reader) (carindex.Index, error) { + return car.ReadOrGenerateIndex(r, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) + } + } // handle the datastore. if cfg.Datastore == nil { @@ -229,6 +243,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) { TopLevelIndex: cfg.TopLevelIndex, shards: make(map[shard.Key]*Shard), store: cfg.Datastore, + indexer: cfg.ShardIndexer, externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure. internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event. completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait. @@ -460,7 +475,8 @@ type RecoverOpts struct { // will be notified when it completes. // // TODO add an operation identifier to ShardResult -- starts to look like -// a Trace event? +// +// a Trace event? func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error { d.lk.Lock() s, ok := d.shards[key] diff --git a/dagstore_async.go b/dagstore_async.go index 13f5a0c..d85fe18 100644 --- a/dagstore_async.go +++ b/dagstore_async.go @@ -5,7 +5,6 @@ import ( "github.com/filecoin-project/dagstore/index" - "github.com/ipld/go-car/v2" carindex "github.com/ipld/go-car/v2/index" "github.com/multiformats/go-multihash" @@ -120,7 +119,7 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun var idx carindex.Index err = d.throttleIndex.Do(ctx, func(_ context.Context) error { var err error - idx, err = car.ReadOrGenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) + idx, err = d.indexer(ctx, s.key, reader) if err == nil { log.Debugw("initialize: finished generating index for shard", "shard", s.key) } else {