Skip to content

Commit

Permalink
Merge branch 'master' into support-multiple-validation-rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Feb 14, 2024
2 parents 7ceab18 + b8d35e7 commit 1e1f96c
Show file tree
Hide file tree
Showing 19 changed files with 586 additions and 111 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/release-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ jobs:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ hashFiles('Dockerfile') }}
restore-keys: ${{ runner.os }}-buildx-

- name: Startup Nitro testnode
run: ./scripts/startup-testnode.bash
4 changes: 2 additions & 2 deletions arbnode/delayed.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ func (b *DelayedBridge) logsToDeliveredMessages(ctx context.Context, logs []type
msgKey := common.BigToHash(parsedLog.MessageIndex)
data, ok := messageData[msgKey]
if !ok {
return nil, errors.New("message not found")
return nil, fmt.Errorf("message %v data not found", parsedLog.MessageIndex)
}
if crypto.Keccak256Hash(data) != parsedLog.MessageDataHash {
return nil, errors.New("found message data with mismatched hash")
return nil, fmt.Errorf("found message %v data with mismatched hash", parsedLog.MessageIndex)
}

requestId := common.BigToHash(parsedLog.MessageIndex)
Expand Down
100 changes: 71 additions & 29 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type InboxReaderConfig struct {
DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"`
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
}

type InboxReaderConfigFetcher func() *InboxReaderConfig
Expand All @@ -40,6 +41,10 @@ func (c *InboxReaderConfig) Validate() error {
if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead {
return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read")
}
c.ReadMode = strings.ToLower(c.ReadMode)
if c.ReadMode != "latest" && c.ReadMode != "safe" && c.ReadMode != "finalized" {
return fmt.Errorf("inbox reader read-mode is invalid, want: latest or safe or finalized, got: %s", c.ReadMode)
}
return nil
}

Expand All @@ -51,6 +56,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)")
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once")
f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
}

