From 79ddfe1ba4f9bbc8bc54f47cb13d3025ec976b76 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 3 Oct 2022 11:04:17 +0200 Subject: [PATCH 1/2] fix: cast in index backed blockstore --- indexbs/indexbacked_bs.go | 2 +- indexbs/indexbacked_bs_test.go | 101 ++++++++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/indexbs/indexbacked_bs.go b/indexbs/indexbacked_bs.go index 74166c7..385a96c 100644 --- a/indexbs/indexbacked_bs.go +++ b/indexbs/indexbacked_bs.go @@ -180,7 +180,7 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block // thread was waiting to enter the sync.Once val, ok := ro.blockstoreCache.Get(sk) if ok { - return val.(dagstore.ReadBlockstore), nil + return val.(*accessorWithBlockstore).bs, nil } // Acquire the blockstore for the selected shard diff --git a/indexbs/indexbacked_bs_test.go b/indexbs/indexbacked_bs_test.go index 08972aa..7ab9d13 100644 --- a/indexbs/indexbacked_bs_test.go +++ b/indexbs/indexbacked_bs_test.go @@ -3,6 +3,7 @@ package indexbs import ( "context" "errors" + "fmt" "testing" "golang.org/x/sync/errgroup" @@ -26,7 +27,7 @@ var noOpSelector = func(c cid.Cid, shards []shard.Key) (shard.Key, error) { var carv2mnt = &mount.FSMount{FS: testdata.FS, Path: testdata.FSPathCarV2} -func TestReadOnlyBs(t *testing.T) { +func TestIndexBackedBlockstore(t *testing.T) { ctx := context.Background() store := dssync.MutexWrap(datastore.NewMapDatastore()) dagst, err := dagstore.NewDAGStore(dagstore.Config{ @@ -181,6 +182,104 @@ func TestReadOnlyBs(t *testing.T) { require.EqualValues(t, 0, sz) } +func TestIndexBackedBlockstoreFuzz(t *testing.T) { + ctx := context.Background() + store := dssync.MutexWrap(datastore.NewMapDatastore()) + dagst, err := dagstore.NewDAGStore(dagstore.Config{ + MountRegistry: testRegistry(t), + TransientsDir: t.TempDir(), + Datastore: store, + }) + require.NoError(t, err) + + err = dagst.Start(context.Background()) + require.NoError(t, err) + + // register some shards + var sks []shard.Key + for i := 0; i < 10; i++ { + ch := make(chan dagstore.ShardResult, 1) + sk := shard.KeyFromString(fmt.Sprintf("test%d", i)) + err = dagst.RegisterShard(context.Background(), sk, carv2mnt, ch, dagstore.RegisterOpts{}) + require.NoError(t, err) + res := <-ch + require.NoError(t, res.Error) + sks = append(sks, sk) + } + + rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10) + require.NoError(t, err) + + var errg errgroup.Group + for _, sk := range sks { + sk := sk + errg.Go(func() error { + it, err := dagst.GetIterableIndex(sk) + if err != nil { + return err + } + + var skerrg errgroup.Group + for i := 0; i < 10; i++ { + it.ForEach(func(mh multihash.Multihash, _ uint64) error { + mhs := mh + skerrg.Go(func() error { + c := cid.NewCidV1(cid.Raw, mhs) + + errs := make(chan error, 3) + go func() { + has, err := rbs.Has(ctx, c) + if err != nil { + errs <- err + } + if has { + errs <- nil + } else { + errs <- errors.New("has should be true") + } + }() + + go func() { + blk, err := rbs.Get(ctx, c) + if err != nil { + errs <- err + } + if blk == nil { + errs <- errors.New("block should not be empty") + return + } + + // ensure cids match + if blk.Cid() != c { + errs <- errors.New("cid mismatch") + return + } + errs <- nil + }() + + go func() { + _, err = rbs.GetSize(ctx, c) + errs <- err + }() + + for i := 0; i < 3; i++ { + err := <-errs + if err != nil { + return err + } + } + return nil + }) + + return nil + }) + } + return skerrg.Wait() + }) + } + require.NoError(t, errg.Wait()) +} + func testRegistry(t *testing.T) *mount.Registry { r := mount.NewRegistry() err := r.Register("fs", &mount.FSMount{FS: testdata.FS}) From 072cace8cd8bb593e4050488c59b80ab46bfb7ca Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 3 Oct 2022 14:03:32 +0200 Subject: [PATCH 2/2] fix: dont exceed race detector go-routine limit --- indexbs/indexbacked_bs_test.go | 83 +++++++++++++++------------------- 1 file changed, 36 insertions(+), 47 deletions(-) diff --git a/indexbs/indexbacked_bs_test.go b/indexbs/indexbacked_bs_test.go index 7ab9d13..e5a95f8 100644 --- a/indexbs/indexbacked_bs_test.go +++ b/indexbs/indexbacked_bs_test.go @@ -197,7 +197,7 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) { // register some shards var sks []shard.Key - for i := 0; i < 10; i++ { + for i := 0; i < 3; i++ { ch := make(chan dagstore.ShardResult, 1) sk := shard.KeyFromString(fmt.Sprintf("test%d", i)) err = dagst.RegisterShard(context.Background(), sk, carv2mnt, ch, dagstore.RegisterOpts{}) @@ -219,62 +219,51 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) { return err } - var skerrg errgroup.Group - for i := 0; i < 10; i++ { + for i := 0; i < 3; i++ { + var skerrg errgroup.Group it.ForEach(func(mh multihash.Multihash, _ uint64) error { mhs := mh + c := cid.NewCidV1(cid.Raw, mhs) skerrg.Go(func() error { - c := cid.NewCidV1(cid.Raw, mhs) - - errs := make(chan error, 3) - go func() { - has, err := rbs.Has(ctx, c) - if err != nil { - errs <- err - } - if has { - errs <- nil - } else { - errs <- errors.New("has should be true") - } - }() - - go func() { - blk, err := rbs.Get(ctx, c) - if err != nil { - errs <- err - } - if blk == nil { - errs <- errors.New("block should not be empty") - return - } - - // ensure cids match - if blk.Cid() != c { - errs <- errors.New("cid mismatch") - return - } - errs <- nil - }() - - go func() { - _, err = rbs.GetSize(ctx, c) - errs <- err - }() - - for i := 0; i < 3; i++ { - err := <-errs - if err != nil { - return err - } + has, err := rbs.Has(ctx, c) + if err != nil { + return err + } + if !has { + return errors.New("has should be true") } return nil }) + skerrg.Go(func() error { + blk, err := rbs.Get(ctx, c) + if err != nil { + return err + } + if blk == nil { + return errors.New("block should not be empty") + } + + // ensure cids match + if blk.Cid() != c { + return errors.New("cid mismatch") + } + return nil + }) + + skerrg.Go(func() error { + _, err := rbs.GetSize(ctx, c) + return err + }) + return nil }) + err := skerrg.Wait() + if err != nil { + return err + } } - return skerrg.Wait() + return nil }) } require.NoError(t, errg.Wait())