Skip to content

Commit

Permalink
Merge pull request #88 from filecoin-project/fix/gc-reconcile
Browse files Browse the repository at this point in the history
refactor GC and implement reconciliation.
  • Loading branch information
aarshkshah1992 authored Jul 26, 2021
2 parents 0973071 + a9ca706 commit 187c97e
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 164 deletions.
69 changes: 21 additions & 48 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/hashicorp/go-multierror"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -86,6 +85,8 @@ type DAGStore struct {
// back to the application. Serviced by a dispatcher goroutine.
// See note in dispatchResultsCh for background.
dispatchFailuresCh chan *dispatch
// gcCh is where requests for GC are sent.
gcCh chan chan *GCResult

// Channels not owned by us.
//
Expand Down Expand Up @@ -210,6 +211,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
dispatchResultsCh: make(chan *dispatch, 128), // len=128, same as externalCh.
gcCh: make(chan chan *GCResult, 8),
traceCh: cfg.TraceCh,
failureCh: cfg.FailureCh,
throttleFetch: noopThrottler{},
Expand All @@ -231,6 +233,10 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
return nil, fmt.Errorf("failed to restore dagstore state: %w", err)
}

if err := dagst.clearOrphaned(); err != nil {
log.Warnf("failed to clear orphaned files on startup: %s", err)
}

// Reset in-progress states.
//
// Queue shards whose registration needs to be restarted. Release those
Expand Down Expand Up @@ -470,57 +476,24 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
return ret
}

