Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dagstore gc and event loop changes #131

Open
wants to merge 1 commit into
base: feat/reserving-upgrader
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 156 additions & 6 deletions dagstore_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package dagstore
import (
"context"
"fmt"

"github.com/filecoin-project/dagstore/mount"

"github.com/filecoin-project/dagstore/shard"
)

type OpType int
Expand All @@ -16,6 +20,12 @@ const (
OpShardFail
OpShardRelease
OpShardRecover
OpShardReserveTransient
OpShardReleaseTransientReservation
)

var (
	// defaultReservation is the default number of bytes reserved per
	// reservation request when the transient's size is unknown upfront.
	defaultReservation = 134217728 // 128 MiB
)

func (o OpType) String() string {
Expand All @@ -27,7 +37,9 @@ func (o OpType) String() string {
"OpShardAcquire",
"OpShardFail",
"OpShardRelease",
"OpShardRecover"}[o]
"OpShardRecover",
"OpShardReserveTransient",
"OpShardReleaseTransientReservation"}[o]
}

// control runs the DAG store's event loop.
Expand All @@ -53,10 +65,12 @@ func (d *DAGStore) control() {
}

if gc != nil {
// this was a GC request.
d.gc(gc)
// this was a manual GC request.
d.manualGC(gc)
continue
}
// perform GC if the transients directory has already gone above the watermark and automated gc is enabled.
d.automatedGCIfNeeded()