var DefaultInboxReaderConfig = InboxReaderConfig{
Expand All @@ -61,6 +67,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

var TestInboxReaderConfig = InboxReaderConfig{
Expand All @@ -71,6 +78,7 @@ var TestInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

type InboxReader struct {
Expand Down Expand Up @@ -219,6 +227,7 @@ func (r *InboxReader) CaughtUp() chan struct{} {
}

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead()
if err != nil {
return err
Expand All @@ -239,38 +248,71 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
defer storeSeenBatchCount() // in case of error
for {

latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
config := r.config()
currentHeight := latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
currentHeight := big.NewInt(0)
if readMode != "latest" {
var blockNum uint64
fetchLatestSafeOrFinalized := func() {
if readMode == "safe" {
blockNum, err = r.l1Reader.LatestSafeBlockNr(ctx)
} else {
blockNum, err = r.l1Reader.LatestFinalizedBlockNr(ctx)
}
}
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader running in read only %s mode and unable to fetch latest %s block. err: %w", readMode, readMode, err)
}
currentHeight.SetUint64(blockNum)
// latest block in our db is newer than the latest safe/finalized block hence reset 'from' to match the last safe/finalized block number
if from.Uint64() > currentHeight.Uint64()+1 {
from.Set(currentHeight)
}
for currentHeight.Cmp(from) <= 0 {
select {
case <-newHeaders:
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader waiting for recent %s block and unable to fetch its block number. err: %w", readMode, err)
}
currentHeight.SetUint64(blockNum)
case <-ctx.Done():
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()
} else {

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
currentHeight = latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
}
}
}

Expand Down Expand Up @@ -359,7 +401,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
storeSeenBatchCount()
if !r.caughtUp {
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
Expand Down Expand Up @@ -408,7 +450,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
if !r.caughtUp && to.Cmp(currentHeight) == 0 {
if !r.caughtUp && to.Cmp(currentHeight) == 0 && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
Expand Down
4 changes: 2 additions & 2 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
}

if seqNum != pos {
return errors.New("unexpected delayed sequence number")
return fmt.Errorf("unexpected delayed sequence number %v, expected %v", seqNum, pos)
}

if nextAcc != message.BeforeInboxAcc {
return errors.New("previous delayed accumulator mismatch")
return fmt.Errorf("previous delayed accumulator mismatch for message %v", seqNum)
}
nextAcc = message.AfterInboxAcc()

Expand Down
7 changes: 7 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func (c *Config) Validate() error {
if c.DelayedSequencer.Enable && !c.Sequencer {
return errors.New("cannot enable delayed sequencer without enabling sequencer")
}
if c.InboxReader.ReadMode != "latest" {
if c.Sequencer {
return errors.New("cannot enable inboxreader in safe or finalized mode along with sequencer")
}
c.Feed.Output.Enable = false
c.Feed.Input.URL = []string{}
}
if err := c.BlockValidator.Validate(); err != nil {
return err
}
Expand Down
45 changes: 39 additions & 6 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,27 @@ func NewSyncMonitor(config *SyncMonitorConfig) *SyncMonitor {
}

type SyncMonitorConfig struct {
BlockBuildLag uint64 `koanf:"block-build-lag"`
BlockBuildSequencerInboxLag uint64 `koanf:"block-build-sequencer-inbox-lag"`
CoordinatorMsgLag uint64 `koanf:"coordinator-msg-lag"`
BlockBuildLag uint64 `koanf:"block-build-lag"`
BlockBuildSequencerInboxLag uint64 `koanf:"block-build-sequencer-inbox-lag"`
CoordinatorMsgLag uint64 `koanf:"coordinator-msg-lag"`
SafeBlockWaitForBlockValidator bool `koanf:"safe-block-wait-for-block-validator"`
FinalizedBlockWaitForBlockValidator bool `koanf:"finalized-block-wait-for-block-validator"`
}

var DefaultSyncMonitorConfig = SyncMonitorConfig{
BlockBuildLag: 20,
BlockBuildSequencerInboxLag: 0,
CoordinatorMsgLag: 15,
BlockBuildLag: 20,
BlockBuildSequencerInboxLag: 0,
CoordinatorMsgLag: 15,
SafeBlockWaitForBlockValidator: false,
FinalizedBlockWaitForBlockValidator: false,
}

func SyncMonitorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".block-build-lag", DefaultSyncMonitorConfig.BlockBuildLag, "allowed lag between messages read and blocks built")
f.Uint64(prefix+".block-build-sequencer-inbox-lag", DefaultSyncMonitorConfig.BlockBuildSequencerInboxLag, "allowed lag between messages read from sequencer inbox and blocks built")
f.Uint64(prefix+".coordinator-msg-lag", DefaultSyncMonitorConfig.CoordinatorMsgLag, "allowed lag between local and remote messages")
f.Bool(prefix+".safe-block-wait-for-block-validator", DefaultSyncMonitorConfig.SafeBlockWaitForBlockValidator, "wait for block validator to complete before returning safe block number")
f.Bool(prefix+".finalized-block-wait-for-block-validator", DefaultSyncMonitorConfig.FinalizedBlockWaitForBlockValidator, "wait for block validator to complete before returning finalized block number")
}

func (s *SyncMonitor) Initialize(inboxReader *InboxReader, txStreamer *TransactionStreamer, coordinator *SeqCoordinator, exec execution.FullExecutionClient) {
Expand Down Expand Up @@ -153,10 +159,27 @@ func (s *SyncMonitor) SafeBlockNumber(ctx context.Context) (uint64, error) {
if err != nil {
return 0, err
}
// If SafeBlockWaitForBlockValidator is true, we want to wait for the block validator to finish
if s.config.SafeBlockWaitForBlockValidator {
latestValidatedCount, err := s.getLatestValidatedCount()
if err != nil {
return 0, err
}
if msg > latestValidatedCount {
msg = latestValidatedCount
}
}
block := s.exec.MessageIndexToBlockNumber(msg - 1)
return block, nil
}

func (s *SyncMonitor) getLatestValidatedCount() (arbutil.MessageIndex, error) {
if s.txStreamer.validator == nil {
return 0, errors.New("validator not set up")
}
return s.txStreamer.validator.GetValidated(), nil
}

func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error) {
if s.inboxReader == nil || !s.initialized {
return 0, errors.New("not set up for safeblock")
Expand All @@ -165,6 +188,16 @@ func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error)
if err != nil {
return 0, err
}
// If FinalizedBlockWaitForBlockValidator is true, we want to wait for the block validator to finish
if s.config.FinalizedBlockWaitForBlockValidator {
latestValidatedCount, err := s.getLatestValidatedCount()
if err != nil {
return 0, err
}
if msg > latestValidatedCount {
msg = latestValidatedCount
}
}
block := s.exec.MessageIndexToBlockNumber(msg - 1)
return block, nil
}
Expand Down
Loading

0 comments on commit 1e1f96c

Please sign in to comment.