From 65fd5c93306213b20dc3e377ee7bafb236879c1d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 7 Mar 2023 11:39:37 +0100 Subject: [PATCH] refactor: change index-backed blockstore dagstore interface to work on cids rather than hashes (so we can detect identity cids) --- indexbs/indexbacked_bs.go | 25 +++++++++++++++++-------- indexbs/indexbacked_bs_test.go | 12 +++++++----- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/indexbs/indexbacked_bs.go b/indexbs/indexbacked_bs.go index d2edae8..f2717eb 100644 --- a/indexbs/indexbacked_bs.go +++ b/indexbs/indexbacked_bs.go @@ -35,16 +35,15 @@ type accessorWithBlockstore struct { bs dagstore.ReadBlockstore } -type blockstoreAcquire struct { - once sync.Once - bs dagstore.ReadBlockstore - err error +type IdxBstoreDagstore interface { + ShardsContainingCid(ctx context.Context, c cid.Cid) ([]shard.Key, error) + AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error } // IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore. type IndexBackedBlockstore struct { ctx context.Context - d dagstore.Interface + d IdxBstoreDagstore shardSelectF ShardSelectorF // caches the blockstore for a given shard for shard read affinity @@ -54,7 +53,7 @@ type IndexBackedBlockstore struct { stripedLock [256]sync.Mutex } -func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) { +func NewIndexBackedBlockstore(ctx context.Context, d IdxBstoreDagstore, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) { cache := ttlcache.NewCache() cache.SetTTL(cacheExpire) cache.SetCacheSizeLimit(maxCacheSize) @@ -124,7 +123,7 @@ func (ro *IndexBackedBlockstore) execOpWithLogs(ctx context.Context, c cid.Cid, func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op BlockstoreOp) (*opRes, error) { // Fetch all the shards containing the multihash - shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash()) + shards, err := ro.d.ShardsContainingCid(ctx, c) if err != nil { if errors.Is(err, datastore.ErrNotFound) { return nil, ErrBlockNotFound @@ -242,7 +241,7 @@ func (ro *IndexBackedBlockstore) Has(ctx context.Context, c cid.Cid) (bool, erro logbs.Debugw("Has", "cid", c) // Get shards that contain the cid's hash - shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash()) + shards, err := ro.d.ShardsContainingCid(ctx, c) if err != nil { logbs.Debugw("Has error", "cid", c, "err", err) return false, nil @@ -283,3 +282,13 @@ func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, errors.New("unsupported operation AllKeysChan") } + +type IdxBstoreDagstoreFromDagstore struct { + dagstore.Interface +} + +var _ IdxBstoreDagstore = (*IdxBstoreDagstoreFromDagstore)(nil) + +func (d *IdxBstoreDagstoreFromDagstore) ShardsContainingCid(ctx context.Context, c cid.Cid) ([]shard.Key, error) { + return d.Interface.ShardsContainingMultihash(ctx, c.Hash()) +} diff --git a/indexbs/indexbacked_bs_test.go b/indexbs/indexbacked_bs_test.go index 89f97fc..2b787b5 100644 --- a/indexbs/indexbacked_bs_test.go +++ b/indexbs/indexbacked_bs_test.go @@ -50,7 +50,8 @@ func TestIndexBackedBlockstore(t *testing.T) { res := <-ch require.NoError(t, res.Error) - rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute) + ibsapi := &IdxBstoreDagstoreFromDagstore{Interface: dagst} + rbs, err := NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 10, time.Minute) require.NoError(t, err) // iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore @@ -111,7 +112,7 @@ func TestIndexBackedBlockstore(t *testing.T) { return shard.Key{}, rejectedErr } - rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute) + rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, fss, 10, time.Minute) require.NoError(t, err) it.ForEach(func(mh multihash.Multihash, u uint64) error { c := cid.NewCidV1(cid.Raw, mh) @@ -137,7 +138,7 @@ func TestIndexBackedBlockstore(t *testing.T) { return shard.Key{}, ErrNoShardSelected } - rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute) + rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, fss, 10, time.Minute) require.NoError(t, err) it.ForEach(func(mh multihash.Multihash, u uint64) error { c := cid.NewCidV1(cid.Raw, mh) @@ -165,7 +166,7 @@ func TestIndexBackedBlockstore(t *testing.T) { notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm") require.NoError(t, err) - rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute) + rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 10, time.Minute) require.NoError(t, err) // Has should return false @@ -219,7 +220,8 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) { sks = append(sks, sk) } - rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3, time.Minute) + ibsapi := &IdxBstoreDagstoreFromDagstore{Interface: dagst} + rbs, err := NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 3, time.Minute) require.NoError(t, err) var errg errgroup.Group