prune archived orders
Prune archived orders when there are more than 1000 (configurable).
buck54321 committed Sep 15, 2024
1 parent ae2fbd0 commit 65179f1
Showing 4 changed files with 298 additions and 14 deletions.
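
The limit is enforced by a periodic pruner in the client's bolt database (client/db/bolt/db.go below) and is exposed as a new archivesize option. As an illustrative sketch (the value 2500 and the dexc binary name are assumptions, not part of this commit), the limit could be raised in dexc.conf:

archivesize=2500

or on the command line:

./dexc --archivesize=2500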
29 changes: 18 additions & 11 deletions client/app/config.go
@@ -22,17 +22,18 @@ import (
)

const (
	defaultRPCCertFile = "rpc.cert"
	defaultRPCKeyFile  = "rpc.key"
	defaultMainnetHost = "127.0.0.1"
	defaultTestnetHost = "127.0.0.2"
	defaultSimnetHost  = "127.0.0.3"
	walletPairOneHost  = "127.0.0.6"
	walletPairTwoHost  = "127.0.0.7"
	defaultRPCPort     = "5757"
	defaultWebPort     = "5758"
	defaultLogLevel    = "debug"
	configFilename     = "dexc.conf"
	defaultRPCCertFile      = "rpc.cert"
	defaultRPCKeyFile       = "rpc.key"
	defaultMainnetHost      = "127.0.0.1"
	defaultTestnetHost      = "127.0.0.2"
	defaultSimnetHost       = "127.0.0.3"
	walletPairOneHost       = "127.0.0.6"
	walletPairTwoHost       = "127.0.0.7"
	defaultRPCPort          = "5757"
	defaultWebPort          = "5758"
	defaultLogLevel         = "debug"
	configFilename          = "dexc.conf"
	defaultArchiveSizeLimit = 1000
)

var (
@@ -106,6 +107,8 @@ type CoreConfig struct {
	UnlockCoinsOnLogin bool `long:"release-wallet-coins" description:"On login or wallet creation, instruct the wallet to release any coins that it may have locked."`

	ExtensionModeFile string `long:"extension-mode-file" description:"path to a file that specifies options for running core as an extension."`

	ArchiveSizeLimit uint64 `long:"archivesize" description:"the maximum number of orders to be archived before deleting the oldest"`
}

// WebConfig encapsulates the configuration needed for the web server.
@@ -213,6 +216,7 @@ func (cfg *Config) Core(log dex.Logger) *core.Config {
		NoAutoWalletLock:  cfg.NoAutoWalletLock,
		NoAutoDBBackup:    cfg.NoAutoDBBackup,
		ExtensionModeFile: cfg.ExtensionModeFile,
		ArchiveSizeLimit:  cfg.ArchiveSizeLimit,
	}
}

@@ -223,6 +227,9 @@ var DefaultConfig = Config{
	RPCConfig: RPCConfig{
		CertHosts: []string{defaultTestnetHost, defaultSimnetHost, defaultMainnetHost},
	},
	CoreConfig: CoreConfig{
		ArchiveSizeLimit: defaultArchiveSizeLimit,
	},
}

// ParseCLIConfig parses the command-line arguments into the provided struct
4 changes: 4 additions & 0 deletions client/core/core.go
@@ -1433,6 +1433,9 @@ type Config struct {
	// for running core in extension mode, which gives the caller options for
	// e.g. limiting the ability to configure wallets.
	ExtensionModeFile string
	// ArchiveSizeLimit is the maximum number of orders that will be archived
	// before we start deleting the oldest.
	ArchiveSizeLimit uint64
}

// locale is data associated with the currently selected language.
@@ -1515,6 +1518,7 @@ func New(cfg *Config) (*Core, error) {
	}
	dbOpts := bolt.Opts{
		BackupOnShutdown: !cfg.NoAutoDBBackup,
		ArchiveSizeLimit: cfg.ArchiveSizeLimit,
	}
	boltDB, err := bolt.NewDB(cfg.DBPath, cfg.Logger.SubLogger("DB"), dbOpts)
	if err != nil {
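
For an embedder calling core.New directly rather than going through client/app, a minimal sketch of requesting a larger archive (abbreviated Config; dbPath, logger, and the value 5000 are illustrative assumptions, not from this commit):

	clientCore, err := core.New(&core.Config{
		DBPath:           dbPath,
		Logger:           logger,
		ArchiveSizeLimit: 5000, // zero falls back to the 1000-order default
	})
	if err != nil {
		// handle the error
	}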
141 changes: 141 additions & 0 deletions client/db/bolt/db.go
@@ -14,6 +14,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/db"
@@ -138,6 +139,7 @@ var (
// Opts is a set of options for the DB.
type Opts struct {
	BackupOnShutdown bool   // default is true
	ArchiveSizeLimit uint64 // zero means the default limit of 1000
}

var defaultOpts = Opts{
@@ -234,8 +236,29 @@ func (db *BoltDB) fileSize(path string) int64 {

// Run waits for context cancellation and closes the database.
func (db *BoltDB) Run(ctx context.Context) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Prune the archive shortly after startup, then every half hour.
		tick := time.After(time.Minute)
		for {
			select {
			case <-tick:
			case <-ctx.Done():
				return
			}
			if err := db.pruneArchivedOrders(); err != nil {
				db.log.Errorf("Error cleaning archive: %v", err)
			}
			tick = time.After(time.Minute * 30)
		}
	}()

	<-ctx.Done() // wait for shutdown to backup and compact

	// Wait for archive cleaner to exit.
	wg.Wait()

	// Create a backup in the backups folder.
	if db.opts.BackupOnShutdown {
		db.log.Infof("Backing up database...")
@@ -408,6 +431,124 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error {
	})
}

// pruneArchivedOrders deletes the oldest archived orders in excess of the
// configured archive size limit. Orders with active matches are never pruned,
// and archived matches belonging to the pruned orders are deleted as well.
func (db *BoltDB) pruneArchivedOrders() error {
	var archiveSizeLimit uint64 = 1000
	if db.opts.ArchiveSizeLimit != 0 {
		archiveSizeLimit = db.opts.ArchiveSizeLimit
	}

	return db.Update(func(tx *bbolt.Tx) error {
		archivedOB := tx.Bucket(archivedOrdersBucket)
		if archivedOB == nil {
			return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
		}

		// We won't delete any orders with active matches.
		activeMatches := tx.Bucket(activeMatchesBucket)
		if activeMatches == nil {
			return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket))
		}
		oidsWithActiveMatches := make(map[order.OrderID]struct{}, 0)
		if err := activeMatches.ForEach(func(k, _ []byte) error {
			mBkt := activeMatches.Bucket(k)
			if mBkt == nil {
				return fmt.Errorf("error getting match bucket %x", k)
			}
			var oid order.OrderID
			copy(oid[:], mBkt.Get(orderIDKey))
			oidsWithActiveMatches[oid] = struct{}{}
			return nil
		}); err != nil {
			return fmt.Errorf("error building active match order ID index: %w", err)
		}

		nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */)
		if nOrds <= archiveSizeLimit {
			return nil
		}

		toClear := int(nOrds - archiveSizeLimit)

		type orderStamp struct {
			oid   []byte
			stamp int64
		}
		deletes := make([]*orderStamp, 0, toClear)
		sortDeletes := func() {
			sort.Slice(deletes, func(i, j int) bool {
				return deletes[i].stamp < deletes[j].stamp
			})
		}
		// Keep a sorted buffer of the toClear oldest eligible orders,
		// replacing the newest buffered entry whenever an older order is
		// found.
		if err := archivedOB.ForEach(func(oidB, v []byte) error {
			var oid order.OrderID
			copy(oid[:], oidB)
			if _, found := oidsWithActiveMatches[oid]; found {
				return nil
			}
			ord, err := decodeOrderBucket(oidB, archivedOB.Bucket(oidB))
			if err != nil {
				return fmt.Errorf("error decoding order %x: %v", oid, err)
			}
			stamp := ord.Order.Prefix().ClientTime.Unix()
			if len(deletes) < toClear {
				deletes = append(deletes, &orderStamp{
					stamp: stamp,
					oid:   oidB,
				})
				sortDeletes()
				return nil
			}
			if stamp > deletes[len(deletes)-1].stamp {
				return nil
			}
			deletes[len(deletes)-1] = &orderStamp{
				stamp: stamp,
				oid:   oidB,
			}
			sortDeletes()
			return nil
		}); err != nil {
			return fmt.Errorf("archive iteration error: %v", err)
		}

		deletedOrders := make(map[order.OrderID]struct{})
		for _, del := range deletes {
			var oid order.OrderID
			copy(oid[:], del.oid)
			deletedOrders[oid] = struct{}{}
			if err := archivedOB.DeleteBucket(del.oid); err != nil {
				return fmt.Errorf("error deleting archived order %q: %v", del.oid, err)
			}
		}

		// Delete archived matches that belonged to the pruned orders.
		matchesToDelete := make([][]byte, 0, archiveSizeLimit /* just avoid some allocs if we can */)
		archivedMatches := tx.Bucket(archivedMatchesBucket)
		if archivedMatches == nil {
			return errors.New("no archived match bucket")
		}
		if err := archivedMatches.ForEach(func(k, _ []byte) error {
			matchBkt := archivedMatches.Bucket(k)
			if matchBkt == nil {
				return fmt.Errorf("no bucket found for %x during iteration", k)
			}
			var oid order.OrderID
			copy(oid[:], matchBkt.Get(orderIDKey))
			if _, found := deletedOrders[oid]; found {
				matchesToDelete = append(matchesToDelete, k)
			}
			return nil
		}); err != nil {
			return fmt.Errorf("error finding matches to prune: %w", err)
		}
		for i := range matchesToDelete {
			if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil {
				return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err)
			}
		}
		return nil
	})
}

// validateCreds checks that the PrimaryCredentials fields are properly
// populated.
func validateCreds(creds *dexdb.PrimaryCredentials) error {
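
The selection pass in pruneArchivedOrders keeps a sorted buffer of the toClear oldest eligible timestamps rather than sorting the entire archive. A self-contained sketch of that strategy (hypothetical names, not part of the commit):

package main

import (
	"fmt"
	"sort"
)

// oldestK returns the k smallest (oldest) stamps, mirroring the pruner's
// bounded-buffer selection: fill the buffer, then replace the newest
// buffered entry whenever an older stamp arrives.
func oldestK(stamps []int64, k int) []int64 {
	if k <= 0 {
		return nil
	}
	buf := make([]int64, 0, k)
	resort := func() {
		sort.Slice(buf, func(i, j int) bool { return buf[i] < buf[j] })
	}
	for _, s := range stamps {
		if len(buf) < k {
			buf = append(buf, s)
			resort()
			continue
		}
		if s >= buf[len(buf)-1] {
			continue // newer than everything buffered; skip
		}
		buf[len(buf)-1] = s // evict the newest buffered stamp
		resort()
	}
	return buf
}

func main() {
	fmt.Println(oldestK([]int64{5, 3, 9, 1, 7, 2}, 3)) // [1 2 3]
}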
(The diff for the fourth changed file did not load.)
