
Commit

resolve conflict and address PR comments
ganeshvanahalli committed Feb 21, 2024
2 parents 93e0154 + 3e14543 commit 3295032
Showing 46 changed files with 1,080 additions and 314 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/release-ci.yml
@@ -0,0 +1,30 @@
name: Release CI
run-name: Release CI triggered from @${{ github.actor }} of ${{ github.head_ref }}

on:
  workflow_dispatch:

jobs:
  build_and_run:
    runs-on: ubuntu-8

    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          submodules: recursive

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
        with:
          driver-opts: network=host

      - name: Cache Docker layers
        uses: actions/cache@v3
        with:
          path: /tmp/.buildx-cache
          key: ${{ runner.os }}-buildx-${{ hashFiles('Dockerfile') }}
          restore-keys: ${{ runner.os }}-buildx-

      - name: Startup Nitro testnode
        run: ./scripts/startup-testnode.bash
3 changes: 2 additions & 1 deletion Dockerfile
@@ -164,6 +164,7 @@ RUN ./download-machine.sh consensus-v10.2 0x0754e09320c381566cc0449904c377a52bd3
RUN ./download-machine.sh consensus-v10.3 0xf559b6d4fa869472dabce70fe1c15221bdda837533dfd891916836975b434dec
RUN ./download-machine.sh consensus-v11 0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a
RUN ./download-machine.sh consensus-v11.1 0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4
RUN ./download-machine.sh consensus-v20 0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4

FROM golang:1.20-bookworm as node-builder
WORKDIR /workspace
@@ -200,7 +201,7 @@ FROM debian:bookworm-slim as nitro-fuzzer
COPY --from=fuzz-builder /workspace/fuzzers/*.fuzz /usr/local/bin/
COPY ./scripts/fuzz.bash /usr/local/bin
RUN mkdir /fuzzcache
ENTRYPOINT [ "/usr/local/bin/fuzz.bash", "--binary-path", "/usr/local/bin/", "--fuzzcache-path", "/fuzzcache" ]
ENTRYPOINT [ "/usr/local/bin/fuzz.bash", "FuzzStateTransition", "--binary-path", "/usr/local/bin/", "--fuzzcache-path", "/fuzzcache" ]

FROM debian:bookworm-slim as nitro-node-slim
WORKDIR /home/user
96 changes: 53 additions & 43 deletions arbnode/batch_poster.go
@@ -41,6 +41,7 @@ import (
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/arbmath"
@@ -73,22 +74,23 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.FullExecutionClient
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
@@ -136,7 +138,7 @@ type BatchPosterConfig struct {
RedisLock redislock.SimpleCfg `koanf:"redis-lock" reload:"hot"`
ExtraBatchGas uint64 `koanf:"extra-batch-gas" reload:"hot"`
Post4844Blobs bool `koanf:"post-4844-blobs" reload:"hot"`
ForcePost4844Blobs bool `koanf:"force-post-4844-blobs" reload:"hot"`
IgnoreBlobPrice bool `koanf:"ignore-blob-price" reload:"hot"`
ParentChainWallet genericconf.WalletConfig `koanf:"parent-chain-wallet"`
L1BlockBound string `koanf:"l1-block-bound" reload:"hot"`
L1BlockBoundBypass time.Duration `koanf:"l1-block-bound-bypass" reload:"hot"`
@@ -186,7 +188,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".gas-refunder-address", DefaultBatchPosterConfig.GasRefunderAddress, "The gas refunder contract address (optional)")
f.Uint64(prefix+".extra-batch-gas", DefaultBatchPosterConfig.ExtraBatchGas, "use this much more gas than estimation says is necessary to post batches")
f.Bool(prefix+".post-4844-blobs", DefaultBatchPosterConfig.Post4844Blobs, "if the parent chain supports 4844 blobs and they're well priced, post EIP-4844 blobs")
f.Bool(prefix+".force-post-4844-blobs", DefaultBatchPosterConfig.ForcePost4844Blobs, "if the parent chain supports 4844 blobs and post-4844-blobs is true, post 4844 blobs even if it's not price efficient")
f.Bool(prefix+".ignore-blob-price", DefaultBatchPosterConfig.IgnoreBlobPrice, "if the parent chain supports 4844 blobs and ignore-blob-price is true, post 4844 blobs even if it's not price efficient")
f.String(prefix+".redis-url", DefaultBatchPosterConfig.RedisUrl, "if non-empty, the Redis URL to store queued transactions in")
f.String(prefix+".l1-block-bound", DefaultBatchPosterConfig.L1BlockBound, "only post messages to batches when they're within the max future block/timestamp as of this L1 block tag (\"safe\", \"finalized\", \"latest\", or \"ignore\" to ignore this check)")
f.Duration(prefix+".l1-block-bound-bypass", DefaultBatchPosterConfig.L1BlockBoundBypass, "post batches even if not within the layer 1 future bounds if we're within this margin of the max delay")
@@ -212,7 +214,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
GasRefunderAddress: "",
ExtraBatchGas: 50_000,
Post4844Blobs: false,
ForcePost4844Blobs: false,
IgnoreBlobPrice: false,
DataPoster: dataposter.DefaultDataPosterConfig,
ParentChainWallet: DefaultBatchPosterL1WalletConfig,
L1BlockBound: "",
@@ -242,7 +244,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
GasRefunderAddress: "",
ExtraBatchGas: 10_000,
Post4844Blobs: true,
ForcePost4844Blobs: false,
IgnoreBlobPrice: false,
DataPoster: dataposter.TestDataPosterConfig,
ParentChainWallet: DefaultBatchPosterL1WalletConfig,
L1BlockBound: "",
@@ -255,6 +257,7 @@ type BatchPosterOpts struct {
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
VersionGetter execution.FullExecutionClient
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
@@ -293,19 +296,20 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
return nil, err
}
b := &BatchPoster{
l1Reader: opts.L1Reader,
inbox: opts.Inbox,
streamer: opts.Streamer,
syncMonitor: opts.SyncMonitor,
config: opts.Config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
redisLock: redisLock,
l1Reader: opts.L1Reader,
inbox: opts.Inbox,
streamer: opts.Streamer,
arbOSVersionGetter: opts.VersionGetter,
syncMonitor: opts.SyncMonitor,
config: opts.Config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
if err != nil {
Expand Down Expand Up @@ -947,7 +951,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if dbBatchCount > batchPosition.NextSeqNum {
return false, fmt.Errorf("attempting to post batch %v, but the local inbox tracker database already has %v batches", batchPosition.NextSeqNum, dbBatchCount)
}

if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount {
latestHeader, err := b.l1Reader.LastHeader(ctx)
if err != nil {
@@ -956,17 +959,24 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
var use4844 bool
config := b.config()
if config.Post4844Blobs && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
if config.ForcePost4844Blobs {
use4844 = true
} else {
blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*latestHeader.ExcessBlobGas, *latestHeader.BlobGasUsed))
blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob)
blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob)

calldataFeePerByte := arbmath.BigMulByUint(latestHeader.BaseFee, 16)
use4844 = arbmath.BigLessThan(blobFeePerByte, calldataFeePerByte)
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
if err != nil {
return false, err
}
if arbOSVersion >= 20 {
if config.IgnoreBlobPrice {
use4844 = true
} else {
blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*latestHeader.ExcessBlobGas, *latestHeader.BlobGasUsed))
blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob)
blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob)

calldataFeePerByte := arbmath.BigMulByUint(latestHeader.BaseFee, 16)
use4844 = arbmath.BigLessThan(blobFeePerByte, calldataFeePerByte)
}
}
}

b.building = &buildingBatch{
segments: newBatchSegments(batchPosition.DelayedMessageCount, b.config(), b.GetBacklogEstimate(), use4844),
msgCount: batchPosition.MessageCount,
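Note: the use4844 decision added to maybePostSequencerBatch above boils down to a per-byte price comparison that only runs on ArbOS 20 or later. Below is a minimal, self-contained Go sketch of that comparison; it takes the blob base fee as an input rather than deriving it from excess blob gas as the real code does, and the constants (131072 gas per blob, ~31 usable bytes per 32-byte field element) are illustrative assumptions rather than nitro's exact values. The ignoreBlobPrice short-circuit mirrors the renamed force-post-4844-blobs behaviour.

package main

import (
	"fmt"
	"math/big"
)

// shouldUse4844 sketches the price check: prefer EIP-4844 blobs only when the
// per-byte blob fee is below the per-byte calldata fee, unless ignoreBlobPrice
// forces blob posting. Constants are illustrative assumptions.
func shouldUse4844(blobBaseFee, l1BaseFee *big.Int, ignoreBlobPrice bool) bool {
	if ignoreBlobPrice {
		return true
	}
	blobGasPerBlob := big.NewInt(131072)        // gas charged per blob (2^17)
	usableBytesPerBlob := big.NewInt(4096 * 31) // ~31 usable bytes per 32-byte field element

	// fee per usable byte carried in a blob
	blobFeePerByte := new(big.Int).Mul(blobBaseFee, blobGasPerBlob)
	blobFeePerByte.Div(blobFeePerByte, usableBytesPerBlob)

	// calldata costs 16 gas per (non-zero) byte
	calldataFeePerByte := new(big.Int).Mul(l1BaseFee, big.NewInt(16))
	return blobFeePerByte.Cmp(calldataFeePerByte) < 0
}

func main() {
	// Example: blob base fee of 1 wei against an L1 base fee of 30 gwei.
	fmt.Println(shouldUse4844(big.NewInt(1), big.NewInt(30_000_000_000), false)) // true
}
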
4 changes: 2 additions & 2 deletions arbnode/delayed.go
@@ -221,10 +221,10 @@ func (b *DelayedBridge) logsToDeliveredMessages(ctx context.Context, logs []type
msgKey := common.BigToHash(parsedLog.MessageIndex)
data, ok := messageData[msgKey]
if !ok {
return nil, errors.New("message not found")
return nil, fmt.Errorf("message %v data not found", parsedLog.MessageIndex)
}
if crypto.Keccak256Hash(data) != parsedLog.MessageDataHash {
return nil, errors.New("found message data with mismatched hash")
return nil, fmt.Errorf("found message %v data with mismatched hash", parsedLog.MessageIndex)
}

requestId := common.BigToHash(parsedLog.MessageIndex)
100 changes: 71 additions & 29 deletions arbnode/inbox_reader.go
@@ -32,6 +32,7 @@ type InboxReaderConfig struct {
DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"`
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
}

type InboxReaderConfigFetcher func() *InboxReaderConfig
@@ -40,6 +41,10 @@ func (c *InboxReaderConfig) Validate() error {
if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead {
return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read")
}
c.ReadMode = strings.ToLower(c.ReadMode)
if c.ReadMode != "latest" && c.ReadMode != "safe" && c.ReadMode != "finalized" {
return fmt.Errorf("inbox reader read-mode is invalid, want: latest or safe or finalized, got: %s", c.ReadMode)
}
return nil
}

@@ -51,6 +56,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)")
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once")
f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
}

var DefaultInboxReaderConfig = InboxReaderConfig{
Expand All @@ -61,6 +67,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

var TestInboxReaderConfig = InboxReaderConfig{
Expand All @@ -71,6 +78,7 @@ var TestInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

type InboxReader struct {
Expand Down Expand Up @@ -219,6 +227,7 @@ func (r *InboxReader) CaughtUp() chan struct{} {
}

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead()
if err != nil {
return err
@@ -239,38 +248,71 @@ }
}
defer storeSeenBatchCount() // in case of error
for {

latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
config := r.config()
currentHeight := latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
currentHeight := big.NewInt(0)
if readMode != "latest" {
var blockNum uint64
fetchLatestSafeOrFinalized := func() {
if readMode == "safe" {
blockNum, err = r.l1Reader.LatestSafeBlockNr(ctx)
} else {
blockNum, err = r.l1Reader.LatestFinalizedBlockNr(ctx)
}
}
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader running in read only %s mode and unable to fetch latest %s block. err: %w", readMode, readMode, err)
}
currentHeight.SetUint64(blockNum)
// latest block in our db is newer than the latest safe/finalized block hence reset 'from' to match the last safe/finalized block number
if from.Uint64() > currentHeight.Uint64()+1 {
from.Set(currentHeight)
}
for currentHeight.Cmp(from) <= 0 {
select {
case <-newHeaders:
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader waiting for recent %s block and unable to fetch its block number. err: %w", readMode, err)
}
currentHeight.SetUint64(blockNum)
case <-ctx.Done():
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()
} else {

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
currentHeight = latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
}
}
}

Expand Down Expand Up @@ -359,7 +401,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
storeSeenBatchCount()
if !r.caughtUp {
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
@@ -408,7 +450,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
if !r.caughtUp && to.Cmp(currentHeight) == 0 {
if !r.caughtUp && to.Cmp(currentHeight) == 0 && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
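Note: the run() loop above now branches on read-mode before deciding how far to read. The sketch below isolates that dispatch under a hypothetical l1BlockReader interface; LatestSafeBlockNr and LatestFinalizedBlockNr mirror calls that appear in the diff, while LatestBlockNr for the "latest" case, and the currentReadHeight and fakeReader names, are assumptions introduced for illustration only.

package main

import (
	"context"
	"fmt"
	"math/big"
)

// l1BlockReader is a hypothetical stand-in for the slice of the header reader
// used by the inbox reader in this sketch.
type l1BlockReader interface {
	LatestBlockNr(ctx context.Context) (uint64, error)
	LatestSafeBlockNr(ctx context.Context) (uint64, error)
	LatestFinalizedBlockNr(ctx context.Context) (uint64, error)
}

// currentReadHeight isolates the read-mode dispatch: "latest" keeps the old
// behaviour, while "safe" and "finalized" cap reads at the corresponding L1 tag.
func currentReadHeight(ctx context.Context, reader l1BlockReader, readMode string) (*big.Int, error) {
	var blockNum uint64
	var err error
	switch readMode {
	case "safe":
		blockNum, err = reader.LatestSafeBlockNr(ctx)
	case "finalized":
		blockNum, err = reader.LatestFinalizedBlockNr(ctx)
	case "latest":
		blockNum, err = reader.LatestBlockNr(ctx)
	default:
		return nil, fmt.Errorf("invalid read-mode %q, want latest, safe or finalized", readMode)
	}
	if err != nil || blockNum == 0 {
		return nil, fmt.Errorf("unable to fetch %s block number: %w", readMode, err)
	}
	return new(big.Int).SetUint64(blockNum), nil
}

type fakeReader struct{ latest, safe, finalized uint64 }

func (f fakeReader) LatestBlockNr(context.Context) (uint64, error)          { return f.latest, nil }
func (f fakeReader) LatestSafeBlockNr(context.Context) (uint64, error)      { return f.safe, nil }
func (f fakeReader) LatestFinalizedBlockNr(context.Context) (uint64, error) { return f.finalized, nil }

func main() {
	h, err := currentReadHeight(context.Background(), fakeReader{latest: 120, safe: 110, finalized: 100}, "finalized")
	fmt.Println(h, err) // 100 <nil>
}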