forked from filecoin-project/dagstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handlers.go
64 lines (56 loc) · 1.92 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package dagstore
import (
"context"
"github.com/filecoin-project/dagstore/shard"
)
// RecoverImmediately takes a failureCh where DAGStore failures are sent, and
// attempts to recover the shard immediately up until maxAttempts for each
// unique shard.
//
// For every failure received on failureCh, a recovery is queued through
// dagst.RecoverShard unless maxAttempts recoveries have already been queued
// for that shard. A successful recovery resets the shard's attempt counter.
// A failed recovery is only logged here: the failure is expected to be
// reported again on failureCh, which is what drives the next retry.
//
// Attempt tracking does not survive restarts. When the passed context fires,
// the failure handler will yield and the given `onDone` function (if
// non-nil) is called before returning. If failureCh is closed, the handler
// stops consuming it but keeps reporting the outcome of already-queued
// recoveries until the context fires. It is recommended to call this method
// from a dedicated goroutine, as it runs an infinite event loop.
func RecoverImmediately(ctx context.Context, dagst *DAGStore, failureCh chan ShardResult, maxAttempts uint64, onDone func()) {
	if onDone != nil {
		defer onDone()
	}

	var (
		// recResCh receives the outcome of every recovery queued below.
		recResCh = make(chan ShardResult, 128)
		// attempts counts recoveries queued per shard since its last
		// successful recovery.
		attempts = make(map[shard.Key]uint64)
	)

	for {
		select {
		case res, ok := <-failureCh:
			if !ok {
				// A closed channel is always ready and yields zero values;
				// without this guard the loop would spin trying to recover the
				// zero key. Receiving from a nil channel blocks forever, so
				// this disables the case while still letting pending recovery
				// results and context cancellation be handled.
				log.Info("failure handler: failure channel closed; no longer accepting failures")
				failureCh = nil
				continue
			}

			key := res.Key
			att := attempts[key]
			if att >= maxAttempts {
				log.Infow("failure handler: max attempts exceeded; skipping recovery", "key", key, "from_error", res.Error, "attempt", att)
				continue
			}

			log.Infow("failure handler: recovering shard", "key", key, "from_error", res.Error, "attempt", att)

			// queue the recovery for this key; its outcome arrives on recResCh.
			if err := dagst.RecoverShard(ctx, key, recResCh, RecoverOpts{}); err != nil {
				log.Warnw("failure handler: failed to queue shard recovery", "key", key, "error", err)
				continue
			}
			// Only count attempts that were actually queued.
			attempts[key]++

		case res := <-recResCh:
			// this channel is just informational; a failure to recover will
			// trigger another failure on the failureCh, which will be handled
			// above for retry.
			key := res.Key
			if res.Error == nil {
				log.Infow("failure handler: successfully recovered shard", "key", key)
				// Reset the retry budget so future failures can be retried.
				delete(attempts, key)
			} else {
				log.Warnw("failure handler: failed to recover shard", "key", key, "error", res.Error, "attempt", attempts[key])
			}

		case <-ctx.Done():
			log.Info("failure handler: stopping")
			return
		}
	}
}