Commit

Merge remote-tracking branch 'origin/master' into cgroupsv2-resourcemanager

Tristan-Wilson committed Jul 17, 2023
2 parents 5c1d6c7 + 82ec72f commit 0009f06
Showing 68 changed files with 3,419 additions and 1,969 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -132,8 +132,8 @@ jobs:

- name: Set environment variables
run: |
mkdir -p target/tmp
echo "TMPDIR=$(pwd)/target/tmp" >> "$GITHUB_ENV"
mkdir -p target/tmp/deadbeefbee
echo "TMPDIR=$(pwd)/target/tmp/deadbeefbee" >> "$GITHUB_ENV"
echo "GOMEMLIMIT=6GiB" >> "$GITHUB_ENV"
echo "GOGC=80" >> "$GITHUB_ENV"
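For context on those environment variables: the Go runtime reads GOMEMLIMIT and GOGC directly, and os.TempDir honors TMPDIR on Unix. A minimal sketch (not part of this commit) to confirm what a test process actually picked up:

    package main

    import (
        "fmt"
        "os"
        "runtime/debug"
    )

    func main() {
        // os.TempDir returns $TMPDIR on Unix, so tests write under the
        // CI-provided scratch directory instead of /tmp.
        fmt.Println("temp dir:", os.TempDir())

        // A negative input reads the soft memory limit (GOMEMLIMIT)
        // without changing it.
        fmt.Println("mem limit:", debug.SetMemoryLimit(-1))

        // SetGCPercent always swaps the value, so restore it right away
        // to read GOGC without side effects.
        gogc := debug.SetGCPercent(100)
        debug.SetGCPercent(gogc)
        fmt.Println("GOGC:", gogc)
    }
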
1 change: 1 addition & 0 deletions Dockerfile
@@ -160,6 +160,7 @@ COPY ./scripts/download-machine.sh .
#RUN ./download-machine.sh consensus-v9 0xd1842bfbe047322b3f3b3635b5fe62eb611557784d17ac1d2b1ce9c170af6544
RUN ./download-machine.sh consensus-v10 0x6b94a7fc388fd8ef3def759297828dc311761e88d8179c7ee8d3887dc554f3c3
RUN ./download-machine.sh consensus-v10.1 0xda4e3ad5e7feacb817c21c8d0220da7650fe9051ece68a3f0b1c5d38bbb27b21
RUN ./download-machine.sh consensus-v10.2 0x0754e09320c381566cc0449904c377a52bd34a6b9404432e80afd573b67f7b17

FROM golang:1.20-bullseye as node-builder
WORKDIR /workspace
40 changes: 14 additions & 26 deletions arbnode/api.go
@@ -9,23 +9,17 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/validator"
)

type BlockValidatorAPI struct {
val *staker.BlockValidator
}

func (a *BlockValidatorAPI) LatestValidatedBlock(ctx context.Context) (hexutil.Uint64, error) {
block := a.val.LastBlockValidated()
return hexutil.Uint64(block), nil
}

func (a *BlockValidatorAPI) LatestValidatedBlockHash(ctx context.Context) (common.Hash, error) {
_, hash, _ := a.val.LastBlockValidatedAndHash()
return hash, nil
func (a *BlockValidatorAPI) LatestValidated(ctx context.Context) (*staker.GlobalStateValidatedInfo, error) {
return a.val.ReadLastValidatedInfo()
}

type BlockValidatorDebugAPI struct {
@@ -34,25 +28,16 @@ type BlockValidatorDebugAPI struct {
}

type ValidateBlockResult struct {
Valid bool `json:"valid"`
Latency string `json:"latency"`
Valid bool `json:"valid"`
Latency string `json:"latency"`
GlobalState validator.GoGlobalState `json:"globalstate"`
}

func (a *BlockValidatorDebugAPI) ValidateBlock(
ctx context.Context, blockNum rpc.BlockNumber, full bool, moduleRootOptional *common.Hash,
func (a *BlockValidatorDebugAPI) ValidateMessageNumber(
ctx context.Context, msgNum hexutil.Uint64, full bool, moduleRootOptional *common.Hash,
) (ValidateBlockResult, error) {
result := ValidateBlockResult{}

if blockNum < 0 {
return result, errors.New("this method only accepts absolute block numbers")
}
header := a.blockchain.GetHeaderByNumber(uint64(blockNum))
if header == nil {
return result, errors.New("block not found")
}
if !a.blockchain.Config().IsArbitrumNitro(header.Number) {
return result, types.ErrUseFallback
}
var moduleRoot common.Hash
if moduleRootOptional != nil {
moduleRoot = *moduleRootOptional
@@ -64,8 +49,11 @@ func (a *BlockValidatorDebugAPI) ValidateBlock(
moduleRoot = moduleRoots[0]
}
start_time := time.Now()
valid, err := a.val.ValidateBlock(ctx, header, full, moduleRoot)
result.Valid = valid
valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot)
result.Latency = fmt.Sprintf("%vms", time.Since(start_time).Milliseconds())
if gs != nil {
result.GlobalState = *gs
}
result.Valid = valid
return result, err
}
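
A hedged sketch of how a client might call the renamed validation endpoint via go-ethereum's rpc package. The "arb" namespace and the exposed method name are assumptions for illustration; the actual API registration is outside this diff:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/ethereum/go-ethereum/rpc"
    )

    func main() {
        client, err := rpc.Dial("http://localhost:8547")
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // LatestValidated replaces the LatestValidatedBlock and
        // LatestValidatedBlockHash pair: one call returns the validated
        // global state info. Decoding into a generic map keeps the sketch
        // independent of the struct layout.
        var info map[string]interface{}
        err = client.CallContext(context.Background(), &info, "arb_latestValidated")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("latest validated: %v\n", info)
    }

The debug method now takes a message number rather than a block number, so ValidateMessageNumber would be invoked the same way with a hexutil.Uint64 argument; its exposed name and namespace are likewise outside this diff.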
101 changes: 94 additions & 7 deletions arbnode/batch_poster.go
@@ -10,21 +10,24 @@ import (
"errors"
"fmt"
"math/big"
"sync/atomic"
"time"

"github.com/andybalholm/brotli"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
@@ -66,6 +69,8 @@ type BatchPoster struct {
redisLock *SimpleRedisLock
firstAccErr time.Time // first time a continuous missing accumulator occurred
backlog uint64 // An estimate of the number of unposted batches

batchReverted atomic.Bool // indicates whether data poster batch was reverted
}
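
batchReverted acts as a one-way fail-stop latch: the revert poller stores true once, and the posting path refuses to build further batches. The pattern in miniature (illustrative only, not the commit's code):

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
    )

    var batchReverted atomic.Bool

    func post() error {
        if batchReverted.Load() {
            return errors.New("batch was reverted, not posting any more batches")
        }
        return nil
    }

    func main() {
        fmt.Println(post())       // <nil>
        batchReverted.Store(true) // what pollForReverts does on a revert
        fmt.Println(post())       // batch was reverted, ...
    }
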

type BatchPosterConfig struct {
@@ -101,7 +106,7 @@ func (c *BatchPosterConfig) Validate() error {

type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxBatchSize, "maximum batch size")
@@ -158,7 +163,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1Wallet: DefaultBatchPosterL1WalletConfig,
}

func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client())
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,13 +206,91 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
}
b.dataPoster, err = dataposter.NewDataPoster(l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
if err != nil {
return nil, err
}
return b, nil
}
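
NewBatchPoster (and NewDataPoster below) now take an ethdb.Database so the transaction queue can be persisted. A hedged sketch of test wiring, where go-ethereum's in-memory implementation stands in for a real database; every variable besides db is assumed to exist in the caller:

    // import "github.com/ethereum/go-ethereum/core/rawdb"

    db := rawdb.NewMemoryDatabase() // in-memory ethdb.Database, discarded on exit
    bp, err := NewBatchPoster(db, l1Reader, inbox, streamer, syncMonitor,
        configFetcher, deployInfo, transactOpts, daWriter)
    if err != nil {
        // handle wiring errors
    }
    _ = bp
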

// checkReverts checks whether any block with number in the range [from, to]
// contains a reverted batch poster transaction.
func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) {
if from > to {
return false, fmt.Errorf("wrong range, from: %d is more to: %d", from, to)
}
for idx := from; idx <= to; idx++ {
number := big.NewInt(idx)
block, err := b.l1Reader.Client().BlockByNumber(ctx, number)
if err != nil {
return false, fmt.Errorf("getting block: %v by number: %w", number, err)
}
for idx, tx := range block.Transactions() {
from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx))
if err != nil {
return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err)
}
if bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) {
r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash())
if err != nil {
return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err)
}
if r.Status == types.ReceiptStatusFailed {
log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash)
return true, nil
}
}
}
}
return false, nil
}
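
One design note on checkReverts: it asks the node for each transaction's sender via TransactionSender, which is cheap when go-ethereum has the sender cached from the block fetch but is still an RPC round-trip in the worst case. If the chain ID is known locally, senders can be recovered without any round-trip; a sketch under that assumption (hypothetical helper, usual go-ethereum imports: common, core/types, math/big):

    // posterTxHashes recovers each sender locally instead of calling the
    // TransactionSender RPC, and collects the batch poster's transactions.
    func posterTxHashes(block *types.Block, poster common.Address, chainID *big.Int) []common.Hash {
        signer := types.LatestSignerForChainID(chainID)
        var hashes []common.Hash
        for _, tx := range block.Transactions() {
            from, err := types.Sender(signer, tx)
            if err != nil {
                continue // e.g. unsupported tx type; fall back to the RPC path
            }
            if from == poster {
                // The caller still fetches the receipt and checks
                // r.Status, exactly as checkReverts does.
                hashes = append(hashes, tx.Hash())
            }
        }
        return hashes
    }
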

// pollForReverts runs as a goroutine that listens to L1 block headers and
// checks whether any transaction made by the batch poster was reverted.
func (b *BatchPoster) pollForReverts(ctx context.Context) {
headerCh, unsubscribe := b.l1Reader.Subscribe(false)
defer unsubscribe()

last := int64(0) // number of last seen block
for {
// Poll until:
// - the L1 headers reader channel is closed, or
// - the context is done, or
// - we see a reverted transaction from the data poster in a block.
select {
case h, closed := <-headerCh:
if closed {
log.Info("L1 headers channel has been closed")
return
}
// If this is the first block header, set last seen to number-1.
// We may see the same block number again if there is an L1 reorg;
// in that case we check the block again.
if last == 0 || last == h.Number.Int64() {
last = h.Number.Int64() - 1
}
if h.Number.Int64()-last > 100 {
log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number)
last = h.Number.Int64()
continue
}

reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64())
if err != nil {
log.Error("Checking batch reverts", "error", err)
continue
}
if reverted {
b.batchReverted.Store(true)
return
}
last = h.Number.Int64()
case <-ctx.Done():
return
}
}
}
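
The last-seen bookkeeping in pollForReverts covers three cases: the first header, a repeated header after a reorg, and a large gap. A pure-function restatement (hypothetical, not in the commit) that makes those cases unit-testable in isolation:

    // nextRange mirrors pollForReverts' bookkeeping: it returns the
    // inclusive block range to scan for a new head, or skip=true when the
    // gap since the last seen block exceeds 100 (the caller then logs a
    // warning and resets last to head).
    func nextRange(last, head int64) (from, to int64, skip bool) {
        if last == 0 || last == head {
            // First header, or the same height again after an L1 reorg:
            // re-check this block.
            last = head - 1
        }
        if head-last > 100 {
            return 0, 0, true
        }
        return last + 1, head, false
    }
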

func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) {
bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum})
if err != nil {
@@ -554,6 +637,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
}

func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
if b.batchReverted.Load() {
return false, fmt.Errorf("batch was reverted, not posting any more batches")
}
nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
@@ -636,7 +722,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled")
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
@@ -697,6 +783,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.dataPoster.Start(ctxIn)
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
@@ -723,7 +810,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if err != nil {
b.building = nil
logLevel := log.Error
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, dataposter.ErrStorageRace) {
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
31 changes: 23 additions & 8 deletions arbnode/dataposter/data_poster.go
@@ -15,16 +15,22 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
)

type queuedTransaction[Meta any] struct {
@@ -36,6 +42,8 @@ type queuedTransaction[Meta any] struct {
NextReplacement time.Time
}

// Note: one of the implementations of this interface (Redis storage) does
// not support duplicate values.
type QueueStorage[Item any] interface {
GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error)
GetLast(ctx context.Context) (*Item, error)
@@ -55,11 +63,12 @@ type DataPosterConfig struct {
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"`
}

type DataPosterConfigFetcher func() *DataPosterConfig

func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
@@ -68,6 +77,7 @@ func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at")
f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at")
f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}
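
A runnable sketch of how the new prefixed option composes with pflag; the "dataposter" prefix is an arbitrary example, not the prefix the node actually passes in:

    package main

    import (
        "fmt"

        "github.com/spf13/pflag"
    )

    func main() {
        f := pflag.NewFlagSet("example", pflag.ContinueOnError)
        // Stands in for DataPosterConfigAddOptions(prefix, f), which
        // registers prefix+".enable-leveldb" among the options above.
        prefix := "dataposter"
        f.Bool(prefix+".enable-leveldb", false, "uses leveldb when enabled")

        if err := f.Parse([]string{"--dataposter.enable-leveldb=true"}); err != nil {
            panic(err)
        }
        enabled, _ := f.GetBool(prefix + ".enable-leveldb")
        fmt.Println("enable-leveldb:", enabled) // true
    }
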

@@ -78,6 +88,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

var TestDataPosterConfig = DataPosterConfig{
@@ -88,6 +99,7 @@ var TestDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// DataPoster must be RLP serializable and deserializable
@@ -114,7 +126,7 @@ type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
var replacementTimes []time.Duration
var lastReplacementTime time.Duration
for _, s := range strings.Split(config().ReplacementTimes, ",") {
@@ -134,11 +146,14 @@ func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind
// To avoid special casing "don't replace again", replace in 10 years
replacementTimes = append(replacementTimes, time.Hour*24*365*10)
var queue QueueStorage[queuedTransaction[Meta]]
if redisClient == nil {
queue = NewSliceStorage[queuedTransaction[Meta]]()
} else {
switch {
case config().EnableLevelDB:
queue = leveldb.New[queuedTransaction[Meta]](db)
case redisClient == nil:
queue = slice.NewStorage[queuedTransaction[Meta]]()
default:
var err error
queue, err = NewRedisStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
queue, err = redisstorage.NewStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
if err != nil {
return nil, err
}
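
The switch above picks the queue backend by precedence: leveldb if enabled, then Redis if a client is configured, then the in-memory slice (which presumably does not survive restarts, hence the persistent options). All three satisfy QueueStorage, so callers only see the interface; a sketch using the two methods visible in this diff (hypothetical helper, assuming context and fmt imports):

    // dumpQueue reads through any QueueStorage backend; only GetContents
    // and GetLast appear in this hunk, so the sketch sticks to those.
    func dumpQueue[Item any](ctx context.Context, q QueueStorage[Item]) error {
        last, err := q.GetLast(ctx)
        if err != nil {
            return err
        }
        fmt.Printf("last queued: %+v\n", last)
        items, err := q.GetContents(ctx, 0, 10) // up to 10 items from the front
        if err != nil {
            return err
        }
        fmt.Printf("head of queue: %d item(s)\n", len(items))
        return nil
    }
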
Expand Down Expand Up @@ -460,7 +475,7 @@ func (p *DataPoster[Meta]) maybeLogError(err error, tx *queuedTransaction[Meta],
return
}
logLevel := log.Error
if errors.Is(err, ErrStorageRace) {
if errors.Is(err, storage.ErrStorageRace) {
p.errorCount[nonce]++
if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors {
logLevel = log.Debug
[Diff truncated: the remaining changed files in this commit are not shown.]