Fix: add on-disk deletion of shards (#145)
* add on-disk deletion of shards

* Add Test, fix persist

* change persist condition

* log delete errors

* implement suggestions

* Apply suggestions from code review
LexLuthr authored Sep 20, 2022
1 parent 12408ab commit b9daa1d
Showing 2 changed files with 70 additions and 3 deletions.
16 changes: 13 additions & 3 deletions dagstore_control.go
@@ -3,6 +3,8 @@ package dagstore
 import (
 	"context"
 	"fmt"
+
+	"github.com/ipfs/go-datastore"
 )
 
 type OpType int
@@ -297,6 +299,8 @@ func (d *DAGStore) control() {
 
 	d.lk.Lock()
 	delete(d.shards, s.key)
+
+	// this is only the in-memory delete; the on-disk delete is performed after the switch statement.
 	d.lk.Unlock()
 	res := &ShardResult{Key: s.key, Error: nil}
 	d.dispatchResult(res, tsk.waiter)
@@ -307,9 +311,15 @@ func (d *DAGStore) control() {
 
 	}
 
-	// persist the current shard state.
-	if err := s.persist(d.ctx, d.config.Datastore); err != nil { // TODO maybe fail shard?
-		log.Warnw("failed to persist shard", "shard", s.key, "error", err)
+	// persist the current shard state; if the op is OpShardDestroy, delete the shard record from the datastore instead.
+	if tsk.op == OpShardDestroy {
+		if err := d.store.Delete(d.ctx, datastore.NewKey(s.key.String())); err != nil {
+			log.Errorw("DestroyShard: failed to delete shard from database", "shard", s.key, "error", err)
+		}
+	} else {
+		if err := s.persist(d.ctx, d.config.Datastore); err != nil { // TODO maybe fail shard?
+			log.Warnw("failed to persist shard", "shard", s.key, "error", err)
+		}
 	}
 
 	// send a notification if the user provided a notification channel.
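The OpShardDestroy branch above leans on go-datastore's context-aware Delete to remove the persisted shard record, so a destroyed shard cannot resurrect on restart. Below is a minimal standalone sketch of those datastore semantics, assuming an in-memory MapDatastore and a hypothetical key name; it illustrates the Put/Delete round trip, not the dagstore control loop itself.

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/go-datastore"
)

func main() {
	ctx := context.Background()
	store := datastore.NewMapDatastore()

	// Persist a record under the shard's key, as s.persist does for live shards.
	key := datastore.NewKey("shard-key") // hypothetical key, for illustration only
	if err := store.Put(ctx, key, []byte("serialized shard state")); err != nil {
		panic(err)
	}

	// On OpShardDestroy the control loop now deletes the record instead of
	// re-persisting it.
	if err := store.Delete(ctx, key); err != nil {
		panic(err)
	}

	has, err := store.Has(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Println("still on disk:", has) // prints: still on disk: false
}

Note that the branch only logs a failed delete with log.Errorw rather than failing the shard, mirroring the best-effort handling already used for persist.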
57 changes: 57 additions & 0 deletions dagstore_test.go
@@ -96,6 +96,63 @@ func TestDestroyShard(t *testing.T) {
 	require.Equal(t, 1, len(info))
 }
 
+func TestDestroyAcrossRestart(t *testing.T) {
+	dir := t.TempDir()
+	store := datastore.NewLogDatastore(dssync.MutexWrap(datastore.NewMapDatastore()), "trace")
+	idx, err := index.NewFSRepo(t.TempDir())
+	require.NoError(t, err)
+
+	dagst, err := NewDAGStore(Config{
+		MountRegistry: testRegistry(t),
+		TransientsDir: dir,
+		Datastore:     store,
+		IndexRepo:     idx,
+	})
+	require.NoError(t, err)
+
+	err = dagst.Start(context.Background())
+	require.NoError(t, err)
+
+	keys := registerShards(t, dagst, 100, carv2mnt, RegisterOpts{})
+
+	res, err := store.Query(context.TODO(), dsq.Query{})
+	require.NoError(t, err)
+	entries, err := res.Rest()
+	require.NoError(t, err)
+	require.Len(t, entries, 100) // we have 100 shards.
+
+	// destroy 20 shards.
+	for _, j := range keys[20:40] {
+		desres := make(chan ShardResult, 1)
+		err = dagst.DestroyShard(context.Background(), j, desres, DestroyOpts{})
+		require.NoError(t, err)
+		res := <-desres
+		require.NoError(t, res.Error)
+	}
+
+	// close the DAG store.
+	err = dagst.Close()
+	require.NoError(t, err)
+
+	// create a new dagstore with the same datastore.
+	dagst, err = NewDAGStore(Config{
+		MountRegistry: testRegistry(t),
+		TransientsDir: dir,
+		Datastore:     store,
+		IndexRepo:     idx,
+	})
+	require.NoError(t, err)
+
+	err = dagst.Start(context.Background())
+	require.NoError(t, err)
+
+	// compare the number of shards after the restart: 100 registered minus 20 destroyed.
+	info := dagst.AllShardsInfo()
+	l := len(info)
+	t.Log(l)
+	require.Len(t, info, 80)
+}
+
 func TestRegisterUsingExistingTransient(t *testing.T) {
 	ds := datastore.NewMapDatastore()
 	dagst, err := NewDAGStore(Config{
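The test asserts the in-memory shard count after the restart; one could also assert that the datastore itself dropped from 100 to 80 records before closing, reusing the query pattern already present in the test. A hedged fragment, assuming the variables of TestDestroyAcrossRestart above; it is not part of the commit:

// inside TestDestroyAcrossRestart, after the destroy loop and before dagst.Close():
res2, err := store.Query(context.TODO(), dsq.Query{})
require.NoError(t, err)
entries, err = res2.Rest()
require.NoError(t, err)
require.Len(t, entries, 80) // 100 registered minus 20 destroyed, on disk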
