diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index c7af96c86a..72881b52fd 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -32,6 +32,7 @@ type InboxReaderConfig struct { DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"` TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"` + ReadMode string `koanf:"read-mode" reload:"hot"` } type InboxReaderConfigFetcher func() *InboxReaderConfig @@ -40,6 +41,10 @@ func (c *InboxReaderConfig) Validate() error { if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead { return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read") } + c.ReadMode = strings.ToLower(c.ReadMode) + if c.ReadMode != "latest" && c.ReadMode != "safe" && c.ReadMode != "finalized" { + return fmt.Errorf("inbox reader read-mode is invalid, want: latest or safe or finalized, got: %s", c.ReadMode) + } return nil } @@ -51,6 +56,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)") f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") + f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized") } var DefaultInboxReaderConfig = InboxReaderConfig{ @@ -61,6 +67,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ DefaultBlocksToRead: 100, TargetMessagesRead: 500, MaxBlocksToRead: 2000, + ReadMode: "latest", } var TestInboxReaderConfig = InboxReaderConfig{ @@ -71,6 +78,7 @@ var TestInboxReaderConfig = InboxReaderConfig{ DefaultBlocksToRead: 100, TargetMessagesRead: 500, MaxBlocksToRead: 2000, + ReadMode: "latest", } type InboxReader struct { @@ -219,6 +227,7 @@ func (r *InboxReader) CaughtUp() chan struct{} { } func (r *InboxReader) run(ctx context.Context, hadError bool) error { + readMode := r.config().ReadMode from, err := r.getNextBlockToRead() if err != nil { return err @@ -239,38 +248,71 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } defer storeSeenBatchCount() // in case of error for { - - latestHeader, err := r.l1Reader.LastHeader(ctx) - if err != nil { - return err - } config := r.config() - currentHeight := latestHeader.Number - - neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1) - neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance) - checkDelayTimer := time.NewTimer(config.CheckDelay) - WaitForHeight: - for arbmath.BigLessThan(currentHeight, neededBlockHeight) { - select { - case latestHeader = <-newHeaders: - if latestHeader == nil { - // shutting down + currentHeight := big.NewInt(0) + if readMode != "latest" { + var blockNum uint64 + fetchLatestSafeOrFinalized := func() { + if readMode == "safe" { + blockNum, err = r.l1Reader.LatestSafeBlockNr(ctx) + } else { + blockNum, err = r.l1Reader.LatestFinalizedBlockNr(ctx) + } + } + fetchLatestSafeOrFinalized() + if err != nil || blockNum == 0 { + return fmt.Errorf("inboxreader running in read only %s mode and unable to fetch latest %s block. err: %w", readMode, readMode, err) + } + currentHeight.SetUint64(blockNum) + // latest block in our db is newer than the latest safe/finalized block hence reset 'from' to match the last safe/finalized block number + if from.Uint64() > currentHeight.Uint64()+1 { + from.Set(currentHeight) + } + for currentHeight.Cmp(from) <= 0 { + select { + case <-newHeaders: + fetchLatestSafeOrFinalized() + if err != nil || blockNum == 0 { + return fmt.Errorf("inboxreader waiting for recent %s block and unable to fetch its block number. err: %w", readMode, err) + } + currentHeight.SetUint64(blockNum) + case <-ctx.Done(): return nil } - currentHeight = new(big.Int).Set(latestHeader.Number) - case <-ctx.Done(): - return nil - case <-checkDelayTimer.C: - break WaitForHeight } - } - checkDelayTimer.Stop() + } else { - if config.DelayBlocks > 0 { - currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks)) - if currentHeight.Cmp(r.firstMessageBlock) < 0 { - currentHeight = new(big.Int).Set(r.firstMessageBlock) + latestHeader, err := r.l1Reader.LastHeader(ctx) + if err != nil { + return err + } + currentHeight = latestHeader.Number + + neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1) + neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance) + checkDelayTimer := time.NewTimer(config.CheckDelay) + WaitForHeight: + for arbmath.BigLessThan(currentHeight, neededBlockHeight) { + select { + case latestHeader = <-newHeaders: + if latestHeader == nil { + // shutting down + return nil + } + currentHeight = new(big.Int).Set(latestHeader.Number) + case <-ctx.Done(): + return nil + case <-checkDelayTimer.C: + break WaitForHeight + } + } + checkDelayTimer.Stop() + + if config.DelayBlocks > 0 { + currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks)) + if currentHeight.Cmp(r.firstMessageBlock) < 0 { + currentHeight = new(big.Int).Set(r.firstMessageBlock) + } } } @@ -359,7 +401,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { r.lastReadBatchCount = checkingBatchCount r.lastReadMutex.Unlock() storeSeenBatchCount() - if !r.caughtUp { + if !r.caughtUp && readMode == "latest" { r.caughtUp = true close(r.caughtUpChan) } @@ -408,7 +450,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { if err != nil { return err } - if !r.caughtUp && to.Cmp(currentHeight) == 0 { + if !r.caughtUp && to.Cmp(currentHeight) == 0 && readMode == "latest" { r.caughtUp = true close(r.caughtUpChan) } diff --git a/arbnode/node.go b/arbnode/node.go index de9745f2a8..31c13133d8 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -101,6 +101,13 @@ func (c *Config) Validate() error { if c.DelayedSequencer.Enable && !c.Sequencer { return errors.New("cannot enable delayed sequencer without enabling sequencer") } + if c.InboxReader.ReadMode != "latest" { + if c.Sequencer { + return errors.New("cannot enable inboxreader in safe or finalized mode along with sequencer") + } + c.Feed.Output.Enable = false + c.Feed.Input.URL = []string{} + } if err := c.BlockValidator.Validate(); err != nil { return err }