// GC attempts to reclaim the transient files of shards that are currently
// available but inactive.
// GC performs DAG store garbage collection by reclaiming transient files of
// shards that are currently available but inactive, or errored.
//
// It is not strictly atomic for now, as it determines which shards to reclaim
// first, sends operations to the event loop, and waits for them to execute.
// In the meantime, there could be state transitions that change reclaimability
// of shards (some shards deemed reclaimable are no longer so, and vice versa).
//
// However, the event loop checks for safety prior to deletion, so it will skip
// over shards that are no longer safe to delete.
func (d *DAGStore) GC(ctx context.Context) (map[shard.Key]error, error) {
var (
merr *multierror.Error
reclaim []*Shard
)

d.lk.RLock()
for _, s := range d.shards {
s.lk.RLock()
if s.state == ShardStateAvailable || s.state == ShardStateErrored {
reclaim = append(reclaim, s)
}
s.lk.RUnlock()
}
d.lk.RUnlock()

var await int
ch := make(chan ShardResult, len(reclaim))
for _, s := range reclaim {
tsk := &task{op: OpShardGC, shard: s, waiter: &waiter{ctx: ctx, outCh: ch}}

err := d.queueTask(tsk, d.externalCh)
if err == nil {
await++
} else {
merr = multierror.Append(merr, fmt.Errorf("failed to enqueue GC task for shard %s: %w", s.key, err))
}
// GC runs with exclusivity from the event loop.
func (d *DAGStore) GC(ctx context.Context) (*GCResult, error) {
ch := make(chan *GCResult)
select {
case d.gcCh <- ch:
case <-ctx.Done():
return nil, ctx.Err()
}

// collect all results.
results := make(map[shard.Key]error, await)
for i := 0; i < await; i++ {
select {
case res := <-ch:
results[res.Key] = res.Error
case <-ctx.Done():
return results, ctx.Err()
}
select {
case res := <-ch:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}

return results, nil
}

func (d *DAGStore) Close() error {
Expand Down
73 changes: 30 additions & 43 deletions dagstore_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const (
OpShardAcquire
OpShardFail
OpShardRelease
OpShardGC
OpShardRecover
)

Expand All @@ -28,39 +27,42 @@ func (o OpType) String() string {
"OpShardAcquire",
"OpShardFail",
"OpShardRelease",
"OpShardGC",
"OpShardRecover"}[o]
}

// control runs the DAG store's event loop.
func (d *DAGStore) control() {
defer d.wg.Done()

var (
tsk *task
prevState ShardState
err error

// wFailure is a synthetic failure waiter that uses the DAGStore's
// global context and the failure channel. Only safe to actually use if
// d.failureCh != nil. wFailure is used to dispatch failure
// notifications to the application.
wFailure = &waiter{ctx: d.ctx, outCh: d.failureCh}
)
// wFailure is a synthetic failure waiter that uses the DAGStore's
// global context and the failure channel. Only safe to actually use if
// d.failureCh != nil. wFailure is used to dispatch failure
// notifications to the application.
var wFailure = &waiter{ctx: d.ctx, outCh: d.failureCh}

for {
// consume the next task; if we're shutting down, this method will error.
if tsk, err = d.consumeNext(); err != nil {
log.Errorw("failed to consume next task in event loop, will return from event loop", "error", err)
break
// consume the next task or GC request; if we're shutting down, this method will error.
tsk, gc, err := d.consumeNext()
if err != nil {
if err == context.Canceled {
log.Infow("dagstore closed")
} else {
log.Errorw("consuming next task failed; aborted event loop; dagstore unoperational", "error", err)
}
return
}

log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)
if gc != nil {
// this was a GC request.
d.gc(gc)
continue
}

s := tsk.shard
s.lk.Lock()
log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)

prevState = s.state
s.lk.Lock()
prevState := s.state

switch tsk.op {
case OpShardRegister:
Expand Down Expand Up @@ -298,19 +300,6 @@ func (d *DAGStore) control() {
d.lk.Unlock()
// TODO are we guaranteed that there are no queued items for this shard?

case OpShardGC:
var err error
if nAcq := len(s.wAcquire); s.state == ShardStateAvailable || s.state == ShardStateErrored || nAcq == 0 {
err = s.mount.DeleteTransient()
if err != nil {
log.Warnw("failed to delete transient", "shard", s.key, "error", err)
}
} else {
err = fmt.Errorf("ignored request to GC shard in state %s with queued acquirers=%d", s.state, nAcq)
}
res := &ShardResult{Key: s.key, Error: err}
d.dispatchResult(res, tsk.waiter)

default:
panic(fmt.Sprintf("unrecognized shard operation: %d", tsk.op))

Expand Down Expand Up @@ -341,27 +330,25 @@ func (d *DAGStore) control() {

s.lk.Unlock()
}

if err != context.Canceled {
log.Errorw("consuming next task failed; aborted event loop; dagstore unoperational", "error", err)
}
}

func (d *DAGStore) consumeNext() (tsk *task, error error) {
func (d *DAGStore) consumeNext() (tsk *task, gc chan *GCResult, error error) {
select {
case tsk = <-d.internalCh: // drain internal first; these are tasks emitted from the event loop.
return tsk, nil
return tsk, nil, nil
case <-d.ctx.Done():
return nil, d.ctx.Err() // TODO drain and process before returning?
return nil, nil, d.ctx.Err() // TODO drain and process before returning?
default:
}

select {
case tsk = <-d.externalCh:
return tsk, nil
return tsk, nil, nil
case tsk = <-d.completionCh:
return tsk, nil
return tsk, nil, nil
case gc := <-d.gcCh:
return nil, gc, nil
case <-d.ctx.Done():
return nil, d.ctx.Err() // TODO drain and process before returning?
return nil, nil, d.ctx.Err() // TODO drain and process before returning?
}
}
104 changes: 104 additions & 0 deletions dagstore_gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package dagstore

import (
"io/fs"
"os"
"path/filepath"

"github.com/filecoin-project/dagstore/shard"
)

// GCResult is the result of performing a GC operation. It holds the results
// from deleting unused transients.
type GCResult struct {
// Shards includes an entry for every shard whose transient was reclaimed.
// Nil error values indicate success.
Shards map[shard.Key]error
}

// ShardFailures returns the number of shards whose transient reclaim failed.
func (e *GCResult) ShardFailures() int {
var failures int
for _, err := range e.Shards {
if err != nil {
failures++
}
}
return failures
}

// gc performs DAGStore GC. Refer to DAGStore#GC for more information.
//
// The event loops gives it exclusive execution rights, so while GC is running,
// no other events are being processed.
func (d *DAGStore) gc(resCh chan *GCResult) {
res := &GCResult{
Shards: make(map[shard.Key]error),
}

// determine which shards can be reclaimed.
d.lk.RLock()
var reclaim []*Shard
for _, s := range d.shards {
s.lk.RLock()
if nAcq := len(s.wAcquire); (s.state == ShardStateAvailable || s.state == ShardStateErrored) && nAcq == 0 {
reclaim = append(reclaim, s)
}
s.lk.RUnlock()
}
d.lk.RUnlock()

// attempt to delete transients of reclaimed shards.
for _, s := range reclaim {
// only read lock: we're not modifying state, and the mount has its own lock.
s.lk.RLock()
err := s.mount.DeleteTransient()
if err != nil {
log.Warnw("failed to delete transient", "shard", s.key, "error", err)
}

// record the error so we can return it.
res.Shards[s.key] = err

// flush the shard state to the datastore.
if err := s.persist(d.config.Datastore); err != nil {
log.Warnw("failed to persist shard", "shard", s.key, "error", err)
}
s.lk.RUnlock()
}

select {
case resCh <- res:
case <-d.ctx.Done():
}
}

// clearOrphaned removes files that are not referenced by any mount.
//
// This is only safe to be called from the constructor, before we have
// queued tasks.
func (d *DAGStore) clearOrphaned() error {
referenced := make(map[string]struct{})

for _, s := range d.shards {
t := s.mount.TransientPath()
referenced[t] = struct{}{}
}

// Walk the transients dir and delete unreferenced files.
err := filepath.WalkDir(d.config.TransientsDir, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}
if _, ok := referenced[path]; !ok {
if err := os.Remove(path); err != nil {
log.Warnw("failed to delete orphaned file", "path", path, "error", err)
} else {
log.Infow("deleted orphaned file", "path", path)
}
}
return nil
})

return err
}
Loading

0 comments on commit 187c97e

Please sign in to comment.