From e31956ac24e0e6946d56d88a0a2468c2340c6beb Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 10 Aug 2022 12:13:53 +0200 Subject: [PATCH 1/2] shard destruction --- dagstore_control.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dagstore_control.go b/dagstore_control.go index e27a370..d209da9 100644 --- a/dagstore_control.go +++ b/dagstore_control.go @@ -298,6 +298,8 @@ func (d *DAGStore) control() { d.lk.Lock() delete(d.shards, s.key) d.lk.Unlock() + res := &ShardResult{Key: s.key, Error: nil} + d.dispatchResult(res, tsk.waiter) // TODO are we guaranteed that there are no queued items for this shard? default: From 11d39cb3d4ff369ea2461860b9315a79380aa433 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 10 Aug 2022 12:20:00 +0200 Subject: [PATCH 2/2] dagstore test --- dagstore_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/dagstore_test.go b/dagstore_test.go index 1c61bd2..8412516 100644 --- a/dagstore_test.go +++ b/dagstore_test.go @@ -35,6 +35,67 @@ func init() { _ = logging.SetLogLevel("dagstore", "DEBUG") } +// TestDestroyShard tests that shards are removed properly and relevant +// errors are returned in case of failure. +func TestDestroyShard(t *testing.T) { + dir := t.TempDir() + store := datastore.NewLogDatastore(dssync.MutexWrap(datastore.NewMapDatastore()), "trace") + sink := tracer(128) + dagst, err := NewDAGStore(Config{ + MountRegistry: testRegistry(t), + TransientsDir: dir, + Datastore: store, + TraceCh: sink, + }) + require.NoError(t, err) + + err = dagst.Start(context.Background()) + require.NoError(t, err) + + sh := []string{"foo", "bar"} + + for _, v := range sh { + ch := make(chan ShardResult, 1) + k := shard.KeyFromString(v) + counting := &mount.Counting{Mount: carv2mnt} + err = dagst.RegisterShard(context.Background(), k, counting, ch, RegisterOpts{ + LazyInitialization: false, + }) + require.NoError(t, err) + res1 := <-ch + require.NoError(t, res1.Error) + + info, err := dagst.GetShardInfo(k) + require.NoError(t, err) + require.Equal(t, ShardStateAvailable, info.ShardState) + } + + // Acquire one shard and keep other Available + acres := make(chan ShardResult, 1) + + err = dagst.AcquireShard(context.Background(), shard.KeyFromString(sh[0]), acres, AcquireOpts{}) + require.NoError(t, err) + + res := <-acres + require.NoError(t, res.Error) + + // Try to destroy both shards, fail for any error except Active Shards + desres1 := make(chan ShardResult, 1) + err = dagst.DestroyShard(context.Background(), shard.KeyFromString(sh[0]), desres1, DestroyOpts{}) + require.NoError(t, err) + res1 := <-desres1 + require.Contains(t, res1.Error.Error(), "failed to destroy shard; active references") + + desres2 := make(chan ShardResult, 1) + err = dagst.DestroyShard(context.Background(), shard.KeyFromString(sh[1]), desres2, DestroyOpts{}) + require.NoError(t, err) + res2 := <-desres2 + require.NoError(t, res2.Error) + + info := dagst.AllShardsInfo() + require.Equal(t, 1, len(info)) +} + func TestRegisterUsingExistingTransient(t *testing.T) { ds := datastore.NewMapDatastore() dagst, err := NewDAGStore(Config{