diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index 6092e4c442..e7b22b4bb1 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -10,6 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"sync/atomic"
 	"time"
 
 	"github.com/andybalholm/brotli"
@@ -20,6 +21,7 @@ import (
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -67,6 +69,8 @@ type BatchPoster struct {
 	redisLock   *SimpleRedisLock
 	firstAccErr time.Time // first time a continuous missing accumulator occurred
 	backlog     uint64    // An estimate of the number of unposted batches
+
+	batchReverted atomic.Bool // indicates whether a batch poster transaction was reverted
 }
 
 type BatchPosterConfig struct {
@@ -209,6 +213,84 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe
 	return b, nil
 }
 
+// checkReverts checks all blocks with numbers in the range [from, to] for
+// reverted batch poster transactions.
+func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) {
+	if from > to {
+		return false, fmt.Errorf("invalid range: from %d is greater than to %d", from, to)
+	}
+	for idx := from; idx <= to; idx++ {
+		number := big.NewInt(idx)
+		block, err := b.l1Reader.Client().BlockByNumber(ctx, number)
+		if err != nil {
+			return false, fmt.Errorf("getting block %v by number: %w", number, err)
+		}
+		for txIdx, tx := range block.Transactions() {
+			sender, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(txIdx))
+			if err != nil {
+				return false, fmt.Errorf("getting sender of transaction %v: %w", tx.Hash(), err)
+			}
+			if bytes.Equal(sender.Bytes(), b.dataPoster.From().Bytes()) {
+				r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash())
+				if err != nil {
+					return false, fmt.Errorf("getting receipt for transaction %v: %w", tx.Hash(), err)
+				}
+				if r.Status == types.ReceiptStatusFailed {
+					log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash)
+					return true, nil
+				}
+			}
+		}
+	}
+	return false, nil
+}
+
+// pollForReverts runs a goroutine that listens to L1 block headers and checks
+// whether any transaction made by the batch poster was reverted.
+func (b *BatchPoster) pollForReverts(ctx context.Context) {
+	headerCh, unsubscribe := b.l1Reader.Subscribe(false)
+	defer unsubscribe()
+
+	last := int64(0) // number of the last seen block
+	for {
+		// Poll until:
+		// - the L1 headers channel is closed, or
+		// - the context is canceled, or
+		// - we see a reverted transaction from the data poster in a block.
+		select {
+		case h, closed := <-headerCh:
+			if closed {
+				log.Info("L1 headers channel has been closed")
+				return
+			}
+			// If this is the first block header, set the last seen block to
+			// number-1. We may see the same block number again after an L1
+			// reorg; in that case we check the block again.
+			if last == 0 || last == h.Number.Int64() {
+				last = h.Number.Int64() - 1
+			}
+			if h.Number.Int64()-last > 100 {
+				log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number)
+				last = h.Number.Int64()
+				continue
+			}
+
+			reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64())
+			if err != nil {
+				log.Error("Checking batch reverts", "error", err)
+				continue
+			}
+			if reverted {
+				b.batchReverted.Store(true)
+				return
+			}
+			last = h.Number.Int64()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) {
 	bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum})
 	if err != nil {
@@ -555,6 +637,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
 }
 
 func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
+	if b.batchReverted.Load() {
+		return false, errors.New("batch was reverted, not posting any more batches")
+	}
 	nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx)
 	if err != nil {
 		return false, err
@@ -637,7 +722,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
 		cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
 		if errors.Is(err, das.BatchToDasFailed) {
 			if config.DisableDasFallbackStoreDataOnChain {
-				return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled")
+				return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
 			}
 			log.Warn("Falling back to storing data on chain", "err", err)
 		} else if err != nil {
@@ -698,6 +783,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
 	b.dataPoster.Start(ctxIn)
 	b.redisLock.Start(ctxIn)
 	b.StopWaiter.Start(ctxIn, b)
+	b.LaunchThread(b.pollForReverts)
 	b.CallIteratively(func(ctx context.Context) time.Duration {
 		var err error
 		if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
diff --git a/staker/block_validator.go b/staker/block_validator.go
index 9096324cff..108ef5710c 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -808,7 +808,7 @@ func (v *BlockValidator) InitAssumeValid(globalState validator.GoGlobalState) er
 	v.legacyValidInfo = nil
 
-	err := v.writeLastValidated(v.lastValidGS, nil)
+	err := v.writeLastValidated(globalState, nil)
 	if err != nil {
 		log.Error("failed writing new validated to database", "pos", v.lastValidGS, "err", err)
 	}
 
@@ -868,7 +868,7 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
 	v.validatedA = countUint64
 	v.valLoopPos = count
 	validatorMsgCountValidatedGauge.Update(int64(countUint64))
-	err = v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated
+	err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated
 	if err != nil {
 		log.Error("failed writing valid state after reorg", "err", err)
 	}
@@ -932,7 +932,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
 	if v.validatedA > countUint64 {
 		v.validatedA = countUint64
 		validatorMsgCountValidatedGauge.Update(int64(countUint64))
-		err := v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated
+		err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated
 		if err != nil {
 			log.Error("failed writing valid state after reorg", "err", err)
 		}
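
For context on the new revert check: the sketch below is a minimal, standalone illustration (not part of this patch) of the receipt-status pattern that checkReverts relies on, written against go-ethereum's public ethclient API. The RPC endpoint, block number, and sender address are placeholder assumptions, not values from this patch.

package main

import (
	"context"
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// revertedTxFrom reports whether the block with the given number contains a
// transaction from sender whose receipt status is "failed" (i.e. it reverted).
func revertedTxFrom(ctx context.Context, client *ethclient.Client, blockNum *big.Int, sender common.Address) (bool, error) {
	block, err := client.BlockByNumber(ctx, blockNum)
	if err != nil {
		return false, fmt.Errorf("getting block %v: %w", blockNum, err)
	}
	for i, tx := range block.Transactions() {
		from, err := client.TransactionSender(ctx, tx, block.Hash(), uint(i))
		if err != nil {
			return false, fmt.Errorf("getting sender of %v: %w", tx.Hash(), err)
		}
		if from != sender {
			continue
		}
		receipt, err := client.TransactionReceipt(ctx, tx.Hash())
		if err != nil {
			return false, fmt.Errorf("getting receipt of %v: %w", tx.Hash(), err)
		}
		// types.ReceiptStatusFailed (0) means the transaction was included
		// in the block but reverted during execution.
		if receipt.Status == types.ReceiptStatusFailed {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Placeholder values; substitute a real endpoint and sender address.
	client, err := ethclient.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sender := common.HexToAddress("0x0000000000000000000000000000000000000000")
	reverted, err := revertedTxFrom(context.Background(), client, big.NewInt(1), sender)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("reverted transaction found:", reverted)
}

Note that in the patch itself pollForReverts only ever writes batchReverted and maybePostSequencerBatch only ever reads it, so an atomic.Bool is sufficient for the cross-goroutine signal; no mutex is needed.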