Merge branch 'master' into validator-messages
PlasmaPower authored Jul 12, 2023
2 parents a41e6e5 + f8a9421 commit ef41e1d
Showing 13 changed files with 631 additions and 54 deletions.
101 changes: 94 additions & 7 deletions arbnode/batch_poster.go
@@ -10,21 +10,24 @@ import (
"errors"
"fmt"
"math/big"
"sync/atomic"
"time"

"github.com/andybalholm/brotli"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
@@ -66,6 +69,8 @@ type BatchPoster struct {
redisLock *SimpleRedisLock
firstAccErr time.Time // first time a continuous missing accumulator occurred
backlog uint64 // An estimate of the number of unposted batches

batchReverted atomic.Bool // indicates whether data poster batch was reverted
}

type BatchPosterConfig struct {
@@ -101,7 +106,7 @@ func (c *BatchPosterConfig) Validate() error {

type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxBatchSize, "maximum batch size")
@@ -158,7 +163,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1Wallet: DefaultBatchPosterL1WalletConfig,
}

func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client())
if err != nil {
return nil, err
@@ -201,13 +206,91 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
}
b.dataPoster, err = dataposter.NewDataPoster(l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
if err != nil {
return nil, err
}
return b, nil
}

// checkReverts checks whether any block with a number in the range [from, to]
// contains a reverted batch poster transaction.
func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) {
if from > to {
return false, fmt.Errorf("wrong range, from: %d is greater than to: %d", from, to)
}
for idx := from; idx <= to; idx++ {
number := big.NewInt(idx)
block, err := b.l1Reader.Client().BlockByNumber(ctx, number)
if err != nil {
return false, fmt.Errorf("getting block: %v by number: %w", number, err)
}
for idx, tx := range block.Transactions() {
from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx))
if err != nil {
return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err)
}
if bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) {
r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash())
if err != nil {
return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err)
}
if r.Status == types.ReceiptStatusFailed {
log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash)
return true, nil
}
}
}
}
return false, nil
}

// pollForReverts listens for L1 block headers and checks whether any transaction
// made by the batch poster was reverted. It is launched as a goroutine from Start.
func (b *BatchPoster) pollForReverts(ctx context.Context) {
headerCh, unsubscribe := b.l1Reader.Subscribe(false)
defer unsubscribe()

last := int64(0) // number of last seen block
for {
// Poll until:
// - the L1 headers channel is closed, or
// - the context is cancelled, or
// - we see a reverted transaction from the data poster in a block.
select {
case h, ok := <-headerCh:
if !ok {
log.Info("L1 headers channel has been closed")
return
}
// If this is the first block header, set the last seen block to number-1.
// We may see the same block number again if there is an L1 reorg; in that
// case we check the block again.
if last == 0 || last == h.Number.Int64() {
last = h.Number.Int64() - 1
}
if h.Number.Int64()-last > 100 {
log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number)
last = h.Number.Int64()
continue
}

reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64())
if err != nil {
log.Error("Checking batch reverts", "error", err)
continue
}
if reverted {
b.batchReverted.Store(true)
return
}
last = h.Number.Int64()
case <-ctx.Done():
return
}
}
}

func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) {
bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum})
if err != nil {
@@ -554,6 +637,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
}

func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
if b.batchReverted.Load() {
return false, fmt.Errorf("batch was reverted, not posting any more batches")
}
nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
@@ -636,7 +722,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled")
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
@@ -697,6 +783,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.dataPoster.Start(ctxIn)
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
@@ -723,7 +810,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if err != nil {
b.building = nil
logLevel := log.Error
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, dataposter.ErrStorageRace) {
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
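For reference, the new revert detection boils down to: walk every block in a range, resolve the sender of each transaction, and check the receipt status of any transaction sent from the data poster's address. Below is a minimal standalone sketch of that pattern against a bare go-ethereum ethclient; it is not part of this commit, and the package, function name, and parameters are illustrative only.

package revertscan

import (
    "context"
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

// hasRevertedTxFrom reports whether any block in [from, to] contains a
// transaction sent by sender whose receipt status is "failed" (i.e. reverted).
func hasRevertedTxFrom(ctx context.Context, client *ethclient.Client, sender common.Address, from, to int64) (bool, error) {
    for n := from; n <= to; n++ {
        block, err := client.BlockByNumber(ctx, big.NewInt(n))
        if err != nil {
            return false, fmt.Errorf("getting block %d: %w", n, err)
        }
        for i, tx := range block.Transactions() {
            txSender, err := client.TransactionSender(ctx, tx, block.Hash(), uint(i))
            if err != nil {
                return false, fmt.Errorf("getting sender of tx %v: %w", tx.Hash(), err)
            }
            if txSender != sender {
                continue
            }
            receipt, err := client.TransactionReceipt(ctx, tx.Hash())
            if err != nil {
                return false, fmt.Errorf("getting receipt of tx %v: %w", tx.Hash(), err)
            }
            if receipt.Status == types.ReceiptStatusFailed {
                return true, nil
            }
        }
    }
    return false, nil
}

In the commit itself, the result of this scan feeds the new batchReverted atomic flag, which maybePostSequencerBatch checks before building another batch.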
29 changes: 21 additions & 8 deletions arbnode/dataposter/data_poster.go
@@ -15,16 +15,22 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
)

type queuedTransaction[Meta any] struct {
@@ -55,11 +61,12 @@ type DataPosterConfig struct {
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"`
}

type DataPosterConfigFetcher func() *DataPosterConfig

func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
@@ -68,6 +75,7 @@ func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at")
f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at")
f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "store the data poster transaction queue in leveldb")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

@@ -78,6 +86,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

var TestDataPosterConfig = DataPosterConfig{
@@ -88,6 +97,7 @@ var TestDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// DataPoster must be RLP serializable and deserializable
@@ -114,7 +124,7 @@ type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
var replacementTimes []time.Duration
var lastReplacementTime time.Duration
for _, s := range strings.Split(config().ReplacementTimes, ",") {
@@ -134,11 +144,14 @@ func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind
// To avoid special casing "don't replace again", replace in 10 years
replacementTimes = append(replacementTimes, time.Hour*24*365*10)
var queue QueueStorage[queuedTransaction[Meta]]
if redisClient == nil {
queue = NewSliceStorage[queuedTransaction[Meta]]()
} else {
switch {
case config().EnableLevelDB:
queue = leveldb.New[queuedTransaction[Meta]](db)
case redisClient == nil:
queue = slice.NewStorage[queuedTransaction[Meta]]()
default:
var err error
queue, err = NewRedisStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
queue, err = redisstorage.NewStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
if err != nil {
return nil, err
}
@@ -460,7 +473,7 @@ func (p *DataPoster[Meta]) maybeLogError(err error, tx *queuedTransaction[Meta],
return
}
logLevel := log.Error
if errors.Is(err, ErrStorageRace) {
if errors.Is(err, storage.ErrStorageRace) {
p.errorCount[nonce]++
if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors {
logLevel = log.Debug
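The other substantive change in this file is that the data poster now picks its queue backend with a three-way switch: the new enable-leveldb option takes priority, otherwise the presence of a Redis client decides between the Redis-backed queue and the in-memory slice queue. The sketch below mirrors that selection pattern only; it is not part of this commit, and the types are hypothetical stand-ins for the real leveldb/slice/redis storages.

package queueselect

import "fmt"

// queueStorage is a hypothetical stand-in for the data poster's queue interface.
type queueStorage[T any] interface {
    Name() string
}

type levelDBQueue[T any] struct{}

func (levelDBQueue[T]) Name() string { return "leveldb" }

type sliceQueue[T any] struct{}

func (sliceQueue[T]) Name() string { return "slice" }

type redisQueue[T any] struct{}

func (redisQueue[T]) Name() string { return "redis" }

// newQueue mirrors the switch in NewDataPoster: an explicit config flag wins,
// otherwise the presence of a Redis client picks between a Redis-backed queue
// and a purely in-memory slice queue.
func newQueue[T any](enableLevelDB, haveRedis bool) queueStorage[T] {
    switch {
    case enableLevelDB:
        return levelDBQueue[T]{}
    case !haveRedis:
        return sliceQueue[T]{}
    default:
        return redisQueue[T]{}
    }
}

func Example() {
    fmt.Println(newQueue[string](false, true).Name()) // prints "redis"
}

The likely practical difference, though the diff does not state it explicitly, is that a leveldb-backed queue (stored in the node's ethdb.Database) survives restarts, whereas the slice storage lives only in memory.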