Skip to content

Commit

Permalink
Merge pull request #1771 from OffchainLabs/dataposter-doc
Browse files Browse the repository at this point in the history
Document QueueStorage and ConfigFetcher
  • Loading branch information
anodar committed Jul 28, 2023
2 parents 22af898 + 0df4e76 commit c1adbe8
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 104 deletions.
168 changes: 91 additions & 77 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,87 +33,18 @@ import (
redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
)

// queuedTransaction is the queue record for one nonce managed by the data
// poster, pairing the transaction with its replace-by-fee bookkeeping.
type queuedTransaction[Meta any] struct {
FullTx *types.Transaction // the full transaction for this nonce
Data types.DynamicFeeTx // dynamic-fee fields; read when building a replacement tx
Meta Meta // caller-supplied metadata associated with this nonce
Sent bool // NOTE(review): presumably set once the tx has been broadcast — confirm against sendTx
Created time.Time // may be earlier than the tx was given to the tx poster
NextReplacement time.Time // earliest time at which a replace-by-fee should be attempted
}

// QueueStorage is queue-like storage of items addressed by a contiguous
// uint64 index.
// Note: one of the implementations of this interface (Redis storage) does not
// support duplicate values.
type QueueStorage[Item any] interface {
// GetContents returns at most maxResults items starting from startingIndex.
GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error)
// GetLast returns the item with the highest index.
GetLast(ctx context.Context) (*Item, error)
// Prune deletes all items with index below keepStartingAt.
Prune(ctx context.Context, keepStartingAt uint64) error
// Put inserts newItem at index, conditional on the existing value there
// matching prevItem.
Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error
// Length returns the number of items in the queue.
Length(ctx context.Context) (int, error)
// IsPersistent reports whether the queue contents survive a process restart.
IsPersistent() bool
}

// DataPosterConfig is the koanf-backed configuration for the data poster.
// Fields tagged `reload:"hot"` can change at runtime via the config fetcher.
type DataPosterConfig struct {
RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` // HMAC signing config for the Redis-backed queue
ReplacementTimes string `koanf:"replacement-times"` // comma-separated durations since first posting to attempt a replace-by-fee
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` // only treat a tx as confirmed after L1 finality
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` // max txs queued in the mempool at once (0 = unlimited)
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` // max unconfirmed txs to track at once (0 = unlimited)
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` // target price used for maximum fee cap calculation
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` // urgency used for maximum fee cap calculation
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` // minimum fee cap to post transactions at
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` // minimum tip cap to post transactions at
EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"` // use leveldb-backed queue storage when enabled
}

// DataPosterConfigFetcher returns the current DataPosterConfig; a function is
// used instead of a static config so hot-reloadable flags can change at runtime.
type DataPosterConfigFetcher func() *DataPosterConfig