s := tsk.shard
log.Debugw("processing task", "op", tsk.op, "shard", tsk.shard.key, "error", tsk.err)
Expand Down Expand Up @@ -105,6 +119,14 @@ func (d *DAGStore) control() {
// or when recovering from a failure.

s.state = ShardStateAvailable

st, err := s.mount.Stat(d.ctx)
if err != nil {
log.Errorw("failed to stat transient", "shard", s.key, "error", err)
} else {
s.transientSize = st.Size
}

s.err = nil // nillify past errors

// notify the registration waiter, if there is one.
Expand Down Expand Up @@ -203,6 +225,7 @@ func (d *DAGStore) control() {

case OpShardFail:
s.state = ShardStateErrored
s.transientSize = 0
s.err = tsk.err

// notify the registration waiter, if there is one.
Expand Down Expand Up @@ -263,7 +286,6 @@ func (d *DAGStore) control() {

// set the state to recovering.
s.state = ShardStateRecovering

// park the waiter; there can never be more than one because
// subsequent calls to recover the same shard will be rejected
// because the state is no longer ShardStateErrored.
Expand All @@ -272,9 +294,11 @@ func (d *DAGStore) control() {
// attempt to delete the transient first; this can happen if the
// transient has been removed by hand. DeleteTransient resets the
// transient to "" always.
if err := s.mount.DeleteTransient(); err != nil {
freed, err := s.mount.DeleteTransient()
if err != nil {
log.Warnw("recovery: failed to delete transient", "shard", s.key, "error", err)
}
d.totalTransientDirSize -= freed

// attempt to drop the index.
dropped, err := d.indices.DropFullIndex(s.key)
Expand All @@ -300,11 +324,62 @@ func (d *DAGStore) control() {
d.lk.Unlock()
// TODO are we guaranteed that there are no queued items for this shard?

case OpShardReserveTransient:
if s.state != ShardStateServing && s.state != ShardStateInitializing && s.state != ShardStateRecovering {
// sanity check failed
_ = d.failShard(s, d.internalCh, "%w: expected shard to be in 'serving' or `initialising` or `recovering` "+
"state; was: %s", ErrShardIllegalReservationRequest, s.state)
break
}

toReserve := tsk.reservationReq.want
reservationSizeUnknown := toReserve == 0

// increase the space allocated linearly as more reservations are requested for a shard whose
// transient size is unknown upfront.
if reservationSizeUnknown {
toReserve = (tsk.reservationReq.nPrevReservations + 1) * d.defaultReservationSize
}

mkReservation := func() {
d.totalTransientDirSize += toReserve
tsk.reservationReq.response <- &reservationResp{reserved: toReserve}
}
// do we have enough space available ? if yes, allocate the reservation right away
if d.totalTransientDirSize+toReserve <= d.maxTransientDirSize {
mkReservation()
break
}

// otherwise, perform a GC to make space for the reservation.
d.gcUptoTarget(float64(d.maxTransientDirSize - toReserve))

// if we have enough space available after the gc, allocate the reservation
if d.totalTransientDirSize+toReserve <= d.maxTransientDirSize {
mkReservation()
break
}

// we weren't able to make space for the reservation request even after a GC attempt.
// fail the reservation request.
tsk.reservationReq.response <- &reservationResp{err: mount.ErrNotEnoughSpaceInTransientsDir}

case OpShardReleaseTransientReservation:
if s.state != ShardStateServing && s.state != ShardStateInitializing && s.state != ShardStateRecovering {
// sanity check failed
_ = d.failShard(s, d.internalCh, "%w: expected shard to be in 'serving' or `initialising` or `recovering` "+
"state; was: %s", ErrShardIllegalReservationRequest, s.state)
break
}
d.totalTransientDirSize -= tsk.releaseReq.release

default:
panic(fmt.Sprintf("unrecognized shard operation: %d", tsk.op))

}

// update the GarbageCollector
d.notifyGCStrategy(s.key, s, tsk.op)

// persist the current shard state.
if err := s.persist(d.ctx, d.config.Datastore); err != nil { // TODO maybe fail shard?
log.Warnw("failed to persist shard", "shard", s.key, "error", err)
Expand All @@ -321,6 +396,7 @@ func (d *DAGStore) control() {
Error: s.err,
refs: s.refs,
},
TransientDirSizeCounter: d.totalTransientDirSize,
}
d.traceCh <- n
log.Debugw("finished writing trace to the trace channel", "shard", s.key)
Expand All @@ -332,6 +408,25 @@ func (d *DAGStore) control() {
}
}

// notifyGCStrategy informs the garbage collection strategy of changes to a
// shard's lifecycle so it can keep its reclaimability bookkeeping up to date.
// It is invoked from the event loop after each shard operation is processed.
func (d *DAGStore) notifyGCStrategy(key shard.Key, s *Shard, op OpType) {
	if op == OpShardDestroy {
		// consistently use the key parameter (the call site passes s.key).
		d.gcs.NotifyRemoved(key)
		return
	}

	// notify the garbage collector if shard was accessed.
	if op == OpShardAcquire {
		d.gcs.NotifyAccessed(key)
	}

	// a shard is reclaimable only when nobody is waiting to acquire it and it
	// is in a state where its transient can safely be removed.
	if len(s.wAcquire) == 0 && (s.state == ShardStateAvailable || s.state == ShardStateErrored) {
		d.gcs.NotifyReclaimable(key)
	} else {
		d.gcs.NotifyNotReclaimable(key)
	}
}

func (d *DAGStore) consumeNext() (tsk *task, gc chan *GCResult, error error) {
select {
case tsk = <-d.internalCh: // drain internal first; these are tasks emitted from the event loop.
Expand All @@ -352,3 +447,58 @@ func (d *DAGStore) consumeNext() (tsk *task, gc chan *GCResult, error error) {
return nil, nil, d.ctx.Err() // TODO drain and process before returning?
}
}

// compile-time check that transientAllocator satisfies mount.TransientAllocator.
var _ mount.TransientAllocator = (*transientAllocator)(nil)

// transientAllocator submits messages to the event loop to reserve and release space
// for transient downloads when the mount does not know the size of the transient to be downloaded upfront.
type transientAllocator struct {
	d *DAGStore // the DAG store whose event loop performs the space accounting
}

// Reserve asks the event loop to reserve space for a transient download of the
// shard identified by key. toReserve is the number of bytes wanted; when it is
// zero the event loop sizes the reservation itself based on nPrevReservations.
// It blocks until the event loop responds or ctx is cancelled, and returns the
// number of bytes actually reserved.
func (t *transientAllocator) Reserve(ctx context.Context, key shard.Key, nPrevReservations int64, toReserve int64) (reserved int64, err error) {
	t.d.lk.Lock()
	s, ok := t.d.shards[key]
	t.d.lk.Unlock()
	if !ok {
		return 0, ErrShardUnknown
	}

	out := make(chan *reservationResp, 1)
	tsk := &task{
		op:    OpShardReserveTransient,
		shard: s,
		reservationReq: &reservationReq{
			nPrevReservations: nPrevReservations,
			want:              toReserve,
			response:          out,
		},
	}

	if err := t.d.queueTask(tsk, t.d.completionCh); err != nil {
		return 0, fmt.Errorf("failed to send reservation request: %w", err)
	}

	select {
	case resp := <-out:
		return resp.reserved, resp.err
	case <-ctx.Done():
		// The event loop may still complete the reservation after we bail out.
		// If it does, release it so totalTransientDirSize doesn't leak the
		// reserved bytes forever. out is buffered, so the event loop's send
		// never blocks; this goroutine exits when the DAG store shuts down.
		go func() {
			select {
			case resp := <-out:
				if resp.err == nil && resp.reserved > 0 {
					_ = t.Release(context.Background(), key, resp.reserved)
				}
			case <-t.d.ctx.Done():
			}
		}()
		return 0, ctx.Err()
	}
}

// Release informs the event loop that release bytes previously reserved for
// the shard identified by key are no longer needed, so they can be subtracted
// from the transient directory accounting.
func (t *transientAllocator) Release(_ context.Context, key shard.Key, release int64) error {
	t.d.lk.Lock()
	s, found := t.d.shards[key]
	t.d.lk.Unlock()
	if !found {
		return ErrShardUnknown
	}

	tsk := &task{
		op:         OpShardReleaseTransientReservation,
		shard:      s,
		releaseReq: &releaseReq{release: release},
	}
	return t.d.queueTask(tsk, t.d.completionCh)
}
105 changes: 102 additions & 3 deletions dagstore_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ type GCResult struct {
// Shards includes an entry for every shard whose transient was reclaimed.
// Nil error values indicate success.
Shards map[shard.Key]error
// TransientDirSizeAfterGC is the size of the transients directory after a round of manual GC.
TransientDirSizeAfterGC int64
}

// AutomatedGCResult is the result of reclaiming a transient as part of an Automated GC.
// One value is emitted per reclaimed shard on the automated GC trace channel.
type AutomatedGCResult struct {
	// TransientsAccountingBeforeReclaim is the sum of the actual contents on disk and the number of bytes reserved
	// but still not written before a shard was reclaimed.
	TransientsAccountingBeforeReclaim int64
	// TransientsAccountingAfterReclaim is the sum of the actual contents on disk and the number of bytes reserved
	// but still not written after a shard was reclaimed.
	TransientsAccountingAfterReclaim int64
	// TransientsDirSizeBeforeReclaim indicates the size of the transient directory before a shard transient was removed.
	TransientsDirSizeBeforeReclaim int64
	// TransientsDirSizeAfterReclaim indicates the size of the transient directory after a shard transient was removed.
	TransientsDirSizeAfterReclaim int64
	// ReclaimedShard is the key of the shard whose transient was reclaimed by an Automated GC.
	ReclaimedShard shard.Key
}

// ShardFailures returns the number of shards whose transient reclaim failed.
Expand All @@ -27,11 +45,89 @@ func (e *GCResult) ShardFailures() int {
return failures
}

// gc performs DAGStore GC. Refer to DAGStore#GC for more information.
// automatedGCIfNeeded runs a round of automated GC when it is enabled and the
// transient directory accounting has crossed the high watermark, freeing space
// until usage falls to the low watermark target. Called from the event loop.
func (d *DAGStore) automatedGCIfNeeded() {
	if !d.automatedGCEnabled {
		return
	}

	highWater := float64(d.maxTransientDirSize) * d.transientsGCWatermarkHigh
	if float64(d.totalTransientDirSize) < highWater {
		return
	}

	// shrink down to the low watermark so we don't immediately re-trigger.
	d.gcUptoTarget(float64(d.maxTransientDirSize) * d.transientsGCWatermarkLow)
}

// gcUptoTarget GC's transients till the size of the transients directory
// goes below the given target. It relies on the configured Garbage Collector to return a list of shards
// whose transients can be GC'd prioritized in the order they should be GC'd in.
// This method can only be called from the event loop.
func (d *DAGStore) gcUptoTarget(target float64) {
	reclaimable := d.gcs.Reclaimable()

	d.lk.RLock()
	defer d.lk.RUnlock()

	var reclaimed []shard.Key

	// attempt to delete transients of reclaimed shards.
	for _, sk := range reclaimable {
		if float64(d.totalTransientDirSize) <= target {
			break
		}

		s, ok := d.shards[sk]
		if !ok {
			// defensive: the GC strategy returned a key we no longer track;
			// skip it instead of dereferencing a nil shard.
			continue
		}

		// snapshot accounting and on-disk sizes before the reclaim so the
		// trace can report the delta.
		before := d.totalTransientDirSize
		beforeReclaimDiskSize, err := d.transientDirSize()
		if err != nil {
			log.Errorw("failed to fetch transient dir size", "error", err)
		}

		// only read lock the shard: we're not modifying shard state, and the
		// mount has its own lock.
		s.lk.RLock()
		freed, err := s.mount.DeleteTransient()
		if err != nil {
			log.Warnw("failed to delete transient", "shard", s.key, "error", err)
		}

		d.totalTransientDirSize -= freed
		reclaimed = append(reclaimed, sk)

		if freed != 0 && d.automatedgcTraceCh != nil {
			afterReclaimDiskSize, err := d.transientDirSize()
			if err != nil {
				log.Errorw("failed to fetch transient dir size", "error", err)
			}
			result := AutomatedGCResult{
				TransientsAccountingBeforeReclaim: before,
				TransientsAccountingAfterReclaim:  d.totalTransientDirSize,
				TransientsDirSizeBeforeReclaim:    beforeReclaimDiskSize,
				TransientsDirSizeAfterReclaim:     afterReclaimDiskSize,
				ReclaimedShard:                    sk,
			}

			select {
			case d.automatedgcTraceCh <- result:
			case <-d.ctx.Done():
				// release the shard lock before bailing out; the original
				// return path leaked s.lk's read lock here.
				s.lk.RUnlock()
				return
			}
		}

		// flush the shard state to the datastore.
		if err := s.persist(d.ctx, d.config.Datastore); err != nil {
			log.Warnw("failed to persist shard", "shard", s.key, "error", err)
		}
		s.lk.RUnlock()
	}

	d.gcs.NotifyReclaimed(reclaimed)
}

// manualGC performs DAGStore GC. Refer to DAGStore#GC for more information.
//
// The event loops gives it exclusive execution rights, so while GC is running,
// no other events are being processed.
func (d *DAGStore) gc(resCh chan *GCResult) {
func (d *DAGStore) manualGC(resCh chan *GCResult) {
res := &GCResult{
Shards: make(map[shard.Key]error),
}
Expand All @@ -52,10 +148,11 @@ func (d *DAGStore) gc(resCh chan *GCResult) {
for _, s := range reclaim {
// only read lock: we're not modifying state, and the mount has its own lock.
s.lk.RLock()
err := s.mount.DeleteTransient()
freed, err := s.mount.DeleteTransient()
if err != nil {
log.Warnw("failed to delete transient", "shard", s.key, "error", err)
}
d.totalTransientDirSize -= freed

// record the error so we can return it.
res.Shards[s.key] = err
Expand All @@ -67,6 +164,8 @@ func (d *DAGStore) gc(resCh chan *GCResult) {
s.lk.RUnlock()
}

res.TransientDirSizeAfterGC = d.totalTransientDirSize

select {
case resCh <- res:
case <-d.ctx.Done():
Expand Down
Loading