From 91a8843cd5be6c68c2352d2c78e57694be9ed772 Mon Sep 17 00:00:00 2001 From: Nodar Date: Wed, 19 Jul 2023 15:36:07 +0200 Subject: [PATCH 1/2] Document QueueStorage and ConfigFetcher, move DataPoster at the beginning of a file --- arbnode/dataposter/data_poster.go | 163 ++++++++++++----------- arbnode/dataposter/leveldb/leveldb.go | 8 +- arbnode/dataposter/redis/redisstorage.go | 10 +- arbnode/dataposter/slice/slicestorage.go | 14 +- arbnode/dataposter/storage_test.go | 22 +-- 5 files changed, 113 insertions(+), 104 deletions(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 1dec6ad0c4..bf2725a063 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -33,75 +33,6 @@ import ( redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis" ) -type queuedTransaction[Meta any] struct { - FullTx *types.Transaction - Data types.DynamicFeeTx - Meta Meta - Sent bool - Created time.Time // may be earlier than the tx was given to the tx poster - NextReplacement time.Time -} - -// Note: one of the implementation of this interface (Redis storage) does not -// support duplicate values. -type QueueStorage[Item any] interface { - GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) - GetLast(ctx context.Context) (*Item, error) - Prune(ctx context.Context, keepStartingAt uint64) error - Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error - Length(ctx context.Context) (int, error) - IsPersistent() bool -} - -type DataPosterConfig struct { - RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` - ReplacementTimes string `koanf:"replacement-times"` - WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` - MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` - MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` - TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` - UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` - MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` - MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` - EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"` -} - -type DataPosterConfigFetcher func() *DataPosterConfig - -func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee") - f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)") - f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)") - f.Int(prefix+".max-queued-transactions", DefaultDataPosterConfig.MaxQueuedTransactions, "the maximum number of unconfirmed transactions to track at once (0 = unlimited)") - f.Float64(prefix+".target-price-gwei", DefaultDataPosterConfig.TargetPriceGwei, "the target price to use for maximum fee cap calculation") - f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation") - f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at") - f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at") - f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled") - signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f) -} - -var DefaultDataPosterConfig = DataPosterConfig{ - ReplacementTimes: "5m,10m,20m,30m,1h,2h,4h,6h,8h,12h,16h,18h,20h,22h", - WaitForL1Finality: true, - TargetPriceGwei: 60., - UrgencyGwei: 2., - MaxMempoolTransactions: 64, - MinTipCapGwei: 0.05, - EnableLevelDB: false, -} - -var TestDataPosterConfig = DataPosterConfig{ - ReplacementTimes: "1s,2s,5s,10s,20s,30s,1m,5m", - RedisSigner: signature.TestSimpleHmacConfig, - WaitForL1Finality: false, - TargetPriceGwei: 60., - UrgencyGwei: 2., - MaxMempoolTransactions: 64, - MinTipCapGwei: 0.05, - EnableLevelDB: false, -} - // DataPoster must be RLP serializable and deserializable type DataPoster[Meta any] struct { stopwaiter.StopWaiter @@ -109,11 +40,11 @@ type DataPoster[Meta any] struct { client arbutil.L1Interface auth *bind.TransactOpts redisLock AttemptLocker - config DataPosterConfigFetcher + config ConfigFetcher replacementTimes []time.Duration metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error) - // these fields are protected by the mutex + // These fields are protected by the mutex. mutex sync.Mutex lastBlock *big.Int balance *big.Int @@ -126,7 +57,7 @@ type AttemptLocker interface { AttemptLock(context.Context) bool } -func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) { +func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) { var replacementTimes []time.Duration var lastReplacementTime time.Duration for _, s := range strings.Split(config().ReplacementTimes, ",") { @@ -184,7 +115,7 @@ func (p *DataPoster[Meta]) GetNextNonceAndMeta(ctx context.Context) (uint64, Met if err != nil { return 0, emptyMeta, err } - lastQueueItem, err := p.queue.GetLast(ctx) + lastQueueItem, err := p.queue.FetchLast(ctx) if err != nil { return 0, emptyMeta, err } @@ -231,7 +162,7 @@ func (p *DataPoster[Meta]) GetNextNonceAndMeta(ctx context.Context) (uint64, Met const minRbfIncrease = arbmath.OneInBips * 11 / 10 -func (p *DataPoster[Meta]) getFeeAndTipCaps(ctx context.Context, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) { +func (p *DataPoster[Meta]) feeAndTipCaps(ctx context.Context, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) { config := p.config() latestHeader, err := p.headerReader.LastHeader(ctx) if err != nil { @@ -312,7 +243,7 @@ func (p *DataPoster[Meta]) PostTransaction(ctx context.Context, dataCreatedAt ti if err != nil { return fmt.Errorf("failed to update data poster balance: %w", err) } - feeCap, tipCap, err := p.getFeeAndTipCaps(ctx, gasLimit, nil, nil, dataCreatedAt, 0) + feeCap, tipCap, err := p.feeAndTipCaps(ctx, gasLimit, nil, nil, dataCreatedAt, 0) if err != nil { return err } @@ -374,7 +305,7 @@ func (p *DataPoster[Meta]) sendTx(ctx context.Context, prevTx *queuedTransaction // the mutex must be held by the caller func (p *DataPoster[Meta]) replaceTx(ctx context.Context, prevTx *queuedTransaction[Meta], backlogOfBatches uint64) error { - newFeeCap, newTipCap, err := p.getFeeAndTipCaps(ctx, prevTx.Data.Gas, prevTx.Data.GasFeeCap, prevTx.Data.GasTipCap, prevTx.Created, backlogOfBatches) + newFeeCap, newTipCap, err := p.feeAndTipCaps(ctx, prevTx.Data.Gas, prevTx.Data.GasFeeCap, prevTx.Data.GasTipCap, prevTx.Created, backlogOfBatches) if err != nil { return err } @@ -455,6 +386,7 @@ func (p *DataPoster[Meta]) updateNonce(ctx context.Context) error { return nil } +// Updates dataposter balance to balance at pending block. func (p *DataPoster[Meta]) updateBalance(ctx context.Context) error { // Use the pending (representated as -1) balance because we're looking at batches we'd post, // so we want to see how much gas we could afford with our pending state. @@ -520,7 +452,7 @@ func (p *DataPoster[Meta]) Start(ctxIn context.Context) { // We use unconfirmedNonce here to replace-by-fee transactions that aren't in a block, // excluding those that are in an unconfirmed block. If a reorg occurs, we'll continue // replacing them by fee. - queueContents, err := p.queue.GetContents(ctx, unconfirmedNonce, maxTxsToRbf) + queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf) if err != nil { log.Warn("failed to get tx queue contents", "err", err) return minWait @@ -554,3 +486,80 @@ func (p *DataPoster[Meta]) Start(ctxIn context.Context) { return wait }) } + +type queuedTransaction[Meta any] struct { + FullTx *types.Transaction + Data types.DynamicFeeTx + Meta Meta + Sent bool + Created time.Time // may be earlier than the tx was given to the tx poster + NextReplacement time.Time +} + +// Note: one of the implementation of this interface (Redis storage) does not +// support duplicate values. +type QueueStorage[Item any] interface { + // Returns at most maxResults items starting from specified index. + FetchContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) + // Returns item with the biggest index. + FetchLast(ctx context.Context) (*Item, error) + // Prunes items up to (excluding) specified index. + Prune(ctx context.Context, until uint64) error + // Inserts new item at specified index if previous value matches specified value. + Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error + // Returns the size of a queue. + Length(ctx context.Context) (int, error) + // Indicates whether queue stored at disk. + IsPersistent() bool +} + +type DataPosterConfig struct { + RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` + ReplacementTimes string `koanf:"replacement-times"` + WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` + MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` + MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` + TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` + UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` + MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` + MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` + EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"` +} + +// ConfigFetcher function type is used instead of directly passing config so +// that flags can be reloaded dynamically. +type ConfigFetcher func() *DataPosterConfig + +func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee") + f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)") + f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)") + f.Int(prefix+".max-queued-transactions", DefaultDataPosterConfig.MaxQueuedTransactions, "the maximum number of unconfirmed transactions to track at once (0 = unlimited)") + f.Float64(prefix+".target-price-gwei", DefaultDataPosterConfig.TargetPriceGwei, "the target price to use for maximum fee cap calculation") + f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation") + f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at") + f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at") + f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled") + signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f) +} + +var DefaultDataPosterConfig = DataPosterConfig{ + ReplacementTimes: "5m,10m,20m,30m,1h,2h,4h,6h,8h,12h,16h,18h,20h,22h", + WaitForL1Finality: true, + TargetPriceGwei: 60., + UrgencyGwei: 2., + MaxMempoolTransactions: 64, + MinTipCapGwei: 0.05, + EnableLevelDB: false, +} + +var TestDataPosterConfig = DataPosterConfig{ + ReplacementTimes: "1s,2s,5s,10s,20s,30s,1m,5m", + RedisSigner: signature.TestSimpleHmacConfig, + WaitForL1Finality: false, + TargetPriceGwei: 60., + UrgencyGwei: 2., + MaxMempoolTransactions: 64, + MinTipCapGwei: 0.05, + EnableLevelDB: false, +} diff --git a/arbnode/dataposter/leveldb/leveldb.go b/arbnode/dataposter/leveldb/leveldb.go index c271b71267..aaf4935f15 100644 --- a/arbnode/dataposter/leveldb/leveldb.go +++ b/arbnode/dataposter/leveldb/leveldb.go @@ -45,7 +45,7 @@ func idxToKey(idx uint64) []byte { return []byte(fmt.Sprintf("%020d", idx)) } -func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { +func (s *Storage[Item]) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { var res []*Item it := s.db.NewIterator([]byte(""), idxToKey(startingIndex)) defer it.Release() @@ -66,7 +66,7 @@ func (s *Storage[Item]) lastItemIdx(context.Context) ([]byte, error) { return s.db.Get(lastItemIdxKey) } -func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { +func (s *Storage[Item]) FetchLast(ctx context.Context) (*Item, error) { size, err := s.Length(ctx) if err != nil { return nil, err @@ -85,12 +85,12 @@ func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { return s.decodeItem(val) } -func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { +func (s *Storage[Item]) Prune(ctx context.Context, until uint64) error { cnt, err := s.Length(ctx) if err != nil { return err } - end := idxToKey(keepStartingAt) + end := idxToKey(until) it := s.db.NewIterator([]byte{}, idxToKey(0)) defer it.Release() b := s.db.NewBatch() diff --git a/arbnode/dataposter/redis/redisstorage.go b/arbnode/dataposter/redis/redisstorage.go index f7aed00e59..738eea521d 100644 --- a/arbnode/dataposter/redis/redisstorage.go +++ b/arbnode/dataposter/redis/redisstorage.go @@ -52,7 +52,7 @@ func (s *Storage[Item]) peelVerifySignature(data []byte) ([]byte, error) { return data[32:], nil } -func (s *Storage[Item]) GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { +func (s *Storage[Item]) FetchContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { query := redis.ZRangeArgs{ Key: s.key, ByScore: true, @@ -79,7 +79,7 @@ func (s *Storage[Item]) GetContents(ctx context.Context, startingIndex uint64, m return items, nil } -func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { +func (s *Storage[Item]) FetchLast(ctx context.Context) (*Item, error) { query := redis.ZRangeArgs{ Key: s.key, Start: 0, @@ -109,9 +109,9 @@ func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { return ret, nil } -func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { - if keepStartingAt > 0 { - return s.client.ZRemRangeByScore(ctx, s.key, "-inf", fmt.Sprintf("%v", keepStartingAt-1)).Err() +func (s *Storage[Item]) Prune(ctx context.Context, until uint64) error { + if until > 0 { + return s.client.ZRemRangeByScore(ctx, s.key, "-inf", fmt.Sprintf("%v", until-1)).Err() } return nil } diff --git a/arbnode/dataposter/slice/slicestorage.go b/arbnode/dataposter/slice/slicestorage.go index b0a253086f..e438f9fe46 100644 --- a/arbnode/dataposter/slice/slicestorage.go +++ b/arbnode/dataposter/slice/slicestorage.go @@ -19,7 +19,7 @@ func NewStorage[Item any]() *Storage[Item] { return &Storage[Item]{} } -func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { +func (s *Storage[Item]) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { ret := s.queue if startingIndex >= s.firstNonce+uint64(len(s.queue)) || maxResults == 0 { return nil, nil @@ -33,19 +33,19 @@ func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, max return ret, nil } -func (s *Storage[Item]) GetLast(context.Context) (*Item, error) { +func (s *Storage[Item]) FetchLast(context.Context) (*Item, error) { if len(s.queue) == 0 { return nil, nil } return s.queue[len(s.queue)-1], nil } -func (s *Storage[Item]) Prune(_ context.Context, keepStartingAt uint64) error { - if keepStartingAt >= s.firstNonce+uint64(len(s.queue)) { +func (s *Storage[Item]) Prune(_ context.Context, until uint64) error { + if until >= s.firstNonce+uint64(len(s.queue)) { s.queue = nil - } else if keepStartingAt >= s.firstNonce { - s.queue = s.queue[keepStartingAt-s.firstNonce:] - s.firstNonce = keepStartingAt + } else if until >= s.firstNonce { + s.queue = s.queue[until-s.firstNonce:] + s.firstNonce = until } return nil } diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go index 0ef83ed5ba..057080f603 100644 --- a/arbnode/dataposter/storage_test.go +++ b/arbnode/dataposter/storage_test.go @@ -84,7 +84,7 @@ func strPtrs(values []string) []*string { return res } -func TestGetContents(t *testing.T) { +func TestFetchContents(t *testing.T) { ctx := context.Background() for name, s := range initStorages(ctx, t) { for _, tc := range []struct { @@ -126,19 +126,19 @@ func TestGetContents(t *testing.T) { }, } { t.Run(name+"_"+tc.desc, func(t *testing.T) { - values, err := s.GetContents(ctx, tc.startIdx, tc.maxResults) + values, err := s.FetchContents(ctx, tc.startIdx, tc.maxResults) if err != nil { - t.Fatalf("GetContents(%d, %d) unexpected error: %v", tc.startIdx, tc.maxResults, err) + t.Fatalf("FetchContents(%d, %d) unexpected error: %v", tc.startIdx, tc.maxResults, err) } if diff := cmp.Diff(tc.want, values); diff != "" { - t.Errorf("GetContext(%d, %d) unexpected diff:\n%s", tc.startIdx, tc.maxResults, diff) + t.Errorf("FetchContents(%d, %d) unexpected diff:\n%s", tc.startIdx, tc.maxResults, diff) } }) } } } -func TestGetLast(t *testing.T) { +func TestLast(t *testing.T) { cnt := 100 for name, s := range storages(t) { t.Run(name, func(t *testing.T) { @@ -148,12 +148,12 @@ func TestGetLast(t *testing.T) { if err := s.Put(ctx, uint64(i), nil, &val); err != nil { t.Fatalf("Error putting a key/value: %v", err) } - got, err := s.GetLast(ctx) + got, err := s.FetchLast(ctx) if err != nil { t.Fatalf("Error getting a last element: %v", err) } if *got != val { - t.Errorf("GetLast() = %q want %q", *got, val) + t.Errorf("FetchLast() = %q want %q", *got, val) } } @@ -167,12 +167,12 @@ func TestGetLast(t *testing.T) { if err := s.Put(ctx, uint64(i), &prev, &newVal); err != nil { t.Fatalf("Error putting a key/value: %v, prev: %v, new: %v", err, prev, newVal) } - got, err := s.GetLast(ctx) + got, err := s.FetchLast(ctx) if err != nil { t.Fatalf("Error getting a last element: %v", err) } if *got != last { - t.Errorf("GetLast() = %q want %q", *got, last) + t.Errorf("FetchLast() = %q want %q", *got, last) } gotCnt, err := s.Length(ctx) if err != nil { @@ -225,9 +225,9 @@ func TestPrune(t *testing.T) { if err := s.Prune(ctx, tc.pruneFrom); err != nil { t.Fatalf("Prune(%d) unexpected error: %v", tc.pruneFrom, err) } - got, err := s.GetContents(ctx, 0, 20) + got, err := s.FetchContents(ctx, 0, 20) if err != nil { - t.Fatalf("GetContents() unexpected error: %v", err) + t.Fatalf("FetchContents() unexpected error: %v", err) } if diff := cmp.Diff(tc.want, got); diff != "" { t.Errorf("Prune(%d) unexpected diff:\n%s", tc.pruneFrom, diff) From 14cb1dad57e51c2af51830b31b0628dc2466d55b Mon Sep 17 00:00:00 2001 From: Nodar Date: Wed, 19 Jul 2023 15:42:34 +0200 Subject: [PATCH 2/2] Add QueueStorage documentation --- arbnode/dataposter/data_poster.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index bf2725a063..b06b4b2cb3 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -496,6 +496,11 @@ type queuedTransaction[Meta any] struct { NextReplacement time.Time } +// Implements queue-alike storage that can +// - Insert item at specified index +// - Update item with the condition that existing value equals assumed value +// - Delete all the items up to specified index (prune) +// - Calculate length // Note: one of the implementation of this interface (Redis storage) does not // support duplicate values. type QueueStorage[Item any] interface {