Skip to content

Commit

Permalink
Allow configuration of a custom shard indexer
Browse files Browse the repository at this point in the history
Supercedes #154
  • Loading branch information
willscott committed Apr 7, 2023
1 parent 1de8e01 commit d7013a7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
18 changes: 17 additions & 1 deletion dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

mh "github.com/multiformats/go-multihash"

"github.com/ipld/go-car/v2"
carindex "github.com/ipld/go-car/v2/index"

ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -73,6 +74,7 @@ type DAGStore struct {
config Config
indices index.FullIndexRepo
store ds.Datastore
indexer ShardIndexer

// TopLevelIndex is the top level (cid -> []shards) index that maps a cid to all the shards that is present in.
TopLevelIndex index.Inverted
Expand Down Expand Up @@ -138,6 +140,8 @@ type ShardResult struct {
Accessor *ShardAccessor
}

type ShardIndexer func(context.Context, shard.Key, mount.Reader) (carindex.Index, error)

type Config struct {
// TransientsDir is the path to directory where local transient files will
// be created for remote mounts.
Expand Down Expand Up @@ -183,6 +187,10 @@ type Config struct {
// RecoverOnStart specifies whether failed shards should be recovered
// on start.
RecoverOnStart RecoverOnStartPolicy

// ShardIndexer sets a custom callback for determining the index
// mapping of CID->Offset that should be registered for a shard.
ShardIndexer ShardIndexer
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
Expand All @@ -207,6 +215,12 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
log.Info("using in-memory inverted index")
cfg.TopLevelIndex = index.NewInverted(dssync.MutexWrap(ds.NewMapDatastore()))
}
// default indexer
if cfg.ShardIndexer == nil {
cfg.ShardIndexer = func(_ context.Context, _ shard.Key, r mount.Reader) (carindex.Index, error) {
return car.ReadOrGenerateIndex(r, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
}
}

// handle the datastore.
if cfg.Datastore == nil {
Expand All @@ -229,6 +243,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
TopLevelIndex: cfg.TopLevelIndex,
shards: make(map[shard.Key]*Shard),
store: cfg.Datastore,
indexer: cfg.ShardIndexer,
externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure.
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
Expand Down Expand Up @@ -460,7 +475,8 @@ type RecoverOpts struct {
// will be notified when it completes.
//
// TODO add an operation identifier to ShardResult -- starts to look like
// a Trace event?
//
// a Trace event?
func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error {
d.lk.Lock()
s, ok := d.shards[key]
Expand Down
3 changes: 1 addition & 2 deletions dagstore_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/filecoin-project/dagstore/index"

"github.com/ipld/go-car/v2"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"

Expand Down Expand Up @@ -120,7 +119,7 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
var idx carindex.Index
err = d.throttleIndex.Do(ctx, func(_ context.Context) error {
var err error
idx, err = car.ReadOrGenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
idx, err = d.indexer(ctx, s.key, reader)
if err == nil {
log.Debugw("initialize: finished generating index for shard", "shard", s.key)
} else {
Expand Down

0 comments on commit d7013a7

Please sign in to comment.