Skip to content

Commit

Permalink
Merge branch 'master' into leveldb-based-dataposter
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Jul 11, 2023
2 parents aec5d79 + 274c78a commit 35097b1
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
88 changes: 87 additions & 1 deletion arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math/big"
"sync/atomic"
"time"

"github.com/andybalholm/brotli"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"

Check failure on line 23 in arbnode/batch_poster.go

View workflow job for this annotation

GitHub Actions / Go Tests (challenge)

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 23 in arbnode/batch_poster.go

View workflow job for this annotation

GitHub Actions / Go Tests (defaults)

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 23 in arbnode/batch_poster.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

File is not `gofmt`-ed with `-s` (gofmt)
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -67,6 +69,8 @@ type BatchPoster struct {
redisLock *SimpleRedisLock
firstAccErr time.Time // first time a continuous missing accumulator occurred
backlog uint64 // An estimate of the number of unposted batches

batchReverted atomic.Bool // indicates whether data poster batch was reverted
}

type BatchPosterConfig struct {
Expand Down Expand Up @@ -209,6 +213,84 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe
return b, nil
}

// checkRevert checks blocks with number in range [from, to] whether they
// contain reverted batch_poster transaction.
func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) {
if from > to {
return false, fmt.Errorf("wrong range, from: %d is more to: %d", from, to)
}
for idx := from; idx <= to; idx++ {
number := big.NewInt(idx)
block, err := b.l1Reader.Client().BlockByNumber(ctx, number)
if err != nil {
return false, fmt.Errorf("getting block: %v by number: %w", number, err)
}
for idx, tx := range block.Transactions() {
from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx))
if err != nil {
return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err)
}
if bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) {
r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash())
if err != nil {
return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err)
}
if r.Status == types.ReceiptStatusFailed {
log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash)
return true, nil
}
}
}
}
return false, nil
}

// pollForReverts runs a gouroutine that listens to l1 block headers, checks
// if any transaction made by batch poster was reverted.
func (b *BatchPoster) pollForReverts(ctx context.Context) {
headerCh, unsubscribe := b.l1Reader.Subscribe(false)
defer unsubscribe()

last := int64(0) // number of last seen block
for {
// Poll until:
// - L1 headers reader channel is closed, or
// - polling is through context, or
// - we see a transaction in the block from dataposter that was reverted.
select {
case h, closed := <-headerCh:
if closed {
log.Info("L1 headers channel has been closed")
return
}
// If this is the first block header, set last seen as number-1.
// We may see same block number again if there is L1 reorg, in that
// case we check the block again.
if last == 0 || last == h.Number.Int64() {
last = h.Number.Int64() - 1
}
if h.Number.Int64()-last > 100 {
log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number)
last = h.Number.Int64()
continue
}

reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64())
if err != nil {
log.Error("Checking batch reverts", "error", err)
continue
}
if reverted {
b.batchReverted.Store(true)
return
}
last = h.Number.Int64()
case <-ctx.Done():
return
}
}
}

func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) {
bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum})
if err != nil {
Expand Down Expand Up @@ -555,6 +637,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
}

func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
if b.batchReverted.Load() {
return false, fmt.Errorf("batch was reverted, not posting any more batches")
}
nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
Expand Down Expand Up @@ -637,7 +722,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled")
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
Expand Down Expand Up @@ -698,6 +783,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.dataPoster.Start(ctxIn)
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
Expand Down
6 changes: 3 additions & 3 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func (v *BlockValidator) InitAssumeValid(globalState validator.GoGlobalState) er

v.legacyValidInfo = nil

err := v.writeLastValidated(v.lastValidGS, nil)
err := v.writeLastValidated(globalState, nil)
if err != nil {
log.Error("failed writing new validated to database", "pos", v.lastValidGS, "err", err)
}
Expand Down Expand Up @@ -868,7 +868,7 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
v.validatedA = countUint64
v.valLoopPos = count
validatorMsgCountValidatedGauge.Update(int64(countUint64))
err = v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated
err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated
if err != nil {
log.Error("failed writing valid state after reorg", "err", err)
}
Expand Down Expand Up @@ -932,7 +932,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
if v.validatedA > countUint64 {
v.validatedA = countUint64
validatorMsgCountValidatedGauge.Update(int64(countUint64))
err := v.writeLastValidated(v.lastValidGS, nil) // we don't know which wasm roots were validated
err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated
if err != nil {
log.Error("failed writing valid state after reorg", "err", err)
}
Expand Down

0 comments on commit 35097b1

Please sign in to comment.