-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client/db: prune archived orders #2975
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ import ( | |
"path/filepath" | ||
"sort" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"decred.org/dcrdex/client/db" | ||
|
@@ -138,6 +139,7 @@ var ( | |
// Opts is a set of options for the DB. | ||
type Opts struct { | ||
BackupOnShutdown bool // default is true | ||
ArchiveSizeLimit uint64 | ||
} | ||
|
||
var defaultOpts = Opts{ | ||
|
@@ -234,8 +236,29 @@ func (db *BoltDB) fileSize(path string) int64 { | |
|
||
// Run waits for context cancellation and closes the database. | ||
func (db *BoltDB) Run(ctx context.Context) { | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
tick := time.After(time.Minute) | ||
for { | ||
select { | ||
case <-tick: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
if err := db.pruneArchivedOrders(); err != nil { | ||
db.log.Errorf("Error cleaning archive: %v", err) | ||
} | ||
tick = time.After(time.Minute * 30) | ||
} | ||
}() | ||
|
||
<-ctx.Done() // wait for shutdown to backup and compact | ||
|
||
// Wait for archive cleaner to exit. | ||
wg.Wait() | ||
|
||
// Create a backup in the backups folder. | ||
if db.opts.BackupOnShutdown { | ||
db.log.Infof("Backing up database...") | ||
|
@@ -408,6 +431,129 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error { | |
}) | ||
} | ||
|
||
func (db *BoltDB) pruneArchivedOrders() error { | ||
var archiveSizeLimit uint64 = 1000 | ||
if db.opts.ArchiveSizeLimit != 0 { | ||
archiveSizeLimit = db.opts.ArchiveSizeLimit | ||
} | ||
|
||
return db.Update(func(tx *bbolt.Tx) error { | ||
archivedOB := tx.Bucket(archivedOrdersBucket) | ||
if archivedOB == nil { | ||
return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket)) | ||
} | ||
|
||
// We won't delete any orders with active matches. | ||
activeMatches := tx.Bucket(activeMatchesBucket) | ||
if activeMatches == nil { | ||
return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket)) | ||
} | ||
oidsWithActiveMatches := make(map[order.OrderID]struct{}, 0) | ||
if err := activeMatches.ForEach(func(k, _ []byte) error { | ||
mBkt := activeMatches.Bucket(k) | ||
if mBkt == nil { | ||
return fmt.Errorf("error getting match bucket %x", k) | ||
} | ||
var oid order.OrderID | ||
copy(oid[:], mBkt.Get(orderIDKey)) | ||
oidsWithActiveMatches[oid] = struct{}{} | ||
return nil | ||
}); err != nil { | ||
return fmt.Errorf("error building active match order ID index: %w", err) | ||
} | ||
|
||
nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */) | ||
if nOrds <= archiveSizeLimit { | ||
return nil | ||
} | ||
Comment on lines
+465
to
+468
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could do this first and maybe avoid collecting the active orders sometimes. |
||
|
||
toClear := int(nOrds - archiveSizeLimit) | ||
|
||
type orderStamp struct { | ||
oid []byte | ||
stamp uint64 | ||
} | ||
deletes := make([]*orderStamp, 0, toClear) | ||
sortDeletes := func() { | ||
sort.Slice(deletes, func(i, j int) bool { | ||
return deletes[i].stamp < deletes[j].stamp | ||
}) | ||
} | ||
if err := archivedOB.ForEach(func(oidB, v []byte) error { | ||
var oid order.OrderID | ||
copy(oid[:], oidB) | ||
if _, found := oidsWithActiveMatches[oid]; found { | ||
return nil | ||
} | ||
Comment on lines
+485
to
+487
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if instead of just not deleting the oids with active matches you don't delete them with any matches. I think the problem is that the archives get clogged up with orders that just get created and cancelled after the market moves. Orders that actually matched shouldn't get deleted because people need to keep records of those. You could even just check the swap fees here, and not delete any trade that has swap fees > 0. |
||
oBkt := archivedOB.Bucket(oidB) | ||
if oBkt == nil { | ||
return fmt.Errorf("no order bucket iterated order %x", oidB) | ||
} | ||
stampB := oBkt.Get(updateTimeKey) | ||
if stampB == nil { | ||
// Highly improbable. | ||
stampB = make([]byte, 8) | ||
} | ||
stamp := intCoder.Uint64(stampB) | ||
if len(deletes) < toClear { | ||
deletes = append(deletes, &orderStamp{ | ||
stamp: stamp, | ||
oid: oidB, | ||
}) | ||
sortDeletes() | ||
return nil | ||
} | ||
if stamp > deletes[len(deletes)-1].stamp { | ||
return nil | ||
} | ||
deletes[len(deletes)-1] = &orderStamp{ | ||
stamp: stamp, | ||
oid: oidB, | ||
} | ||
sortDeletes() | ||
return nil | ||
}); err != nil { | ||
return fmt.Errorf("archive iteration error: %v", err) | ||
} | ||
|
||
deletedOrders := make(map[order.OrderID]struct{}) | ||
for _, del := range deletes { | ||
var oid order.OrderID | ||
copy(oid[:], del.oid) | ||
deletedOrders[oid] = struct{}{} | ||
if err := archivedOB.DeleteBucket(del.oid); err != nil { | ||
return fmt.Errorf("error deleting archived order %q: %v", del.oid, err) | ||
} | ||
} | ||
|
||
matchesToDelete := make([][]byte, 0, archiveSizeLimit /* just avoid some allocs if we can */) | ||
archivedMatches := tx.Bucket(archivedMatchesBucket) | ||
if archivedMatches == nil { | ||
return errors.New("no archived match bucket") | ||
} | ||
if err := archivedMatches.ForEach(func(k, _ []byte) error { | ||
matchBkt := archivedMatches.Bucket(k) | ||
if matchBkt == nil { | ||
return fmt.Errorf("no bucket found for %x during iteration", k) | ||
} | ||
var oid order.OrderID | ||
copy(oid[:], matchBkt.Get(orderIDKey)) | ||
if _, found := deletedOrders[oid]; found { | ||
matchesToDelete = append(matchesToDelete, k) | ||
} | ||
return nil | ||
}); err != nil { | ||
return fmt.Errorf("error finding matches to prune: %w", err) | ||
} | ||
for i := range matchesToDelete { | ||
if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil { | ||
return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err) | ||
} | ||
} | ||
return nil | ||
}) | ||
} | ||
|
||
// validateCreds checks that the PrimaryCredentials fields are properly | ||
// populated. | ||
func validateCreds(creds *dexdb.PrimaryCredentials) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could mention it cannot be zero, or error somewhere if it is.