// DataPosterConfigAddOptions registers the data poster's command-line flags on
// f, each name prefixed with prefix and defaulted from DefaultDataPosterConfig.
func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
	d := &DefaultDataPosterConfig
	f.String(prefix+".replacement-times", d.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
	f.Bool(prefix+".wait-for-l1-finality", d.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
	f.Uint64(prefix+".max-mempool-transactions", d.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
	f.Int(prefix+".max-queued-transactions", d.MaxQueuedTransactions, "the maximum number of unconfirmed transactions to track at once (0 = unlimited)")
	f.Float64(prefix+".target-price-gwei", d.TargetPriceGwei, "the target price to use for maximum fee cap calculation")
	f.Float64(prefix+".urgency-gwei", d.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
	f.Float64(prefix+".min-fee-cap-gwei", d.MinFeeCapGwei, "the minimum fee cap to post transactions at")
	f.Float64(prefix+".min-tip-cap-gwei", d.MinTipCapGwei, "the minimum tip cap to post transactions at")
	f.Bool(prefix+".enable-leveldb", d.EnableLevelDB, "uses leveldb when enabled")
	signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

// DefaultDataPosterConfig holds the production defaults for DataPosterConfig.
// Fields not listed (e.g. MinFeeCapGwei, MaxQueuedTransactions) default to
// their zero values, which mean "no minimum" / "unlimited" per the flag docs.
var DefaultDataPosterConfig = DataPosterConfig{
ReplacementTimes: "5m,10m,20m,30m,1h,2h,4h,6h,8h,12h,16h,18h,20h,22h",
WaitForL1Finality: true,
TargetPriceGwei: 60.,
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// TestDataPosterConfig is the configuration used in tests: much shorter
// replacement times, no wait for L1 finality, and the test HMAC signer.
var TestDataPosterConfig = DataPosterConfig{
ReplacementTimes: "1s,2s,5s,10s,20s,30s,1m,5m",
RedisSigner: signature.TestSimpleHmacConfig,
WaitForL1Finality: false,
TargetPriceGwei: 60.,
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// DataPoster must be RLP serializable and deserializable
type DataPoster[Meta any] struct {
stopwaiter.StopWaiter
headerReader *headerreader.HeaderReader
client arbutil.L1Interface
auth *bind.TransactOpts
redisLock AttemptLocker
config DataPosterConfigFetcher
config ConfigFetcher
replacementTimes []time.Duration
metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)

// these fields are protected by the mutex
// These fields are protected by the mutex.
mutex sync.Mutex
lastBlock *big.Int
balance *big.Int
Expand All @@ -126,7 +57,7 @@ type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config ConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
var replacementTimes []time.Duration
var lastReplacementTime time.Duration
for _, s := range strings.Split(config().ReplacementTimes, ",") {
Expand Down Expand Up @@ -184,7 +115,7 @@ func (p *DataPoster[Meta]) GetNextNonceAndMeta(ctx context.Context) (uint64, Met
if err != nil {
return 0, emptyMeta, err
}
lastQueueItem, err := p.queue.GetLast(ctx)
lastQueueItem, err := p.queue.FetchLast(ctx)
if err != nil {
return 0, emptyMeta, err
}
Expand Down Expand Up @@ -231,7 +162,7 @@ func (p *DataPoster[Meta]) GetNextNonceAndMeta(ctx context.Context) (uint64, Met

const minRbfIncrease = arbmath.OneInBips * 11 / 10

func (p *DataPoster[Meta]) getFeeAndTipCaps(ctx context.Context, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) {
func (p *DataPoster[Meta]) feeAndTipCaps(ctx context.Context, gasLimit uint64, lastFeeCap *big.Int, lastTipCap *big.Int, dataCreatedAt time.Time, backlogOfBatches uint64) (*big.Int, *big.Int, error) {
config := p.config()
latestHeader, err := p.headerReader.LastHeader(ctx)
if err != nil {
Expand Down Expand Up @@ -312,7 +243,7 @@ func (p *DataPoster[Meta]) PostTransaction(ctx context.Context, dataCreatedAt ti
if err != nil {
return fmt.Errorf("failed to update data poster balance: %w", err)
}
feeCap, tipCap, err := p.getFeeAndTipCaps(ctx, gasLimit, nil, nil, dataCreatedAt, 0)
feeCap, tipCap, err := p.feeAndTipCaps(ctx, gasLimit, nil, nil, dataCreatedAt, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -374,7 +305,7 @@ func (p *DataPoster[Meta]) sendTx(ctx context.Context, prevTx *queuedTransaction

// the mutex must be held by the caller
func (p *DataPoster[Meta]) replaceTx(ctx context.Context, prevTx *queuedTransaction[Meta], backlogOfBatches uint64) error {
newFeeCap, newTipCap, err := p.getFeeAndTipCaps(ctx, prevTx.Data.Gas, prevTx.Data.GasFeeCap, prevTx.Data.GasTipCap, prevTx.Created, backlogOfBatches)
newFeeCap, newTipCap, err := p.feeAndTipCaps(ctx, prevTx.Data.Gas, prevTx.Data.GasFeeCap, prevTx.Data.GasTipCap, prevTx.Created, backlogOfBatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -455,6 +386,7 @@ func (p *DataPoster[Meta]) updateNonce(ctx context.Context) error {
return nil
}

// Updates dataposter balance to balance at pending block.
func (p *DataPoster[Meta]) updateBalance(ctx context.Context) error {
// Use the pending (represented as -1) balance because we're looking at batches we'd post,
// so we want to see how much gas we could afford with our pending state.
Expand Down Expand Up @@ -520,7 +452,7 @@ func (p *DataPoster[Meta]) Start(ctxIn context.Context) {
// We use unconfirmedNonce here to replace-by-fee transactions that aren't in a block,
// excluding those that are in an unconfirmed block. If a reorg occurs, we'll continue
// replacing them by fee.
queueContents, err := p.queue.GetContents(ctx, unconfirmedNonce, maxTxsToRbf)
queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf)
if err != nil {
log.Warn("failed to get tx queue contents", "err", err)
return minWait
Expand Down Expand Up @@ -554,3 +486,85 @@ func (p *DataPoster[Meta]) Start(ctxIn context.Context) {
return wait
})
}

// queuedTransaction is the queue record for one nonce managed by the data
// poster, pairing the transaction with its replace-by-fee bookkeeping.
type queuedTransaction[Meta any] struct {
FullTx *types.Transaction // the full transaction for this nonce
Data types.DynamicFeeTx // dynamic-fee fields; read when building a replacement tx
Meta Meta // caller-supplied metadata associated with this nonce
Sent bool // NOTE(review): presumably set once the tx has been broadcast — confirm against sendTx
Created time.Time // may be earlier than the tx was given to the tx poster
NextReplacement time.Time // earliest time at which a replace-by-fee should be attempted
}

// QueueStorage implements queue-like storage that can:
// - Insert an item at a specified index
// - Update an item with the condition that the existing value equals an assumed value
// - Delete all the items up to a specified index (prune)
// - Calculate length
// Note: one of the implementations of this interface (Redis storage) does not
// support duplicate values.
type QueueStorage[Item any] interface {
// FetchContents returns at most maxResults items starting from the specified index.
FetchContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error)
// FetchLast returns the item with the biggest index.
FetchLast(ctx context.Context) (*Item, error)
// Prune deletes items up to (excluding) the specified index.
Prune(ctx context.Context, until uint64) error
// Put inserts a new item at the specified index if the previous value there
// matches prevItem.
Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error
// Length returns the size of the queue.
Length(ctx context.Context) (int, error)
// IsPersistent reports whether the queue is stored on disk (i.e. survives a
// process restart).
IsPersistent() bool
}

// DataPosterConfig is the koanf-backed configuration for the data poster.
// Fields tagged `reload:"hot"` can change at runtime via the ConfigFetcher.
type DataPosterConfig struct {
RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` // HMAC signing config for the Redis-backed queue
ReplacementTimes string `koanf:"replacement-times"` // comma-separated durations since first posting to attempt a replace-by-fee
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` // only treat a tx as confirmed after L1 finality
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` // max txs queued in the mempool at once (0 = unlimited)
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` // max unconfirmed txs to track at once (0 = unlimited)
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` // target price used for maximum fee cap calculation
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` // urgency used for maximum fee cap calculation
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` // minimum fee cap to post transactions at
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` // minimum tip cap to post transactions at
EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"` // use leveldb-backed queue storage when enabled
}

// ConfigFetcher function type is used instead of directly passing config so
// that flags can be reloaded dynamically: callers invoke it each time they
// need the current configuration rather than caching a snapshot.
type ConfigFetcher func() *DataPosterConfig

// DataPosterConfigAddOptions registers the data poster's command-line flags on
// f, each name prefixed with prefix and defaulted from DefaultDataPosterConfig.
func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
	defaults := &DefaultDataPosterConfig
	f.String(prefix+".replacement-times", defaults.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
	f.Bool(prefix+".wait-for-l1-finality", defaults.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
	f.Uint64(prefix+".max-mempool-transactions", defaults.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
	f.Int(prefix+".max-queued-transactions", defaults.MaxQueuedTransactions, "the maximum number of unconfirmed transactions to track at once (0 = unlimited)")
	f.Float64(prefix+".target-price-gwei", defaults.TargetPriceGwei, "the target price to use for maximum fee cap calculation")
	f.Float64(prefix+".urgency-gwei", defaults.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
	f.Float64(prefix+".min-fee-cap-gwei", defaults.MinFeeCapGwei, "the minimum fee cap to post transactions at")
	f.Float64(prefix+".min-tip-cap-gwei", defaults.MinTipCapGwei, "the minimum tip cap to post transactions at")
	f.Bool(prefix+".enable-leveldb", defaults.EnableLevelDB, "uses leveldb when enabled")
	signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

// DefaultDataPosterConfig holds the production defaults for DataPosterConfig.
// Fields not listed (e.g. MinFeeCapGwei, MaxQueuedTransactions) default to
// their zero values, which mean "no minimum" / "unlimited" per the flag docs.
var DefaultDataPosterConfig = DataPosterConfig{
ReplacementTimes: "5m,10m,20m,30m,1h,2h,4h,6h,8h,12h,16h,18h,20h,22h",
WaitForL1Finality: true,
TargetPriceGwei: 60.,
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// TestDataPosterConfig is the configuration used in tests: much shorter
// replacement times, no wait for L1 finality, and the test HMAC signer.
var TestDataPosterConfig = DataPosterConfig{
ReplacementTimes: "1s,2s,5s,10s,20s,30s,1m,5m",
RedisSigner: signature.TestSimpleHmacConfig,
WaitForL1Finality: false,
TargetPriceGwei: 60.,
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}
8 changes: 4 additions & 4 deletions arbnode/dataposter/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func idxToKey(idx uint64) []byte {
return []byte(fmt.Sprintf("%020d", idx))
}

func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
func (s *Storage[Item]) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
var res []*Item
it := s.db.NewIterator([]byte(""), idxToKey(startingIndex))
defer it.Release()
Expand All @@ -66,7 +66,7 @@ func (s *Storage[Item]) lastItemIdx(context.Context) ([]byte, error) {
return s.db.Get(lastItemIdxKey)
}

func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
func (s *Storage[Item]) FetchLast(ctx context.Context) (*Item, error) {
size, err := s.Length(ctx)
if err != nil {
return nil, err
Expand All @@ -85,12 +85,12 @@ func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
return s.decodeItem(val)
}

func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error {
func (s *Storage[Item]) Prune(ctx context.Context, until uint64) error {
cnt, err := s.Length(ctx)
if err != nil {
return err
}
end := idxToKey(keepStartingAt)
end := idxToKey(until)
it := s.db.NewIterator([]byte{}, idxToKey(0))
defer it.Release()
b := s.db.NewBatch()
Expand Down
10 changes: 5 additions & 5 deletions arbnode/dataposter/redis/redisstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Storage[Item]) peelVerifySignature(data []byte) ([]byte, error) {
return data[32:], nil
}

func (s *Storage[Item]) GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
func (s *Storage[Item]) FetchContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
query := redis.ZRangeArgs{
Key: s.key,
ByScore: true,
Expand All @@ -79,7 +79,7 @@ func (s *Storage[Item]) GetContents(ctx context.Context, startingIndex uint64, m
return items, nil
}

func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
func (s *Storage[Item]) FetchLast(ctx context.Context) (*Item, error) {
query := redis.ZRangeArgs{
Key: s.key,
Start: 0,
Expand Down Expand Up @@ -109,9 +109,9 @@ func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
return ret, nil
}

func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error {
if keepStartingAt > 0 {
return s.client.ZRemRangeByScore(ctx, s.key, "-inf", fmt.Sprintf("%v", keepStartingAt-1)).Err()
func (s *Storage[Item]) Prune(ctx context.Context, until uint64) error {
if until > 0 {
return s.client.ZRemRangeByScore(ctx, s.key, "-inf", fmt.Sprintf("%v", until-1)).Err()
}
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions arbnode/dataposter/slice/slicestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewStorage[Item any]() *Storage[Item] {
return &Storage[Item]{}
}

func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
func (s *Storage[Item]) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
ret := s.queue
if startingIndex >= s.firstNonce+uint64(len(s.queue)) || maxResults == 0 {
return nil, nil
Expand All @@ -33,19 +33,19 @@ func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, max
return ret, nil
}

func (s *Storage[Item]) GetLast(context.Context) (*Item, error) {
func (s *Storage[Item]) FetchLast(context.Context) (*Item, error) {
if len(s.queue) == 0 {
return nil, nil
}
return s.queue[len(s.queue)-1], nil
}

func (s *Storage[Item]) Prune(_ context.Context, keepStartingAt uint64) error {
if keepStartingAt >= s.firstNonce+uint64(len(s.queue)) {
func (s *Storage[Item]) Prune(_ context.Context, until uint64) error {
if until >= s.firstNonce+uint64(len(s.queue)) {
s.queue = nil
} else if keepStartingAt >= s.firstNonce {
s.queue = s.queue[keepStartingAt-s.firstNonce:]
s.firstNonce = keepStartingAt
} else if until >= s.firstNonce {
s.queue = s.queue[until-s.firstNonce:]
s.firstNonce = until
}
return nil
}
Expand Down
Loading

0 comments on commit c1adbe8

Please sign in to comment.