diff --git a/arbitrator/wasm-libraries/go-stub/src/value.rs b/arbitrator/wasm-libraries/go-stub/src/value.rs index 3a015bbf70..22c1ed6a86 100644 --- a/arbitrator/wasm-libraries/go-stub/src/value.rs +++ b/arbitrator/wasm-libraries/go-stub/src/value.rs @@ -164,9 +164,9 @@ pub unsafe fn get_field(source: u32, field: &[u8]) -> GoValue { } } else if source == GO_ID { if field == b"_pendingEvent" { - if let Some(event) = &PENDING_EVENT { + if let Some(event) = PENDING_EVENT.clone() { let id = DynamicObjectPool::singleton() - .insert(DynamicObject::PendingEvent(event.clone())); + .insert(DynamicObject::PendingEvent(event)); return GoValue::Object(id); } else { return GoValue::Null; diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 72881b52fd..a1f1a1a930 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -10,7 +10,6 @@ import ( "math" "math/big" "strings" - "sync" "sync/atomic" "time" @@ -99,10 +98,6 @@ type InboxReader struct { // Atomic lastSeenBatchCount uint64 - - // Behind the mutex - lastReadMutex sync.RWMutex - lastReadBlock uint64 lastReadBatchCount uint64 } @@ -396,10 +391,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { // There's nothing to do from = arbmath.BigAddByUint(currentHeight, 1) blocksToFetch = config.DefaultBlocksToRead - r.lastReadMutex.Lock() - r.lastReadBlock = currentHeight.Uint64() - r.lastReadBatchCount = checkingBatchCount - r.lastReadMutex.Unlock() + atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) storeSeenBatchCount() if !r.caughtUp && readMode == "latest" { r.caughtUp = true @@ -531,10 +523,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if len(sequencerBatches) > 0 { readAnyBatches = true - r.lastReadMutex.Lock() - r.lastReadBlock = to.Uint64() - r.lastReadBatchCount = sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1 - r.lastReadMutex.Unlock() + atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1) storeSeenBatchCount() } } @@ -561,10 +550,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if !readAnyBatches { - r.lastReadMutex.Lock() - r.lastReadBlock = currentHeight.Uint64() - r.lastReadBatchCount = checkingBatchCount - r.lastReadMutex.Unlock() + atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) storeSeenBatchCount() } } @@ -635,10 +621,8 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6 return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, metadata.ParentChainBlock, seenBatches) } -func (r *InboxReader) GetLastReadBlockAndBatchCount() (uint64, uint64) { - r.lastReadMutex.RLock() - defer r.lastReadMutex.RUnlock() - return r.lastReadBlock, r.lastReadBatchCount +func (r *InboxReader) GetLastReadBatchCount() uint64 { + return atomic.LoadUint64(&r.lastReadBatchCount) } // GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1. 
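Note on the InboxReader change above: with lastReadBlock removed there is no longer a two-field invariant to protect, so the RWMutex can be dropped and the remaining counter accessed through sync/atomic alone. A minimal sketch of that pattern, assuming an illustrative stub type rather than the actual InboxReader:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// A single uint64 written by the reader loop and read from other
// goroutines needs no mutex once it no longer has to change in
// lockstep with a second field.
type inboxReaderStub struct {
	lastReadBatchCount uint64 // accessed only via sync/atomic
}

func (r *inboxReaderStub) recordProgress(batchCount uint64) {
	atomic.StoreUint64(&r.lastReadBatchCount, batchCount)
}

func (r *inboxReaderStub) GetLastReadBatchCount() uint64 {
	return atomic.LoadUint64(&r.lastReadBatchCount)
}

func main() {
	r := &inboxReaderStub{}
	r.recordProgress(42)
	fmt.Println(r.GetLastReadBatchCount()) // 42
}
```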
diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index f98f93a3eb..b758e95e62 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -204,6 +204,11 @@ func (t *InboxTracker) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex return metadata.MessageCount, err } +func (t *InboxTracker) GetBatchParentChainBlock(seqNum uint64) (uint64, error) { + metadata, err := t.GetBatchMetadata(seqNum) + return metadata.ParentChainBlock, err +} + // GetBatchAcc is a convenience function wrapping GetBatchMetadata func (t *InboxTracker) GetBatchAcc(seqNum uint64) (common.Hash, error) { metadata, err := t.GetBatchMetadata(seqNum) @@ -223,6 +228,54 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) { return count, nil } +// err will return unexpected/internal errors +// bool will be false if batch not found (meaning, block not yet posted on a batch) +func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) { + batchCount, err := t.GetBatchCount() + if err != nil { + return 0, false, err + } + low := uint64(0) + high := batchCount - 1 + lastBatchMessageCount, err := t.GetBatchMessageCount(high) + if err != nil { + return 0, false, err + } + if lastBatchMessageCount <= pos { + return 0, false, nil + } + // Iteration preconditions: + // - high >= low + // - msgCount(low - 1) <= pos implies low <= target + // - msgCount(high) > pos implies high >= target + // Therefore, if low == high, then low == high == target + for { + // Due to integer rounding, mid >= low && mid < high + mid := (low + high) / 2 + count, err := t.GetBatchMessageCount(mid) + if err != nil { + return 0, false, err + } + if count < pos { + // Must narrow as mid >= low, therefore mid + 1 > low, therefore newLow > oldLow + // Keeps low precondition as msgCount(mid) < pos + low = mid + 1 + } else if count == pos { + return mid + 1, true, nil + } else if count == pos+1 || mid == low { // implied: count > pos + return mid, true, nil + } else { + // implied: count > pos + 1 + // Must narrow as mid < high, therefore newHigh < oldHigh + // Keeps high precondition as msgCount(mid) > pos + high = mid + } + if high == low { + return high, true, nil + } + } +} + func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error { batchCount, err := t.GetBatchCount() if err != nil { diff --git a/arbnode/node.go b/arbnode/node.go index c19e02dddc..7a7a99ba88 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -26,6 +26,7 @@ import ( "github.com/offchainlabs/nitro/arbnode/dataposter" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbnode/resourcemanager" + "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" @@ -196,6 +197,7 @@ func ConfigDefaultL1NonSequencerTest() *Config { config.BatchPoster.Enable = false config.SeqCoordinator.Enable = false config.BlockValidator = staker.TestBlockValidatorConfig + config.SyncMonitor = TestSyncMonitorConfig config.Staker = staker.TestL1ValidatorConfig config.Staker.Enable = false config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}} @@ -213,6 +215,7 @@ func ConfigDefaultL2Test() *Config { config.SeqCoordinator.Signer.ECDSA.AcceptSequencer = false config.SeqCoordinator.Signer.ECDSA.Dangerous.AcceptMissing = true config.Staker = staker.TestL1ValidatorConfig + config.SyncMonitor = TestSyncMonitorConfig 
config.Staker.Enable = false config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}} config.TransactionStreamer = DefaultTransactionStreamerConfig @@ -265,7 +268,6 @@ type Node struct { SeqCoordinator *SeqCoordinator MaintenanceRunner *MaintenanceRunner DASLifecycleManager *das.LifecycleManager - ClassicOutboxRetriever *ClassicOutboxRetriever SyncMonitor *SyncMonitor configFetcher ConfigFetcher ctx context.Context @@ -381,17 +383,10 @@ func createNodeImpl( l2ChainId := l2Config.ChainID.Uint64() - syncMonitor := NewSyncMonitor(&config.SyncMonitor) - var classicOutbox *ClassicOutboxRetriever - classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "", true) - if err != nil { - if l2Config.ArbitrumChainParams.GenesisBlockNum > 0 { - log.Warn("Classic Msg Database not found", "err", err) - } - classicOutbox = nil - } else { - classicOutbox = NewClassicOutboxRetriever(classicMsgDb) + syncConfigFetcher := func() *SyncMonitorConfig { + return &configFetcher.Get().SyncMonitor } + syncMonitor := NewSyncMonitor(syncConfigFetcher) var l1Reader *headerreader.HeaderReader if config.ParentChainReader.Enable { @@ -488,7 +483,6 @@ func createNodeImpl( SeqCoordinator: coordinator, MaintenanceRunner: maintenanceRunner, DASLifecycleManager: nil, - ClassicOutboxRetriever: classicOutbox, SyncMonitor: syncMonitor, configFetcher: configFetcher, ctx: ctx, @@ -706,7 +700,6 @@ func createNodeImpl( SeqCoordinator: coordinator, MaintenanceRunner: maintenanceRunner, DASLifecycleManager: dasLifecycleManager, - ClassicOutboxRetriever: classicOutbox, SyncMonitor: syncMonitor, configFetcher: configFetcher, ctx: ctx, @@ -763,22 +756,36 @@ func CreateNode( return currentNode, nil } +func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { + n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged) +} + +func (n *Node) BacklogL1GasCharged() uint64 { + return n.TxStreamer.BacklogL1GasCharged() +} +func (n *Node) BacklogCallDataUnits() uint64 { + return n.TxStreamer.BacklogCallDataUnits() +} + func (n *Node) Start(ctx context.Context) error { execClient, ok := n.Execution.(*gethexec.ExecutionNode) if !ok { execClient = nil } if execClient != nil { - err := execClient.Initialize(ctx, n, n.SyncMonitor) + err := execClient.Initialize(ctx) if err != nil { return fmt.Errorf("error initializing exec client: %w", err) } } - n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator, n.Execution) + n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator) err := n.Stack.Start() if err != nil { return fmt.Errorf("error starting geth stack: %w", err) } + if execClient != nil { + execClient.SetConsensusClient(n) + } err = n.Execution.Start(ctx) if err != nil { return fmt.Errorf("error starting exec client: %w", err) @@ -891,6 +898,7 @@ func (n *Node) Start(ctx context.Context) error { if n.configFetcher != nil { n.configFetcher.Start(ctx) } + n.SyncMonitor.Start(ctx) return nil } @@ -944,6 +952,7 @@ func (n *Node) StopAndWait() { // Just stops the redis client (most other stuff was stopped earlier) n.SeqCoordinator.StopAndWait() } + n.SyncMonitor.StopAndWait() if n.DASLifecycleManager != nil { n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second) } @@ -954,3 +963,51 @@ func (n *Node) StopAndWait() { log.Error("error on stack close", "err", err) } } + +func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) { + return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum) +} + 
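FindInboxBatchContainingMessage, added to the InboxTracker above and delegated through Node in the methods around this point, is a binary search over cumulative per-batch message counts. A self-contained sketch of the same invariants, run against an in-memory slice instead of the tracker and assuming counts are strictly increasing (i.e. no empty batches):

```go
package main

import "fmt"

// counts[i] plays the role of GetBatchMessageCount(i): the total
// number of messages across batches 0..i. Returns (batch, found).
func findBatchContaining(counts []uint64, pos uint64) (uint64, bool) {
	if len(counts) == 0 || counts[len(counts)-1] <= pos {
		return 0, false // message pos not yet posted in any batch
	}
	low, high := uint64(0), uint64(len(counts)-1)
	for {
		mid := (low + high) / 2 // mid >= low && mid < high
		count := counts[mid]
		switch {
		case count < pos:
			low = mid + 1 // batch mid ends before pos; search above
		case count == pos:
			return mid + 1, true // batch mid ends at pos-1, so pos opens the next batch
		case count == pos+1 || mid == low:
			return mid, true // pos is the last message of batch mid, or the range is pinned
		default: // count > pos+1
			high = mid // answer is at or below mid
		}
		if high == low {
			return high, true
		}
	}
}

func main() {
	counts := []uint64{3, 7, 10} // batch 0: msgs 0-2, batch 1: 3-6, batch 2: 7-9
	fmt.Println(findBatchContaining(counts, 2))  // 0 true
	fmt.Println(findBatchContaining(counts, 3))  // 1 true
	fmt.Println(findBatchContaining(counts, 9))  // 2 true
	fmt.Println(findBatchContaining(counts, 10)) // 0 false
}
```

The two early-return cases are what let the search answer "which batch contains message pos" directly from cumulative counts, without a separate lower-bound pass.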
+func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) { + return n.InboxTracker.FindInboxBatchContainingMessage(message) +} + +func (n *Node) GetBatchParentChainBlock(seqNum uint64) (uint64, error) { + return n.InboxTracker.GetBatchParentChainBlock(seqNum) +} + +func (n *Node) FullSyncProgressMap() map[string]interface{} { + return n.SyncMonitor.FullSyncProgressMap() +} + +func (n *Node) Synced() bool { + return n.SyncMonitor.Synced() +} + +func (n *Node) SyncTargetMessageCount() arbutil.MessageIndex { + return n.SyncMonitor.SyncTargetMessageCount() +} + +// TODO: switch from pulling to pushing safe/finalized +func (n *Node) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + return n.InboxReader.GetSafeMsgCount(ctx) +} + +func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + return n.InboxReader.GetFinalizedMsgCount(ctx) +} + +func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error { + return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta) +} + +func (n *Node) ExpectChosenSequencer() error { + return n.TxStreamer.ExpectChosenSequencer() +} + +func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) { + if n.BlockValidator == nil { + return 0, errors.New("validator not set up") + } + return n.BlockValidator.GetValidated(), nil +} diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index 99a66abde2..d3b9a7e1c6 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -2,120 +2,146 @@ package arbnode import ( "context" - "errors" - "sync/atomic" + "sync" + "time" + "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/util/stopwaiter" flag "github.com/spf13/pflag" ) type SyncMonitor struct { - config *SyncMonitorConfig + stopwaiter.StopWaiter + config func() *SyncMonitorConfig inboxReader *InboxReader txStreamer *TransactionStreamer coordinator *SeqCoordinator - exec execution.FullExecutionClient initialized bool + + syncTargetLock sync.Mutex + nextSyncTarget arbutil.MessageIndex + syncTarget arbutil.MessageIndex } -func NewSyncMonitor(config *SyncMonitorConfig) *SyncMonitor { +func NewSyncMonitor(config func() *SyncMonitorConfig) *SyncMonitor { return &SyncMonitor{ config: config, } } type SyncMonitorConfig struct { - BlockBuildLag uint64 `koanf:"block-build-lag"` - BlockBuildSequencerInboxLag uint64 `koanf:"block-build-sequencer-inbox-lag"` - CoordinatorMsgLag uint64 `koanf:"coordinator-msg-lag"` - SafeBlockWaitForBlockValidator bool `koanf:"safe-block-wait-for-block-validator"` - FinalizedBlockWaitForBlockValidator bool `koanf:"finalized-block-wait-for-block-validator"` + MsgLag time.Duration `koanf:"msg-lag"` } var DefaultSyncMonitorConfig = SyncMonitorConfig{ - BlockBuildLag: 20, - BlockBuildSequencerInboxLag: 0, - CoordinatorMsgLag: 15, - SafeBlockWaitForBlockValidator: false, - FinalizedBlockWaitForBlockValidator: false, + MsgLag: time.Second, +} + +var TestSyncMonitorConfig = SyncMonitorConfig{ + MsgLag: time.Millisecond * 10, } func SyncMonitorConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Uint64(prefix+".block-build-lag", DefaultSyncMonitorConfig.BlockBuildLag, "allowed lag between messages read and blocks built") - f.Uint64(prefix+".block-build-sequencer-inbox-lag", DefaultSyncMonitorConfig.BlockBuildSequencerInboxLag, "allowed lag between messages read from sequencer 
inbox and blocks built") - f.Uint64(prefix+".coordinator-msg-lag", DefaultSyncMonitorConfig.CoordinatorMsgLag, "allowed lag between local and remote messages") - f.Bool(prefix+".safe-block-wait-for-block-validator", DefaultSyncMonitorConfig.SafeBlockWaitForBlockValidator, "wait for block validator to complete before returning safe block number") - f.Bool(prefix+".finalized-block-wait-for-block-validator", DefaultSyncMonitorConfig.FinalizedBlockWaitForBlockValidator, "wait for block validator to complete before returning finalized block number") + f.Duration(prefix+".msg-lag", DefaultSyncMonitorConfig.MsgLag, "allowed msg lag while still considered in sync") } -func (s *SyncMonitor) Initialize(inboxReader *InboxReader, txStreamer *TransactionStreamer, coordinator *SeqCoordinator, exec execution.FullExecutionClient) { +func (s *SyncMonitor) Initialize(inboxReader *InboxReader, txStreamer *TransactionStreamer, coordinator *SeqCoordinator) { s.inboxReader = inboxReader s.txStreamer = txStreamer s.coordinator = coordinator - s.exec = exec s.initialized = true } -func (s *SyncMonitor) SyncProgressMap() map[string]interface{} { - syncing := false - res := make(map[string]interface{}) +func (s *SyncMonitor) updateSyncTarget(ctx context.Context) time.Duration { + nextSyncTarget, err := s.maxMessageCount() + if err != nil { + log.Warn("failed readin max msg count", "err", err) + return s.config().MsgLag + } + s.syncTargetLock.Lock() + defer s.syncTargetLock.Unlock() + s.syncTarget = s.nextSyncTarget + s.nextSyncTarget = nextSyncTarget + return s.config().MsgLag +} - if !s.initialized { - res["err"] = "uninitialized" - return res +func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex { + s.syncTargetLock.Lock() + defer s.syncTargetLock.Unlock() + return s.syncTarget +} + +func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { + msgCount, err := s.txStreamer.GetMessageCount() + if err != nil { + return 0, err } - broadcasterQueuedMessagesPos := atomic.LoadUint64(&(s.txStreamer.broadcasterQueuedMessagesPos)) + pending := s.txStreamer.FeedPendingMessageCount() + if pending > msgCount { + msgCount = pending + } - if broadcasterQueuedMessagesPos != 0 { // unprocessed feed - syncing = true + if s.inboxReader != nil { + batchProcessed := s.inboxReader.GetLastReadBatchCount() + + if batchProcessed > 0 { + batchMsgCount, err := s.inboxReader.Tracker().GetBatchMessageCount(batchProcessed - 1) + if err != nil { + return msgCount, err + } + if batchMsgCount > msgCount { + msgCount = batchMsgCount + } + } } - res["broadcasterQueuedMessagesPos"] = broadcasterQueuedMessagesPos - builtMessageCount, err := s.exec.HeadMessageNumber() - if err != nil { - res["builtMessageCountError"] = err.Error() - syncing = true - builtMessageCount = 0 - } else { - blockNum := s.exec.MessageIndexToBlockNumber(builtMessageCount) - res["blockNum"] = blockNum - builtMessageCount++ - res["messageOfLastBlock"] = builtMessageCount + if s.coordinator != nil { + coordinatorMessageCount, err := s.coordinator.GetRemoteMsgCount() //NOTE: this creates a remote call + if err != nil { + return msgCount, err + } + if coordinatorMessageCount > msgCount { + msgCount = coordinatorMessageCount + } } + return msgCount, nil +} + +func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} { + res := make(map[string]interface{}) + + if !s.initialized { + res["err"] = "uninitialized" + return res + } + + syncTarget := s.SyncTargetMessageCount() + res["syncTargetMsgCount"] = syncTarget + msgCount, err := 
s.txStreamer.GetMessageCount() if err != nil { res["msgCountError"] = err.Error() - syncing = true - } else { - res["msgCount"] = msgCount - if builtMessageCount+arbutil.MessageIndex(s.config.BlockBuildLag) < msgCount { - syncing = true - } + return res } + res["msgCount"] = msgCount + + res["feedPendingMessageCount"] = s.txStreamer.FeedPendingMessageCount() if s.inboxReader != nil { batchSeen := s.inboxReader.GetLastSeenBatchCount() - _, batchProcessed := s.inboxReader.GetLastReadBlockAndBatchCount() - - if (batchSeen == 0) || // error or not yet read inbox - (batchProcessed < batchSeen) { // unprocessed inbox messages - syncing = true - } res["batchSeen"] = batchSeen + + batchProcessed := s.inboxReader.GetLastReadBatchCount() res["batchProcessed"] = batchProcessed - processedMetadata, err := s.inboxReader.Tracker().GetBatchMetadata(batchProcessed - 1) + processedBatchMsgs, err := s.inboxReader.Tracker().GetBatchMessageCount(batchProcessed - 1) if err != nil { res["batchMetadataError"] = err.Error() - syncing = true } else { - res["messageOfProcessedBatch"] = processedMetadata.MessageCount - if builtMessageCount+arbutil.MessageIndex(s.config.BlockBuildSequencerInboxLag) < processedMetadata.MessageCount { - syncing = true - } + res["messageOfProcessedBatch"] = processedBatchMsgs } l1reader := s.inboxReader.l1Reader @@ -135,73 +161,55 @@ func (s *SyncMonitor) SyncProgressMap() map[string]interface{} { coordinatorMessageCount, err := s.coordinator.GetRemoteMsgCount() //NOTE: this creates a remote call if err != nil { res["coordinatorMsgCountError"] = err.Error() - syncing = true } else { res["coordinatorMessageCount"] = coordinatorMessageCount - if msgCount+arbutil.MessageIndex(s.config.CoordinatorMsgLag) < coordinatorMessageCount { - syncing = true - } } } - if !syncing { - return make(map[string]interface{}) - } - return res } -func (s *SyncMonitor) SafeBlockNumber(ctx context.Context) (uint64, error) { - if s.inboxReader == nil || !s.initialized { - return 0, errors.New("not set up for safeblock") - } - msg, err := s.inboxReader.GetSafeMsgCount(ctx) - if err != nil { - return 0, err - } - // If SafeBlockWaitForBlockValidator is true, we want to wait for the block validator to finish - if s.config.SafeBlockWaitForBlockValidator { - latestValidatedCount, err := s.getLatestValidatedCount() - if err != nil { - return 0, err - } - if msg > latestValidatedCount { - msg = latestValidatedCount - } +func (s *SyncMonitor) SyncProgressMap() map[string]interface{} { + if s.Synced() { + return make(map[string]interface{}) } - block := s.exec.MessageIndexToBlockNumber(msg - 1) - return block, nil + + return s.FullSyncProgressMap() } -func (s *SyncMonitor) getLatestValidatedCount() (arbutil.MessageIndex, error) { - if s.txStreamer.validator == nil { - return 0, errors.New("validator not set up") - } - return s.txStreamer.validator.GetValidated(), nil +func (s *SyncMonitor) Start(ctx_in context.Context) { + s.StopWaiter.Start(ctx_in, s) + s.CallIteratively(s.updateSyncTarget) } -func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error) { - if s.inboxReader == nil || !s.initialized { - return 0, errors.New("not set up for safeblock") +func (s *SyncMonitor) Synced() bool { + if !s.initialized { + return false } - msg, err := s.inboxReader.GetFinalizedMsgCount(ctx) + if !s.Started() { + return false + } + syncTarget := s.SyncTargetMessageCount() + + msgCount, err := s.txStreamer.GetMessageCount() if err != nil { - return 0, err + return false } - // If 
FinalizedBlockWaitForBlockValidator is true, we want to wait for the block validator to finish - if s.config.FinalizedBlockWaitForBlockValidator { - latestValidatedCount, err := s.getLatestValidatedCount() - if err != nil { - return 0, err + + if syncTarget > msgCount { + return false + } + + if s.inboxReader != nil { + batchSeen := s.inboxReader.GetLastSeenBatchCount() + if batchSeen == 0 { + return false } - if msg > latestValidatedCount { - msg = latestValidatedCount + batchProcessed := s.inboxReader.GetLastReadBatchCount() + + if batchProcessed < batchSeen { + return false } } - block := s.exec.MessageIndexToBlockNumber(msg - 1) - return block, nil -} - -func (s *SyncMonitor) Synced() bool { - return len(s.SyncProgressMap()) == 0 + return true } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index c2ba5aaf6f..7d24005bcd 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -119,7 +119,6 @@ func NewTransactionStreamer( msgToL1PriceData: []L1PriceDataOfMsg{}, }, } - streamer.exec.SetTransactionStreamer(streamer) err := streamer.cleanupInconsistentState() if err != nil { return nil, err @@ -547,6 +546,21 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil) } +func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex { + pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + if pos == 0 { + return 0 + } + + s.insertionMutex.Lock() + defer s.insertionMutex.Unlock() + pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + if pos == 0 { + return 0 + } + return arbutil.MessageIndex(pos + uint64(len(s.broadcasterQueuedMessages))) +} + func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFeedMessage) error { if len(feedMessages) == 0 { return nil @@ -942,10 +956,6 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return nil } -func (s *TransactionStreamer) FetchBatch(batchNum uint64) ([]byte, common.Hash, error) { - return s.inboxReader.GetSequencerMessageBytes(context.TODO(), batchNum) -} - // The caller must hold the insertionMutex func (s *TransactionStreamer) ExpectChosenSequencer() error { if s.coordinator != nil { @@ -987,10 +997,6 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex return nil } -func (s *TransactionStreamer) GenesisBlockNumber() uint64 { - return s.chainConfig.ArbitrumChainParams.GenesisBlockNum -} - // PauseReorgs until a matching call to ResumeReorgs (may be called concurrently) func (s *TransactionStreamer) PauseReorgs() { s.reorgMutex.RLock() diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index fea95cbb15..3671c7ea8d 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -20,7 +20,7 @@ import ( "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/cmd/util/confighelpers" - _ "github.com/offchainlabs/nitro/nodeInterface" + _ "github.com/offchainlabs/nitro/execution/nodeInterface" "github.com/offchainlabs/nitro/validator/valnode" ) diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 65577d067e..997adf9369 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -51,7 +51,7 @@ import ( "github.com/offchainlabs/nitro/cmd/util" "github.com/offchainlabs/nitro/cmd/util/confighelpers" "github.com/offchainlabs/nitro/execution/gethexec" - _ "github.com/offchainlabs/nitro/nodeInterface" + _ 
"github.com/offchainlabs/nitro/execution/nodeInterface" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" "github.com/offchainlabs/nitro/solgen/go/rollupgen" diff --git a/execution/gethexec/arb_interface.go b/execution/gethexec/arb_interface.go index 50d7dfb891..dbf9c24015 100644 --- a/execution/gethexec/arb_interface.go +++ b/execution/gethexec/arb_interface.go @@ -21,30 +21,31 @@ type TransactionPublisher interface { } type ArbInterface struct { - exec *ExecutionEngine + blockchain *core.BlockChain + node *ExecutionNode txPublisher TransactionPublisher - arbNode interface{} } -func NewArbInterface(exec *ExecutionEngine, txPublisher TransactionPublisher) (*ArbInterface, error) { +func NewArbInterface(blockchain *core.BlockChain, txPublisher TransactionPublisher) (*ArbInterface, error) { return &ArbInterface{ - exec: exec, + blockchain: blockchain, txPublisher: txPublisher, }, nil } -func (a *ArbInterface) Initialize(arbnode interface{}) { - a.arbNode = arbnode +func (a *ArbInterface) Initialize(node *ExecutionNode) { + a.node = node } func (a *ArbInterface) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { return a.txPublisher.PublishTransaction(ctx, tx, options) } +// might be used before Initialize func (a *ArbInterface) BlockChain() *core.BlockChain { - return a.exec.bc + return a.blockchain } func (a *ArbInterface) ArbNode() interface{} { - return a.arbNode + return a.node } diff --git a/execution/gethexec/block_recorder.go b/execution/gethexec/block_recorder.go index d7e702f3c1..5e25e592be 100644 --- a/execution/gethexec/block_recorder.go +++ b/execution/gethexec/block_recorder.go @@ -123,7 +123,7 @@ func (r *BlockRecorder) RecordBlockCreation( var readBatchInfo []validator.BatchInfo if msg != nil { batchFetcher := func(batchNum uint64) ([]byte, error) { - data, blockHash, err := r.execEngine.streamer.FetchBatch(batchNum) + data, blockHash, err := r.execEngine.consensus.FetchBatch(ctx, batchNum) if err != nil { return nil, err } diff --git a/arbnode/classicMessage.go b/execution/gethexec/classicMessage.go similarity index 99% rename from arbnode/classicMessage.go rename to execution/gethexec/classicMessage.go index f03ef5bd45..df749b98b4 100644 --- a/arbnode/classicMessage.go +++ b/execution/gethexec/classicMessage.go @@ -1,7 +1,7 @@ // Copyright 2022, Offchain Labs, Inc. 
// For license information, see https://github.com/nitro/blob/master/LICENSE -package arbnode +package gethexec import ( "encoding/binary" diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 02a8a1145a..f9e98b3a6a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/arbosState" @@ -20,16 +21,24 @@ import ( "github.com/offchainlabs/nitro/arbos/l1pricing" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/sharedmetrics" "github.com/offchainlabs/nitro/util/stopwaiter" ) +var ( + baseFeeGauge = metrics.NewRegisteredGauge("arb/block/basefee", nil) + blockGasUsedHistogram = metrics.NewRegisteredHistogram("arb/block/gasused", nil, metrics.NewBoundedHistogramSample()) + txCountHistogram = metrics.NewRegisteredHistogram("arb/block/transactions/count", nil, metrics.NewBoundedHistogramSample()) + txGasUsedHistogram = metrics.NewRegisteredHistogram("arb/block/transactions/gasused", nil, metrics.NewBoundedHistogramSample()) +) + type ExecutionEngine struct { stopwaiter.StopWaiter - bc *core.BlockChain - streamer execution.TransactionStreamer - recorder *BlockRecorder + bc *core.BlockChain + consensus execution.FullConsensusClient + recorder *BlockRecorder resequenceChan chan []*arbostypes.MessageWithMetadata createBlocksMutex sync.Mutex @@ -83,14 +92,18 @@ func (s *ExecutionEngine) EnablePrefetchBlock() { s.prefetchBlock = true } -func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) { +func (s *ExecutionEngine) SetConsensus(consensus execution.FullConsensusClient) { if s.Started() { - panic("trying to set transaction streamer after start") + panic("trying to set transaction consensus after start") } - if s.streamer != nil { - panic("trying to set transaction streamer when already set") + if s.consensus != nil { + panic("trying to set transaction consensus when already set") } - s.streamer = streamer + s.consensus = consensus +} + +func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher { + return s.consensus } func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error { @@ -266,7 +279,7 @@ func (s *ExecutionEngine) sequencerWrapper(sequencerFunc func() (*types.Block, e } // We got SequencerInsertLockTaken // option 1: there was a race, we are no longer main sequencer - chosenErr := s.streamer.ExpectChosenSequencer() + chosenErr := s.consensus.ExpectChosenSequencer() if chosenErr != nil { return nil, chosenErr } @@ -353,7 +366,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. 
return nil, err } - err = s.streamer.WriteMessageFromSequencer(pos, msgWithMeta) + err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta) if err != nil { return nil, err } @@ -399,7 +412,7 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp DelayedMessagesRead: delayedSeqNum + 1, } - err = s.streamer.WriteMessageFromSequencer(lastMsg+1, messageWithMeta) + err = s.consensus.WriteMessageFromSequencer(lastMsg+1, messageWithMeta) if err != nil { return nil, err } @@ -460,6 +473,11 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith statedb.StartPrefetcher("TransactionStreamer") defer statedb.StopPrefetcher() + batchFetcher := func(num uint64) ([]byte, error) { + data, _, err := s.consensus.FetchBatch(s.GetContext(), num) + return data, err + } + block, receipts, err := arbos.ProduceBlock( msg.Message, msg.DelayedMessagesRead, @@ -467,10 +485,7 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith statedb, s.bc, s.bc.Config(), - func(batchNum uint64) ([]byte, error) { - data, _, err := s.streamer.FetchBatch(batchNum) - return data, err - }, + batchFetcher, ) return block, statedb, receipts, err @@ -489,6 +504,15 @@ func (s *ExecutionEngine) appendBlock(block *types.Block, statedb *state.StateDB if status == core.SideStatTy { return errors.New("geth rejected block as non-canonical") } + baseFeeGauge.Update(block.BaseFee().Int64()) + txCountHistogram.Update(int64(len(block.Transactions()) - 1)) + var blockGasused uint64 + for i := 1; i < len(receipts); i++ { + val := arbmath.SaturatingUSub(receipts[i].GasUsed, receipts[i].GasUsedForL1) + txGasUsedHistogram.Update(int64(val)) + blockGasused += val + } + blockGasUsedHistogram.Update(int64(blockGasused)) return nil } @@ -553,7 +577,7 @@ func (s *ExecutionEngine) cacheL1PriceDataOfMsg(num arbutil.MessageIndex, receip for _, tx := range block.Transactions() { callDataUnits += tx.CalldataUnits } - s.streamer.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1) + s.consensus.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1) } // DigestMessage is used to create a block by executing msg against the latest state and storing it. 
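The new metrics in appendBlock above skip receipt 0 because every Nitro block leads with an internal transaction, and subtract GasUsedForL1 so the histograms track L2 computation gas only. A sketch of just that accounting, with `receipt` standing in for go-ethereum's `*types.Receipt`:

```go
package main

import "fmt"

type receipt struct {
	GasUsed      uint64 // total gas charged to the tx
	GasUsedForL1 uint64 // portion covering L1 calldata posting
}

// saturatingUSub mirrors arbmath.SaturatingUSub: clamp at zero
// instead of wrapping around on unsigned underflow.
func saturatingUSub(a, b uint64) uint64 {
	if b > a {
		return 0
	}
	return a - b
}

// l2GasUsed sums per-tx L2 computation gas, skipping the internal
// transaction at index 0.
func l2GasUsed(receipts []receipt) uint64 {
	var total uint64
	for i := 1; i < len(receipts); i++ {
		total += saturatingUSub(receipts[i].GasUsed, receipts[i].GasUsedForL1)
	}
	return total
}

func main() {
	rs := []receipt{{0, 0}, {100000, 30000}, {50000, 60000}}
	fmt.Println(l2GasUsed(rs)) // 70000: 70000 + 0 (second tx clamps to zero)
}
```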
diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index ca4fb19c6d..88c1410031 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -51,6 +51,7 @@ type Config struct { TxLookupLimit uint64 `koanf:"tx-lookup-limit"` Dangerous DangerousConfig `koanf:"dangerous"` EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` forwardingTarget string } @@ -83,6 +84,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { AddOptionsForNodeForwarderConfig(prefix+".forwarder", f) TxPreCheckerConfigAddOptions(prefix+".tx-pre-checker", f) CachingConfigAddOptions(prefix+".caching", f) + SyncMonitorConfigAddOptions(prefix+".sync-monitor", f) f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)") DangerousConfigAddOptions(prefix+".dangerous", f) f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks") @@ -118,8 +120,8 @@ func ConfigDefaultNonSequencerTest() *Config { func ConfigDefaultTest() *Config { config := ConfigDefault config.Sequencer = TestSequencerConfig - config.ForwardingTarget = "null" config.ParentChainReader = headerreader.TestConfig + config.ForwardingTarget = "null" _ = config.Validate() @@ -138,7 +140,9 @@ type ExecutionNode struct { Sequencer *Sequencer // either nil or same as TxPublisher TxPublisher TransactionPublisher ConfigFetcher ConfigFetcher + SyncMonitor *SyncMonitor ParentChainReader *headerreader.HeaderReader + ClassicOutbox *ClassicOutboxRetriever started atomic.Bool } @@ -169,6 +173,8 @@ func CreateExecutionNode( if err != nil { return nil, err } + } else if config.Sequencer.Enable { + log.Warn("sequencer enabled without l1 client") } if config.Sequencer.Enable { @@ -192,7 +198,7 @@ func CreateExecutionNode( txprecheckConfigFetcher := func() *TxPreCheckerConfig { return &configFetcher().TxPreChecker } txPublisher = NewTxPreChecker(txPublisher, l2BlockChain, txprecheckConfigFetcher) - arbInterface, err := NewArbInterface(execEngine, txPublisher) + arbInterface, err := NewArbInterface(l2BlockChain, txPublisher) if err != nil { return nil, err } @@ -205,6 +211,20 @@ func CreateExecutionNode( return nil, err } + syncMon := NewSyncMonitor(&config.SyncMonitor, execEngine) + + var classicOutbox *ClassicOutboxRetriever + + if l2BlockChain.Config().ArbitrumChainParams.GenesisBlockNum > 0 { + classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "", true) + if err != nil { + log.Warn("Classic Msg Database not found", "err", err) + classicOutbox = nil + } else { + classicOutbox = NewClassicOutboxRetriever(classicMsgDb) + } + } + apis := []rpc.API{{ Namespace: "arb", Version: "1.0", @@ -248,7 +268,9 @@ func CreateExecutionNode( Sequencer: sequencer, TxPublisher: txPublisher, ConfigFetcher: configFetcher, + SyncMonitor: syncMon, ParentChainReader: parentChainReader, + ClassicOutbox: classicOutbox, }, nil } @@ -257,8 +279,8 @@ func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) { return n.ExecEngine.GetL1GasPriceEstimate() } -func (n *ExecutionNode) Initialize(ctx context.Context, arbnode interface{}, sync arbitrum.SyncProgressBackend) error { - n.ArbInterface.Initialize(arbnode) +func (n *ExecutionNode) Initialize(ctx context.Context) error { + n.ArbInterface.Initialize(n) err := n.Backend.Start() if err != nil { return fmt.Errorf("error starting geth backend: %w", err) @@ -267,7 +289,7 @@ func (n *ExecutionNode) 
Initialize(ctx context.Context, arbnode interface{}, syn if err != nil { return fmt.Errorf("error initializing transaction publisher: %w", err) } - err = n.Backend.APIBackend().SetSyncBackend(sync) + err = n.Backend.APIBackend().SetSyncBackend(n.SyncMonitor) if err != nil { return fmt.Errorf("error setting sync backend: %w", err) } @@ -365,11 +387,13 @@ func (n *ExecutionNode) Pause() { n.Sequencer.Pause() } } + func (n *ExecutionNode) Activate() { if n.Sequencer != nil { n.Sequencer.Activate() } } + func (n *ExecutionNode) ForwardTo(url string) error { if n.Sequencer != nil { return n.Sequencer.ForwardTo(url) @@ -377,9 +401,12 @@ func (n *ExecutionNode) ForwardTo(url string) error { return errors.New("forwardTo not supported - sequencer not active") } } -func (n *ExecutionNode) SetTransactionStreamer(streamer execution.TransactionStreamer) { - n.ExecEngine.SetTransactionStreamer(streamer) + +func (n *ExecutionNode) SetConsensusClient(consensus execution.FullConsensusClient) { + n.ExecEngine.SetConsensus(consensus) + n.SyncMonitor.SetConsensusInfo(consensus) } + func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 { return n.ExecEngine.MessageIndexToBlockNumber(messageNum) } diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index cf5ec7f68c..63461cd33f 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -527,7 +527,7 @@ func (s *Sequencer) CheckHealth(ctx context.Context) error { if pauseChan != nil { return nil } - return s.execEngine.streamer.ExpectChosenSequencer() + return s.execEngine.consensus.ExpectChosenSequencer() } func (s *Sequencer) ForwardTarget() string { @@ -1015,8 +1015,8 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) { if err != nil { return 0, fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) } - backlogL1GasCharged := int64(s.execEngine.streamer.BacklogL1GasCharged()) - backlogCallDataUnits := int64(s.execEngine.streamer.BacklogCallDataUnits()) + backlogL1GasCharged := int64(s.execEngine.consensus.BacklogL1GasCharged()) + backlogCallDataUnits := int64(s.execEngine.consensus.BacklogCallDataUnits()) expectedSurplus := int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) // update metrics l1GasPriceGauge.Update(int64(l1GasPrice)) diff --git a/execution/gethexec/sync_monitor.go b/execution/gethexec/sync_monitor.go new file mode 100644 index 0000000000..84f45083e9 --- /dev/null +++ b/execution/gethexec/sync_monitor.go @@ -0,0 +1,113 @@ +package gethexec + +import ( + "context" + + "github.com/offchainlabs/nitro/execution" + "github.com/pkg/errors" + flag "github.com/spf13/pflag" +) + +type SyncMonitorConfig struct { + SafeBlockWaitForBlockValidator bool `koanf:"safe-block-wait-for-block-validator"` + FinalizedBlockWaitForBlockValidator bool `koanf:"finalized-block-wait-for-block-validator"` +} + +var DefaultSyncMonitorConfig = SyncMonitorConfig{ + SafeBlockWaitForBlockValidator: false, + FinalizedBlockWaitForBlockValidator: false, +} + +func SyncMonitorConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Bool(prefix+".safe-block-wait-for-block-validator", DefaultSyncMonitorConfig.SafeBlockWaitForBlockValidator, "wait for block validator to complete before returning safe block number") + f.Bool(prefix+".finalized-block-wait-for-block-validator", DefaultSyncMonitorConfig.FinalizedBlockWaitForBlockValidator, "wait for block validator to complete before returning finalized 
block number") +} + +type SyncMonitor struct { + config *SyncMonitorConfig + consensus execution.ConsensusInfo + exec *ExecutionEngine +} + +func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonitor { + return &SyncMonitor{ + config: config, + exec: exec, + } +} + +func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} { + res := s.consensus.FullSyncProgressMap() + consensusSyncTarget := s.consensus.SyncTargetMessageCount() + + built, err := s.exec.HeadMessageNumber() + if err != nil { + res["headMsgNumberError"] = err + } + + res["builtBlock"] = built + res["consensusSyncTarget"] = consensusSyncTarget + + return res +} + +func (s *SyncMonitor) SyncProgressMap() map[string]interface{} { + if s.consensus.Synced() { + built, err := s.exec.HeadMessageNumber() + consensusSyncTarget := s.consensus.SyncTargetMessageCount() + if err != nil && built+1 >= consensusSyncTarget { + return make(map[string]interface{}) + } + } + return s.FullSyncProgressMap() +} + +func (s *SyncMonitor) SafeBlockNumber(ctx context.Context) (uint64, error) { + if s.consensus == nil { + return 0, errors.New("not set up for safeblock") + } + msg, err := s.consensus.GetSafeMsgCount(ctx) + if err != nil { + return 0, err + } + if s.config.SafeBlockWaitForBlockValidator { + latestValidatedCount, err := s.consensus.ValidatedMessageCount() + if err != nil { + return 0, err + } + if msg > latestValidatedCount { + msg = latestValidatedCount + } + } + block := s.exec.MessageIndexToBlockNumber(msg - 1) + return block, nil +} + +func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error) { + if s.consensus == nil { + return 0, errors.New("not set up for safeblock") + } + msg, err := s.consensus.GetFinalizedMsgCount(ctx) + if err != nil { + return 0, err + } + if s.config.FinalizedBlockWaitForBlockValidator { + latestValidatedCount, err := s.consensus.ValidatedMessageCount() + if err != nil { + return 0, err + } + if msg > latestValidatedCount { + msg = latestValidatedCount + } + } + block := s.exec.MessageIndexToBlockNumber(msg - 1) + return block, nil +} + +func (s *SyncMonitor) Synced() bool { + return len(s.SyncProgressMap()) == 0 +} + +func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) { + s.consensus = consensus +} diff --git a/execution/interface.go b/execution/interface.go index 361fbc8ad5..7540a09210 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -54,7 +54,6 @@ type ExecutionSequencer interface { ForwardTo(url string) error SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) - SetTransactionStreamer(streamer TransactionStreamer) GetL1GasPriceEstimate() (uint64, error) } @@ -68,22 +67,38 @@ type FullExecutionClient interface { Maintenance() error - // TODO: only used to get safe/finalized block numbers - MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 - ArbOSVersionForMessageNumber(messageNum arbutil.MessageIndex) (uint64, error) } // not implemented in execution, used as input +// BatchFetcher is required for any execution node type BatchFetcher interface { - FetchBatch(batchNum uint64) ([]byte, common.Hash, error) + FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) + FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) + GetBatchParentChainBlock(seqNum uint64) (uint64, error) } -type TransactionStreamer interface { - BatchFetcher +type ConsensusInfo interface { + 
Synced() bool + FullSyncProgressMap() map[string]interface{} + SyncTargetMessageCount() arbutil.MessageIndex + + // TODO: switch from pulling to pushing safe/finalized + GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) + GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) + ValidatedMessageCount() (arbutil.MessageIndex, error) +} + +type ConsensusSequencer interface { WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error ExpectChosenSequencer() error CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) BacklogL1GasCharged() uint64 BacklogCallDataUnits() uint64 } + +type FullConsensusClient interface { + BatchFetcher + ConsensusInfo + ConsensusSequencer +} diff --git a/nodeInterface/NodeInterface.go b/execution/nodeInterface/NodeInterface.go similarity index 88% rename from nodeInterface/NodeInterface.go rename to execution/nodeInterface/NodeInterface.go index bdcfb569f4..7e524731d0 100644 --- a/nodeInterface/NodeInterface.go +++ b/execution/nodeInterface/NodeInterface.go @@ -20,14 +20,12 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" - "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/l1pricing" "github.com/offchainlabs/nitro/arbos/retryables" "github.com/offchainlabs/nitro/arbos/util" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/solgen/go/node_interfacegen" - "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/merkletree" ) @@ -53,90 +51,129 @@ var merkleTopic common.Hash var l2ToL1TxTopic common.Hash var l2ToL1TransactionTopic common.Hash -var blockInGenesis = errors.New("") -var blockAfterLatestBatch = errors.New("") - func (n NodeInterface) NitroGenesisBlock(c ctx) (huge, error) { block := n.backend.ChainConfig().ArbitrumChainParams.GenesisBlockNum return arbmath.UintToBig(block), nil } +// bool will be false but no error if behind genesis +func (n NodeInterface) blockNumToMessageIndex(blockNum uint64) (arbutil.MessageIndex, bool, error) { + node, err := gethExecFromNodeInterfaceBackend(n.backend) + if err != nil { + return 0, false, err + } + blockchain, err := blockchainFromNodeInterfaceBackend(n.backend) + if err != nil { + return 0, false, err + } + if blockNum < blockchain.Config().ArbitrumChainParams.GenesisBlockNum { + return 0, true, nil + } + msgIndex, err := node.ExecEngine.BlockNumberToMessageIndex(blockNum) + if err != nil { + return 0, false, err + } + return msgIndex, true, nil +} + +func (n NodeInterface) msgNumToInboxBatch(msgIndex arbutil.MessageIndex) (uint64, bool, error) { + node, err := gethExecFromNodeInterfaceBackend(n.backend) + if err != nil { + return 0, false, err + } + fetcher := node.ExecEngine.GetBatchFetcher() + if fetcher == nil { + return 0, false, errors.New("batch fetcher not set") + } + return fetcher.FindInboxBatchContainingMessage(msgIndex) +} + func (n NodeInterface) FindBatchContainingBlock(c ctx, evm mech, blockNum uint64) (uint64, error) { - node, err := arbNodeFromNodeInterfaceBackend(n.backend) + msgIndex, found, err := n.blockNumToMessageIndex(blockNum) if err != nil { return 0, err } - return findBatchContainingBlock(node, node.TxStreamer.GenesisBlockNumber(), blockNum) + if !found { + return 0, fmt.Errorf("block %v is part of genesis", blockNum) + } + res, found, err := 
n.msgNumToInboxBatch(msgIndex) + if err == nil && !found { + return 0, errors.New("block not yet found on any batch") + } + return res, err } func (n NodeInterface) GetL1Confirmations(c ctx, evm mech, blockHash bytes32) (uint64, error) { - node, err := arbNodeFromNodeInterfaceBackend(n.backend) + node, err := gethExecFromNodeInterfaceBackend(n.backend) if err != nil { return 0, err } - if node.InboxReader == nil { - return 0, nil - } - bc, err := blockchainFromNodeInterfaceBackend(n.backend) + blockchain, err := blockchainFromNodeInterfaceBackend(n.backend) if err != nil { return 0, err } - header := bc.GetHeaderByHash(blockHash) + header := blockchain.GetHeaderByHash(blockHash) if header == nil { return 0, errors.New("unknown block hash") } blockNum := header.Number.Uint64() - genesis := node.TxStreamer.GenesisBlockNumber() - batch, err := findBatchContainingBlock(node, genesis, blockNum) + + // blocks behind genesis are treated as belonging to batch 0 + msgNum, _, err := n.blockNumToMessageIndex(blockNum) if err != nil { - if errors.Is(err, blockInGenesis) { - batch = 0 - } else if errors.Is(err, blockAfterLatestBatch) { - return 0, nil - } else { - return 0, err - } + return 0, err + } + // batches not yet posted have 0 confirmations but no error + batchNum, found, err := n.msgNumToInboxBatch(msgNum) + if err != nil { + return 0, err + } + if !found { + return 0, nil } - meta, err := node.InboxTracker.GetBatchMetadata(batch) + parentChainBlockNum, err := node.ExecEngine.GetBatchFetcher().GetBatchParentChainBlock(batchNum) if err != nil { return 0, err } - if node.L1Reader.IsParentChainArbitrum() { - parentChainClient := node.L1Reader.Client() + + if node.ParentChainReader.IsParentChainArbitrum() { + parentChainClient := node.ParentChainReader.Client() parentNodeInterface, err := node_interfacegen.NewNodeInterface(types.NodeInterfaceAddress, parentChainClient) if err != nil { return 0, err } - parentChainBlock, err := parentChainClient.BlockByNumber(n.context, new(big.Int).SetUint64(meta.ParentChainBlock)) + parentChainBlock, err := parentChainClient.BlockByNumber(n.context, new(big.Int).SetUint64(parentChainBlockNum)) if err != nil { // Hide the parent chain RPC error from the client in case it contains sensitive information. // Likely though, this error is just "not found" because the block got reorg'd. - return 0, fmt.Errorf("failed to get parent chain block %v containing batch", meta.ParentChainBlock) + return 0, fmt.Errorf("failed to get parent chain block %v containing batch", parentChainBlockNum) } confs, err := parentNodeInterface.GetL1Confirmations(&bind.CallOpts{Context: n.context}, parentChainBlock.Hash()) if err != nil { log.Warn( "Failed to get L1 confirmations from parent chain", - "blockNumber", meta.ParentChainBlock, + "blockNumber", parentChainBlockNum, "blockHash", parentChainBlock.Hash(), "err", err, ) return 0, fmt.Errorf("failed to get L1 confirmations from parent chain for block %v", parentChainBlock.Hash()) } return confs, nil } - latestL1Block, latestBatchCount := node.InboxReader.GetLastReadBlockAndBatchCount() - if latestBatchCount <= batch { - return 0, nil // batch was reorg'd out? 
- } - if latestL1Block < meta.ParentChainBlock || arbutil.BlockNumberToMessageCount(blockNum, genesis) > meta.MessageCount { + if node.ParentChainReader == nil { return 0, nil } - canonicalHash := bc.GetCanonicalHash(header.Number.Uint64()) - if canonicalHash != header.Hash() { - return 0, errors.New("block hash is non-canonical") + latestHeader, err := node.ParentChainReader.LastHeaderWithError() + if err != nil { + return 0, err + } + if latestHeader == nil { + return 0, errors.New("no headers read from l1") } - confs := (latestL1Block - meta.ParentChainBlock) + 1 + node.InboxReader.GetDelayBlocks() - return confs, nil + latestBlockNum := latestHeader.Number.Uint64() + if latestBlockNum < parentChainBlockNum { + return 0, nil + } + return (latestBlockNum - parentChainBlockNum), nil } func (n NodeInterface) EstimateRetryableTicket( @@ -561,42 +598,18 @@ func (n NodeInterface) GasEstimateComponents( return total, gasForL1, baseFee, l1BaseFeeEstimate, nil } -func findBatchContainingBlock(node *arbnode.Node, genesis uint64, block uint64) (uint64, error) { - if block <= genesis { - return 0, fmt.Errorf("%wblock %v is part of genesis", blockInGenesis, block) - } - pos := arbutil.BlockNumberToMessageCount(block, genesis) - 1 - high, err := node.InboxTracker.GetBatchCount() - if err != nil { - return 0, err - } - high-- - latestCount, err := node.InboxTracker.GetBatchMessageCount(high) - if err != nil { - return 0, err - } - latestBlock := arbutil.MessageCountToBlockNumber(latestCount, genesis) - if int64(block) > latestBlock { - return 0, fmt.Errorf( - "%wrequested block %v is after latest on-chain block %v published in batch %v", - blockAfterLatestBatch, block, latestBlock, high, - ) - } - return staker.FindBatchContainingMessageIndex(node.InboxTracker, pos, high) -} - func (n NodeInterface) LegacyLookupMessageBatchProof(c ctx, evm mech, batchNum huge, index uint64) ( proof []bytes32, path huge, l2Sender addr, l1Dest addr, l2Block huge, l1Block huge, timestamp huge, amount huge, calldataForL1 []byte, err error) { - node, err := arbNodeFromNodeInterfaceBackend(n.backend) + node, err := gethExecFromNodeInterfaceBackend(n.backend) if err != nil { return } - if node.ClassicOutboxRetriever == nil { + if node.ClassicOutbox == nil { err = errors.New("this node doesnt support classicLookupMessageBatchProof") return } - msg, err := node.ClassicOutboxRetriever.GetMsg(batchNum, index) + msg, err := node.ClassicOutbox.GetMsg(batchNum, index) if err != nil { return } diff --git a/nodeInterface/NodeInterfaceDebug.go b/execution/nodeInterface/NodeInterfaceDebug.go similarity index 100% rename from nodeInterface/NodeInterfaceDebug.go rename to execution/nodeInterface/NodeInterfaceDebug.go diff --git a/nodeInterface/virtual-contracts.go b/execution/nodeInterface/virtual-contracts.go similarity index 96% rename from nodeInterface/virtual-contracts.go rename to execution/nodeInterface/virtual-contracts.go index b35381a77a..3a863e31b5 100644 --- a/nodeInterface/virtual-contracts.go +++ b/execution/nodeInterface/virtual-contracts.go @@ -15,10 +15,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/log" - "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/arbosState" "github.com/offchainlabs/nitro/arbos/l1pricing" + "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/gethhook" "github.com/offchainlabs/nitro/precompiles" 
"github.com/offchainlabs/nitro/solgen/go/node_interfacegen" @@ -173,16 +173,16 @@ func init() { merkleTopic = arbSys.Events["SendMerkleUpdate"].ID } -func arbNodeFromNodeInterfaceBackend(backend BackendAPI) (*arbnode.Node, error) { +func gethExecFromNodeInterfaceBackend(backend BackendAPI) (*gethexec.ExecutionNode, error) { apiBackend, ok := backend.(*arbitrum.APIBackend) if !ok { return nil, errors.New("API backend isn't Arbitrum") } - arbNode, ok := apiBackend.GetArbitrumNode().(*arbnode.Node) + exec, ok := apiBackend.GetArbitrumNode().(*gethexec.ExecutionNode) if !ok { return nil, errors.New("failed to get Arbitrum Node from backend") } - return arbNode, nil + return exec, nil } func blockchainFromNodeInterfaceBackend(backend BackendAPI) (*core.BlockChain, error) { diff --git a/go.mod b/go.mod index cf9e61f9b9..58e2fe11ce 100644 --- a/go.mod +++ b/go.mod @@ -91,6 +91,7 @@ require ( github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect github.com/cskr/pubsub v1.0.2 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect + github.com/deckarep/golang-set v1.8.0 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/dgraph-io/badger v1.6.2 // indirect @@ -245,6 +246,7 @@ require ( github.com/quic-go/webtransport-go v0.5.2 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rhnvrm/simples3 v0.6.1 // indirect + github.com/rjeczalik/notify v0.9.1 // indirect github.com/rivo/uniseg v0.4.3 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index f2b4c668cc..39b1caffe4 100644 --- a/go.sum +++ b/go.sum @@ -277,6 +277,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= +github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= +github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= @@ -1485,6 +1487,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rhnvrm/simples3 v0.6.1 h1:H0DJwybR6ryQE+Odi9eqkHuzjYAeJgtGcGtuBwOhsH8= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= +github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= +github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/rivo/tview v0.0.0-20230814110005-ccc2c8119703 h1:ZyM/+FYnpbZsFWuCohniM56kRoHRB4r5EuIzXEYkpxo= github.com/rivo/tview v0.0.0-20230814110005-ccc2c8119703/go.mod h1:nVwGv4MP47T0jvlk7KuTTjjuSmrGO4JF0iaiNt4bufE= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git 
a/staker/l1_validator.go b/staker/l1_validator.go index 4e7aa22cbe..56389ae80e 100644 --- a/staker/l1_validator.go +++ b/staker/l1_validator.go @@ -339,10 +339,14 @@ func (v *L1Validator) generateNodeAction( batchNum = localBatchCount - 1 validatedCount = messageCount } else { - batchNum, err = FindBatchContainingMessageIndex(v.inboxTracker, validatedCount-1, localBatchCount) + var found bool + batchNum, found, err = v.inboxTracker.FindInboxBatchContainingMessage(validatedCount - 1) if err != nil { return nil, false, err } + if !found { + return nil, false, errors.New("batch not found on L1") + } } execResult, err := v.txStreamer.ResultAtCount(validatedCount) if err != nil { diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index fcd1f247c2..abfc08ec33 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -56,6 +56,7 @@ type InboxTrackerInterface interface { GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) GetBatchAcc(seqNum uint64) (common.Hash, error) GetBatchCount() (uint64, error) + FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) } type TransactionStreamerInterface interface { @@ -108,39 +109,6 @@ func GlobalStatePositionsAtCount( return startPos, GlobalStatePosition{batch, posInBatch + 1}, nil } -func FindBatchContainingMessageIndex( - tracker InboxTrackerInterface, pos arbutil.MessageIndex, high uint64, -) (uint64, error) { - var low uint64 - // Iteration preconditions: - // - high >= low - // - msgCount(low - 1) <= pos implies low <= target - // - msgCount(high) > pos implies high >= target - // Therefore, if low == high, then low == high == target - for high > low { - // Due to integer rounding, mid >= low && mid < high - mid := (low + high) / 2 - count, err := tracker.GetBatchMessageCount(mid) - if err != nil { - return 0, err - } - if count < pos { - // Must narrow as mid >= low, therefore mid + 1 > low, therefore newLow > oldLow - // Keeps low precondition as msgCount(mid) < pos - low = mid + 1 - } else if count == pos { - return mid + 1, nil - } else if count == pos+1 || mid == low { // implied: count > pos - return mid, nil - } else { // implied: count > pos + 1 - // Must narrow as mid < high, therefore newHigh < lowHigh - // Keeps high precondition as msgCount(mid) > pos - high = mid - } - } - return low, nil -} - type ValidationEntryStage uint32 const ( @@ -352,13 +320,12 @@ func (v *StatelessBlockValidator) GlobalStatePositionsAtCount(count arbutil.Mess if count == 1 { return GlobalStatePosition{}, GlobalStatePosition{1, 0}, nil } - batchCount, err := v.inboxTracker.GetBatchCount() + batch, found, err := v.inboxTracker.FindInboxBatchContainingMessage(count - 1) if err != nil { return GlobalStatePosition{}, GlobalStatePosition{}, err } - batch, err := FindBatchContainingMessageIndex(v.inboxTracker, count-1, batchCount) - if err != nil { - return GlobalStatePosition{}, GlobalStatePosition{}, err + if !found { + return GlobalStatePosition{}, GlobalStatePosition{}, errors.New("batch not found on L1 yet") } return GlobalStatePositionsAtCount(v.inboxTracker, count, batch) } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index c37eb1db34..cd65cd2edc 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -57,7 +57,7 @@ import ( "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbutil" - _ "github.com/offchainlabs/nitro/nodeInterface" + _ 
"github.com/offchainlabs/nitro/execution/nodeInterface" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/mocksgen" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" diff --git a/system_tests/fees_test.go b/system_tests/fees_test.go index 3ff3bfc43f..01ace5c7e2 100644 --- a/system_tests/fees_test.go +++ b/system_tests/fees_test.go @@ -119,8 +119,6 @@ func TestSequencerFeePaid(t *testing.T) { } func testSequencerPriceAdjustsFrom(t *testing.T, initialEstimate uint64) { - t.Parallel() - _ = os.Mkdir("test-data", 0766) path := filepath.Join("test-data", fmt.Sprintf("testSequencerPriceAdjustsFrom%v.csv", initialEstimate)) diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index 1c67cf4d95..03b6d690f1 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -1,10 +1,6 @@ // Copyright 2021-2022, Offchain Labs, Inc. // For license information, see https://github.com/nitro/blob/master/LICENSE -// race detection makes things slow and miss timeouts -//go:build !race -// +build !race - package arbtest import ( @@ -29,14 +25,17 @@ import ( "github.com/offchainlabs/nitro/arbcompress" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos" + "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/challengegen" "github.com/offchainlabs/nitro/solgen/go/mocksgen" "github.com/offchainlabs/nitro/solgen/go/ospgen" "github.com/offchainlabs/nitro/solgen/go/yulgen" "github.com/offchainlabs/nitro/staker" + "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_common" "github.com/offchainlabs/nitro/validator/valnode" @@ -239,6 +238,16 @@ func setupSequencerInboxStub(ctx context.Context, t *testing.T, l1Info *Blockcha return bridgeAddr, seqInbox, seqInboxAddr } +func createL2Nodes(t *testing.T, ctx context.Context, conf *arbnode.Config, chainConfig *params.ChainConfig, l1Client arbutil.L1Interface, l2info *BlockchainTestInfo, rollupAddresses *chaininfo.RollupAddresses, initMsg *arbostypes.ParsedInitMessage, txOpts *bind.TransactOpts, signer signature.DataSignerFunc, fatalErrChan chan error) (*arbnode.Node, *gethexec.ExecutionNode) { + _, stack, l2ChainDb, l2ArbDb, l2Blockchain := createL2BlockChainWithStackConfig(t, l2info, "", chainConfig, initMsg, nil, nil) + execNode, err := gethexec.CreateExecutionNode(ctx, stack, l2ChainDb, l2Blockchain, l1Client, gethexec.ConfigDefaultTest) + Require(t, err) + consensusNode, err := arbnode.CreateNode(ctx, stack, execNode, l2ArbDb, NewFetcherFromConfig(conf), chainConfig, l1Client, rollupAddresses, txOpts, txOpts, signer, fatalErrChan, big.NewInt(1337), nil) + Require(t, err) + + return consensusNode, execNode +} + func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, challengeMsgIdx int64) { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) glogger.Verbosity(log.LvlInfo) @@ -281,25 +290,18 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall asserterBridgeAddr, asserterSeqInbox, asserterSeqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig) challengerBridgeAddr, challengerSeqInbox, challengerSeqInboxAddr := 
@@ -281,25 +290,18 @@
 	asserterBridgeAddr, asserterSeqInbox, asserterSeqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig)
 	challengerBridgeAddr, challengerSeqInbox, challengerSeqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig)
 
-	asserterL2Info, asserterL2Stack, asserterL2ChainDb, asserterL2ArbDb, asserterL2Blockchain := createL2BlockChainWithStackConfig(t, nil, "", chainConfig, initMessage, nil, nil)
 	asserterRollupAddresses.Bridge = asserterBridgeAddr
 	asserterRollupAddresses.SequencerInbox = asserterSeqInboxAddr
-	asserterExec, err := gethexec.CreateExecutionNode(ctx, asserterL2Stack, asserterL2ChainDb, asserterL2Blockchain, l1Backend, gethexec.ConfigDefaultTest)
-	Require(t, err)
-	parentChainID := big.NewInt(1337)
-	asserterL2, err := arbnode.CreateNode(ctx, asserterL2Stack, asserterExec, asserterL2ArbDb, NewFetcherFromConfig(conf), chainConfig, l1Backend, asserterRollupAddresses, nil, nil, nil, fatalErrChan, parentChainID, nil)
-	Require(t, err)
-	err = asserterL2.Start(ctx)
+	asserterL2Info := NewArbTestInfo(t, chainConfig.ChainID)
+	asserterL2, asserterExec := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, asserterL2Info, asserterRollupAddresses, initMessage, nil, nil, fatalErrChan)
+	err := asserterL2.Start(ctx)
 	Require(t, err)
 
-	challengerL2Info, challengerL2Stack, challengerL2ChainDb, challengerL2ArbDb, challengerL2Blockchain := createL2BlockChainWithStackConfig(t, nil, "", chainConfig, initMessage, nil, nil)
 	challengerRollupAddresses := *asserterRollupAddresses
 	challengerRollupAddresses.Bridge = challengerBridgeAddr
 	challengerRollupAddresses.SequencerInbox = challengerSeqInboxAddr
-	challengerExec, err := gethexec.CreateExecutionNode(ctx, challengerL2Stack, challengerL2ChainDb, challengerL2Blockchain, l1Backend, gethexec.ConfigDefaultTest)
-	Require(t, err)
-	challengerL2, err := arbnode.CreateNode(ctx, challengerL2Stack, challengerExec, challengerL2ArbDb, NewFetcherFromConfig(conf), chainConfig, l1Backend, &challengerRollupAddresses, nil, nil, nil, fatalErrChan, parentChainID, nil)
-	Require(t, err)
+	challengerL2Info := NewArbTestInfo(t, chainConfig.ChainID)
+	challengerL2, challengerExec := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, challengerL2Info, &challengerRollupAddresses, initMessage, nil, nil, fatalErrChan)
 	err = challengerL2.Start(ctx)
 	Require(t, err)
 
@@ -387,7 +389,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall
 	confirmLatestBlock(ctx, t, l1Info, l1Backend)
 
-	asserterValidator, err := staker.NewStatelessBlockValidator(asserterL2.InboxReader, asserterL2.InboxTracker, asserterL2.TxStreamer, asserterExec.Recorder, asserterL2ArbDb, nil, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack)
+	asserterValidator, err := staker.NewStatelessBlockValidator(asserterL2.InboxReader, asserterL2.InboxTracker, asserterL2.TxStreamer, asserterExec.Recorder, asserterL2.ArbDB, nil, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack)
 	if err != nil {
 		Fatal(t, err)
 	}
@@ -404,7 +406,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall
 	if err != nil {
 		Fatal(t, err)
 	}
-	challengerValidator, err := staker.NewStatelessBlockValidator(challengerL2.InboxReader, challengerL2.InboxTracker, challengerL2.TxStreamer, challengerExec.Recorder, challengerL2ArbDb, nil, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack)
+	challengerValidator, err := staker.NewStatelessBlockValidator(challengerL2.InboxReader, challengerL2.InboxTracker, challengerL2.TxStreamer, challengerExec.Recorder, challengerL2.ArbDB, nil, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack)
 	if err != nil {
 		Fatal(t, err)
 	}
@@ -492,17 +494,3 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall
 	Fatal(t, "challenge timed out without winner")
 }
-
-func TestMockChallengeManagerAsserterIncorrect(t *testing.T) {
-	t.Parallel()
-	for i := int64(1); i <= makeBatch_MsgsPerBatch*3; i++ {
-		RunChallengeTest(t, false, true, i)
-	}
-}
-
-func TestMockChallengeManagerAsserterCorrect(t *testing.T) {
-	t.Parallel()
-	for i := int64(1); i <= makeBatch_MsgsPerBatch*3; i++ {
-		RunChallengeTest(t, true, true, i)
-	}
-}
diff --git a/system_tests/full_challenge_mock_test.go b/system_tests/full_challenge_mock_test.go
new file mode 100644
index 0000000000..d32c2b40ab
--- /dev/null
+++ b/system_tests/full_challenge_mock_test.go
@@ -0,0 +1,21 @@
+// race detection makes things slow and miss timeouts
+//go:build !race
+// +build !race
+
+package arbtest
+
+import "testing"
+
+func TestMockChallengeManagerAsserterIncorrect(t *testing.T) {
+	t.Parallel()
+	for i := int64(1); i <= makeBatch_MsgsPerBatch*3; i++ {
+		RunChallengeTest(t, false, true, i)
+	}
+}
+
+func TestMockChallengeManagerAsserterCorrect(t *testing.T) {
+	t.Parallel()
+	for i := int64(1); i <= makeBatch_MsgsPerBatch*3; i++ {
+		RunChallengeTest(t, true, true, i)
+	}
+}
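Splitting the two TestMockChallengeManager* drivers into full_challenge_mock_test.go lets the shared RunChallengeTest code build under the race detector while the slow mock tests keep their !race constraint. The constraint is file-scoped: `go test -race` sets the `race` build tag, so a `!race` file is excluded from compilation entirely. An illustrative file, not part of the patch:

// example_norace_test.go — illustrative only. Under `go test -race` the
// `race` tag is set, `!race` is false, and this file is never compiled;
// under plain `go test` it builds and runs normally.
//go:build !race
// +build !race

package arbtest

import "testing"

func TestSkippedUnderRaceDetector(t *testing.T) {
	t.Log("compiled only without -race")
}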
diff --git a/system_tests/nodeinterface_test.go b/system_tests/nodeinterface_test.go
index 3424a58e9e..b692af6e30 100644
--- a/system_tests/nodeinterface_test.go
+++ b/system_tests/nodeinterface_test.go
@@ -1,6 +1,10 @@
 // Copyright 2021-2022, Offchain Labs, Inc.
 // For license information, see https://github.com/nitro/blob/master/LICENSE
 
+// race detection makes things slow and miss timeouts
+//go:build !race
+// +build !race
+
 package arbtest
 
 import (
@@ -11,10 +15,82 @@ import (
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/params"
+	"github.com/offchainlabs/nitro/arbnode"
 	"github.com/offchainlabs/nitro/arbos/util"
 	"github.com/offchainlabs/nitro/solgen/go/node_interfacegen"
 )
 
+func TestFindBatch(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	l1Info := NewL1TestInfo(t)
+	initialBalance := new(big.Int).Lsh(big.NewInt(1), 200)
+	l1Info.GenerateGenesisAccount("deployer", initialBalance)
+	l1Info.GenerateGenesisAccount("asserter", initialBalance)
+	l1Info.GenerateGenesisAccount("challenger", initialBalance)
+	l1Info.GenerateGenesisAccount("sequencer", initialBalance)
+
+	l1Info, l1Backend, _, _ := createTestL1BlockChain(t, l1Info)
+	conf := arbnode.ConfigDefaultL1Test()
+	conf.BlockValidator.Enable = false
+	conf.BatchPoster.Enable = false
+
+	chainConfig := params.ArbitrumDevTestChainConfig()
+	fatalErrChan := make(chan error, 10)
+	rollupAddresses, initMsg := DeployOnTestL1(t, ctx, l1Info, l1Backend, chainConfig)
+
+	bridgeAddr, seqInbox, seqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig)
+
+	callOpts := bind.CallOpts{Context: ctx}
+
+	rollupAddresses.Bridge = bridgeAddr
+	rollupAddresses.SequencerInbox = seqInboxAddr
+	l2Info := NewArbTestInfo(t, chainConfig.ChainID)
+	consensus, _ := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, l2Info, rollupAddresses, initMsg, nil, nil, fatalErrChan)
+	err := consensus.Start(ctx)
+	Require(t, err)
+
+	l2Client := ClientForStack(t, consensus.Stack)
+	nodeInterface, err := node_interfacegen.NewNodeInterface(types.NodeInterfaceAddress, l2Client)
+	Require(t, err)
+	sequencerTxOpts := l1Info.GetDefaultTransactOpts("sequencer", ctx)
+
+	l2Info.GenerateAccount("Destination")
+	makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1)
+	makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1)
+	makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1)
+
+	for blockNum := uint64(0); blockNum < uint64(makeBatch_MsgsPerBatch)*3; blockNum++ {
+		gotBatchNum, err := nodeInterface.FindBatchContainingBlock(&callOpts, blockNum)
+		Require(t, err)
+		expBatchNum := uint64(0)
+		if blockNum > 0 {
+			expBatchNum = 1 + (blockNum-1)/uint64(makeBatch_MsgsPerBatch)
+		}
+		if expBatchNum != gotBatchNum {
+			Fatal(t, "wrong result from findBatchContainingBlock. blocknum ", blockNum, " expected ", expBatchNum, " got ", gotBatchNum)
+		}
+		batchL1Block, err := consensus.InboxTracker.GetBatchParentChainBlock(gotBatchNum)
+		Require(t, err)
+		blockHeader, err := l2Client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum))
+		Require(t, err)
+		blockHash := blockHeader.Hash()
+
+		minCurrentL1Block, err := l1Backend.BlockNumber(ctx)
+		Require(t, err)
+		gotConfirmations, err := nodeInterface.GetL1Confirmations(&callOpts, blockHash)
+		Require(t, err)
+		maxCurrentL1Block, err := l1Backend.BlockNumber(ctx)
+		Require(t, err)
+
+		if gotConfirmations > (maxCurrentL1Block-batchL1Block) || gotConfirmations < (minCurrentL1Block-batchL1Block) {
+			Fatal(t, "wrong number of confirmations. got ", gotConfirmations)
+		}
+	}
+}
+
 func TestL2BlockRangeForL1(t *testing.T) {
 	t.Parallel()
 	ctx, cancel := context.WithCancel(context.Background())