-
Notifications
You must be signed in to change notification settings - Fork 24
/
dagstore_gc.go
104 lines (90 loc) · 2.6 KB
/
dagstore_gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package dagstore
import (
"io/fs"
"os"
"path/filepath"
"github.com/filecoin-project/dagstore/shard"
)
// GCResult is the result of performing a GC operation. It holds the results
// from deleting unused transients.
type GCResult struct {
// Shards includes an entry for every shard whose transient was reclaimed.
// Nil error values indicate success.
Shards map[shard.Key]error
}
// ShardFailures returns the number of shards whose transient reclaim failed.
func (e *GCResult) ShardFailures() int {
var failures int
for _, err := range e.Shards {
if err != nil {
failures++
}
}
return failures
}
// gc performs DAGStore GC. Refer to DAGStore#GC for more information.
//
// The event loops gives it exclusive execution rights, so while GC is running,
// no other events are being processed.
func (d *DAGStore) gc(resCh chan *GCResult) {
res := &GCResult{
Shards: make(map[shard.Key]error),
}
// determine which shards can be reclaimed.
d.lk.RLock()
var reclaim []*Shard
for _, s := range d.shards {
s.lk.RLock()
if nAcq := len(s.wAcquire); (s.state == ShardStateAvailable || s.state == ShardStateErrored) && nAcq == 0 {
reclaim = append(reclaim, s)
}
s.lk.RUnlock()
}
d.lk.RUnlock()
// attempt to delete transients of reclaimed shards.
for _, s := range reclaim {
// only read lock: we're not modifying state, and the mount has its own lock.
s.lk.RLock()
err := s.mount.DeleteTransient()
if err != nil {
log.Warnw("failed to delete transient", "shard", s.key, "error", err)
}
// record the error so we can return it.
res.Shards[s.key] = err
// flush the shard state to the datastore.
if err := s.persist(d.ctx, d.config.Datastore); err != nil {
log.Warnw("failed to persist shard", "shard", s.key, "error", err)
}
s.lk.RUnlock()
}
select {
case resCh <- res:
case <-d.ctx.Done():
}
}
// clearOrphaned removes files that are not referenced by any mount.
//
// This is only safe to be called from the constructor, before we have
// queued tasks.
func (d *DAGStore) clearOrphaned() error {
referenced := make(map[string]struct{})
for _, s := range d.shards {
t := s.mount.TransientPath()
referenced[t] = struct{}{}
}
// Walk the transients dir and delete unreferenced files.
err := filepath.WalkDir(d.config.TransientsDir, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}
if _, ok := referenced[path]; !ok {
if err := os.Remove(path); err != nil {
log.Warnw("failed to delete orphaned file", "path", path, "error", err)
} else {
log.Infow("deleted orphaned file", "path", path)
}
}
return nil
})
return err
}