From 0b6ce71397b8d230f6b7a19d6698c38235e90be7 Mon Sep 17 00:00:00 2001 From: Nodar Date: Thu, 22 Jun 2023 16:14:21 +0200 Subject: [PATCH 1/7] Poll for data poster batch reverts --- arbnode/batch_poster.go | 68 +++++++++++++++++++++++++++++++++++++ arbutil/transaction_data.go | 9 +++++ 2 files changed, 77 insertions(+) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 3e5e6a738f..9e1b8d1038 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "time" "github.com/andybalholm/brotli" @@ -19,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" @@ -66,6 +68,9 @@ type BatchPoster struct { redisLock *SimpleRedisLock firstAccErr time.Time // first time a continuous missing accumulator occurred backlog uint64 // An estimate of the number of unposted batches + + batchReverted atomic.Bool // indicates whether data poster batch was reverted + stopPollingForReverts chan struct{} // channel for stopping batch revert detections } type BatchPosterConfig struct { @@ -205,9 +210,68 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st if err != nil { return nil, err } + b.poolForReverts() return b, nil } +func (b *BatchPoster) checkReverts(ctx context.Context, hash common.Hash) (bool, error) { + block, err := b.l1Reader.Client().BlockByHash(ctx, hash) + if err != nil { + return false, fmt.Errorf("getting block by hash: %w", err) + } + for idx, tx := range block.Transactions() { + from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx)) + if err != nil { + return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err) + } + if !bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { + r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash()) + if err != nil { + return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) + } + if r.Status == types.ReceiptStatusFailed { + return true, nil + } + } + } + return false, nil +} + +// poolForReverts runs a gouroutine that listens to l1 block headers, checks +// if any transaction made by batch poster was reverted. +func (b *BatchPoster) poolForReverts() { + headerCh, unsubscribe := b.l1Reader.Subscribe(false) + defer unsubscribe() + + go func() { + ctx := context.Background() + for { + // Poll until: + // - L1 headers reader channel is closed, or + // - polling is cancelled from outside (through returned channel), or + // - we see a transaction in the block from dataposter that was reverted. + select { + case h, closed := <-headerCh: + if closed { + log.Info("L1 headers channel has been closed") + return + } + reverted, err := b.checkReverts(ctx, h.Hash()) + if err != nil { + log.Error("Checking batch reverts", "error", err) + continue + } + if reverted { + b.batchReverted.Store(true) + return + } + case <-b.stopPollingForReverts: + return + } + } + }() +} + func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) { bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum}) if err != nil { @@ -554,6 +618,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, } func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) { + if b.batchReverted.Load() { + log.Error("Batch was reverted, not posting any more batches") + } nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx) if err != nil { return false, err @@ -749,4 +816,5 @@ func (b *BatchPoster) StopAndWait() { b.StopWaiter.StopAndWait() b.dataPoster.StopAndWait() b.redisLock.StopAndWait() + b.stopPollingForReverts <- struct{}{} } diff --git a/arbutil/transaction_data.go b/arbutil/transaction_data.go index 7741af6e9b..a84b059ec3 100644 --- a/arbutil/transaction_data.go +++ b/arbutil/transaction_data.go @@ -7,6 +7,7 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) @@ -24,3 +25,11 @@ func GetLogEmitterTxData(ctx context.Context, client L1Interface, log types.Log) } return tx.Data(), nil } + +func BlockTransactions(ctx context.Context, l1Client L1Interface, hash common.Hash) (types.Transactions, error) { + b, err := l1Client.BlockByHash(ctx, hash) + if err != nil { + return nil, fmt.Errorf("getting block by hash: %w", err) + } + return b.Transactions(), nil +} From 6a6b1ece1e8c84db5f64376f0460d5f95849db92 Mon Sep 17 00:00:00 2001 From: Nodar Date: Fri, 23 Jun 2023 15:38:04 +0200 Subject: [PATCH 2/7] Check all blocks between last seen and current block number --- arbnode/batch_poster.go | 109 ++++++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 44 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 9e1b8d1038..cbe9f4ccfa 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -69,8 +69,7 @@ type BatchPoster struct { firstAccErr time.Time // first time a continuous missing accumulator occurred backlog uint64 // An estimate of the number of unposted batches - batchReverted atomic.Bool // indicates whether data poster batch was reverted - stopPollingForReverts chan struct{} // channel for stopping batch revert detections + batchReverted atomic.Bool // indicates whether data poster batch was reverted } type BatchPosterConfig struct { @@ -210,66 +209,88 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st if err != nil { return nil, err } - b.poolForReverts() return b, nil } -func (b *BatchPoster) checkReverts(ctx context.Context, hash common.Hash) (bool, error) { - block, err := b.l1Reader.Client().BlockByHash(ctx, hash) - if err != nil { - return false, fmt.Errorf("getting block by hash: %w", err) +// checkRevert checks blocks with number in range [from, to] whether they +// contain reverted batch_poster transaction. +func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool, error) { + + if from.Cmp(to) == 1 { + return false, fmt.Errorf("wrong range, from: %v is more to: %v", from.Int64(), to.Int64()) } - for idx, tx := range block.Transactions() { - from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx)) + + one := big.NewInt(1) + + for number := new(big.Int).Set(from); number.Cmp(to) != 1; number.Add(number, one) { + block, err := b.l1Reader.Client().BlockByNumber(ctx, number) if err != nil { - return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err) + return false, fmt.Errorf("getting block: %v by number: %w", number.Int64(), err) } - if !bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { - r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash()) + for idx, tx := range block.Transactions() { + from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx)) if err != nil { - return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) + return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err) } - if r.Status == types.ReceiptStatusFailed { - return true, nil + if !bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { + r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash()) + if err != nil { + return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) + } + if r.Status == types.ReceiptStatusFailed { + return true, nil + } } } } return false, nil } -// poolForReverts runs a gouroutine that listens to l1 block headers, checks +// pollForReverts runs a gouroutine that listens to l1 block headers, checks // if any transaction made by batch poster was reverted. -func (b *BatchPoster) poolForReverts() { +func (b *BatchPoster) pollForReverts(ctx context.Context) { headerCh, unsubscribe := b.l1Reader.Subscribe(false) defer unsubscribe() + var ( + last *big.Int // number of last seen block + one = big.NewInt(1) + ) + for { + // Poll until: + // - L1 headers reader channel is closed, or + // - polling is through context, or + // - we see a transaction in the block from dataposter that was reverted. + select { + case h, closed := <-headerCh: + if closed { + log.Info("L1 headers channel has been closed") + return + } + // If this is the first block header, set last seen as number-1. + if last == nil { + last = new(big.Int).Set(h.Number) + last.Sub(last, one) + } + if h.Number.Int64()-last.Int64() > 100 { + log.Warn("Large gap between past seen: %v and current block number: %v, skipping check for reverts", last.Int64(), h.Number.Int64()) + last.Set(h.Number) + continue + } - go func() { - ctx := context.Background() - for { - // Poll until: - // - L1 headers reader channel is closed, or - // - polling is cancelled from outside (through returned channel), or - // - we see a transaction in the block from dataposter that was reverted. - select { - case h, closed := <-headerCh: - if closed { - log.Info("L1 headers channel has been closed") - return - } - reverted, err := b.checkReverts(ctx, h.Hash()) - if err != nil { - log.Error("Checking batch reverts", "error", err) - continue - } - if reverted { - b.batchReverted.Store(true) - return - } - case <-b.stopPollingForReverts: + reverted, err := b.checkReverts(ctx, last.Add(last, big.NewInt(1)), h.Number) + if err != nil { + log.Error("Checking batch reverts", "error", err) + continue + } + if reverted { + b.batchReverted.Store(true) return } + last.Set(h.Number) + case <-ctx.Done(): + return } - }() + } } func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) { @@ -619,7 +640,7 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) { if b.batchReverted.Load() { - log.Error("Batch was reverted, not posting any more batches") + return false, fmt.Errorf("batch was reverted, not posting any more batches") } nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx) if err != nil { @@ -703,7 +724,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled if errors.Is(err, das.BatchToDasFailed) { if config.DisableDasFallbackStoreDataOnChain { - return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled") + return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled") } log.Warn("Falling back to storing data on chain", "err", err) } else if err != nil { @@ -764,6 +785,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.dataPoster.Start(ctxIn) b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) + b.LaunchThread(b.pollForReverts) b.CallIteratively(func(ctx context.Context) time.Duration { var err error if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) { @@ -816,5 +838,4 @@ func (b *BatchPoster) StopAndWait() { b.StopWaiter.StopAndWait() b.dataPoster.StopAndWait() b.redisLock.StopAndWait() - b.stopPollingForReverts <- struct{}{} } From 787ce5405fa3a0ea31556f26614d8dc5c2c10b64 Mon Sep 17 00:00:00 2001 From: Nodar Date: Fri, 23 Jun 2023 15:44:10 +0200 Subject: [PATCH 3/7] drop unused function, cleanup checkReverts --- arbnode/batch_poster.go | 5 +---- arbutil/transaction_data.go | 9 --------- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index cbe9f4ccfa..3f5287f820 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -215,13 +215,10 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st // checkRevert checks blocks with number in range [from, to] whether they // contain reverted batch_poster transaction. func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool, error) { - if from.Cmp(to) == 1 { return false, fmt.Errorf("wrong range, from: %v is more to: %v", from.Int64(), to.Int64()) } - one := big.NewInt(1) - for number := new(big.Int).Set(from); number.Cmp(to) != 1; number.Add(number, one) { block, err := b.l1Reader.Client().BlockByNumber(ctx, number) if err != nil { @@ -232,7 +229,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool if err != nil { return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err) } - if !bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { + if bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash()) if err != nil { return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) diff --git a/arbutil/transaction_data.go b/arbutil/transaction_data.go index a84b059ec3..7741af6e9b 100644 --- a/arbutil/transaction_data.go +++ b/arbutil/transaction_data.go @@ -7,7 +7,6 @@ import ( "context" "fmt" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) @@ -25,11 +24,3 @@ func GetLogEmitterTxData(ctx context.Context, client L1Interface, log types.Log) } return tx.Data(), nil } - -func BlockTransactions(ctx context.Context, l1Client L1Interface, hash common.Hash) (types.Transactions, error) { - b, err := l1Client.BlockByHash(ctx, hash) - if err != nil { - return nil, fmt.Errorf("getting block by hash: %w", err) - } - return b.Transactions(), nil -} From afa8c31cefc655d17a1097033a3552b2c28f5800 Mon Sep 17 00:00:00 2001 From: Nodar Date: Tue, 27 Jun 2023 13:55:22 +0200 Subject: [PATCH 4/7] Log reverted transaction hash, use int64 in revert checking logic rather than big.Int --- arbnode/batch_poster.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 3f5287f820..edc4dd8f96 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -214,12 +214,12 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st // checkRevert checks blocks with number in range [from, to] whether they // contain reverted batch_poster transaction. -func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool, error) { - if from.Cmp(to) == 1 { - return false, fmt.Errorf("wrong range, from: %v is more to: %v", from.Int64(), to.Int64()) +func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) { + if from > to { + return false, fmt.Errorf("wrong range, from: %d is more to: %d", from, to) } - one := big.NewInt(1) - for number := new(big.Int).Set(from); number.Cmp(to) != 1; number.Add(number, one) { + for idx := from; idx < to; idx++ { + number := big.NewInt(idx) block, err := b.l1Reader.Client().BlockByNumber(ctx, number) if err != nil { return false, fmt.Errorf("getting block: %v by number: %w", number.Int64(), err) @@ -235,6 +235,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) } if r.Status == types.ReceiptStatusFailed { + log.Error("Transaction: %v from batch poster was reverted", tx.Hash()) return true, nil } } @@ -248,10 +249,8 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to *big.Int) (bool func (b *BatchPoster) pollForReverts(ctx context.Context) { headerCh, unsubscribe := b.l1Reader.Subscribe(false) defer unsubscribe() - var ( - last *big.Int // number of last seen block - one = big.NewInt(1) - ) + + var last int64 // number of last seen block for { // Poll until: // - L1 headers reader channel is closed, or @@ -264,17 +263,18 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) { return } // If this is the first block header, set last seen as number-1. - if last == nil { - last = new(big.Int).Set(h.Number) - last.Sub(last, one) + // We may see same block number again if there is L1 reorg, in that + // case we check the block again. + if last == 0 || last == h.Number.Int64() { + last = h.Number.Int64() - 1 } - if h.Number.Int64()-last.Int64() > 100 { - log.Warn("Large gap between past seen: %v and current block number: %v, skipping check for reverts", last.Int64(), h.Number.Int64()) - last.Set(h.Number) + if h.Number.Int64()-last > 100 { + log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number) + last = h.Number.Int64() continue } - reverted, err := b.checkReverts(ctx, last.Add(last, big.NewInt(1)), h.Number) + reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64()) if err != nil { log.Error("Checking batch reverts", "error", err) continue @@ -283,7 +283,7 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) { b.batchReverted.Store(true) return } - last.Set(h.Number) + last = h.Number.Int64() case <-ctx.Done(): return } From 189702aaa3220c563ecb6dc7a0a6b02f68a0c256 Mon Sep 17 00:00:00 2001 From: Nodar Date: Tue, 27 Jun 2023 13:57:55 +0200 Subject: [PATCH 5/7] check including current block for transaction reverts --- arbnode/batch_poster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index edc4dd8f96..9a969fc387 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -218,7 +218,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, e if from > to { return false, fmt.Errorf("wrong range, from: %d is more to: %d", from, to) } - for idx := from; idx < to; idx++ { + for idx := from; idx <= to; idx++ { number := big.NewInt(idx) block, err := b.l1Reader.Client().BlockByNumber(ctx, number) if err != nil { @@ -250,7 +250,7 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) { headerCh, unsubscribe := b.l1Reader.Subscribe(false) defer unsubscribe() - var last int64 // number of last seen block + last := int64(0) // number of last seen block for { // Poll until: // - L1 headers reader channel is closed, or From 1ee47ca8250c721a0601946eaba3bd3f682304d4 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 11 Jul 2023 10:57:19 -0600 Subject: [PATCH 6/7] block_validator: fix calls to writeLastValidated the function now writes lastValidGS, so don't pass that as agument --- staker/block_validator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 9096324cff..108ef5710c 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -808,7 +808,7 @@ func (v *BlockValidator) InitAssumeValid(globalState validator.GoGlobalState) er v.legacyValidInfo = nil - err := v.writeLastValidated(v.lastValidGS, nil) + err := v.writeLastValidated(globalState, nil) if err != nil { log.Error("failed writing new validated to database", "pos", v.lastValidGS, "err", err) } @@ -868,7 +868,7 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt v.validatedA = countUint64 v.valLoopPos = count validatorMsgCountValidatedGauge.Update(int64(countUint64)) - err = v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated + err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated if err != nil { log.Error("failed writing valid state after reorg", "err", err) } @@ -932,7 +932,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex) if v.validatedA > countUint64 { v.validatedA = countUint64 validatorMsgCountValidatedGauge.Update(int64(countUint64)) - err := v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated + err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated if err != nil { log.Error("failed writing valid state after reorg", "err", err) } From 6f4946177875b0a4a2d4278f1a1aa9431128af83 Mon Sep 17 00:00:00 2001 From: Nodar Date: Tue, 11 Jul 2023 23:27:20 +0200 Subject: [PATCH 7/7] Fix logging in batch poster, drop .Int64() when formatting biging since it implements stringer --- arbnode/batch_poster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 9a969fc387..9f8da95841 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -222,7 +222,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, e number := big.NewInt(idx) block, err := b.l1Reader.Client().BlockByNumber(ctx, number) if err != nil { - return false, fmt.Errorf("getting block: %v by number: %w", number.Int64(), err) + return false, fmt.Errorf("getting block: %v by number: %w", number, err) } for idx, tx := range block.Transactions() { from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx)) @@ -235,7 +235,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, e return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) } if r.Status == types.ReceiptStatusFailed { - log.Error("Transaction: %v from batch poster was reverted", tx.Hash()) + log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash) return true, nil } }