Skip to content

Commit

Permalink
resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Mar 28, 2024
2 parents 158ce7b + f93d2c3 commit 4ad776c
Show file tree
Hide file tree
Showing 28 changed files with 706 additions and 345 deletions.
4 changes: 2 additions & 2 deletions arbitrator/wasm-libraries/go-stub/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ pub unsafe fn get_field(source: u32, field: &[u8]) -> GoValue {
}
} else if source == GO_ID {
if field == b"_pendingEvent" {
if let Some(event) = &PENDING_EVENT {
if let Some(event) = PENDING_EVENT.clone() {
let id = DynamicObjectPool::singleton()
.insert(DynamicObject::PendingEvent(event.clone()));
.insert(DynamicObject::PendingEvent(event));
return GoValue::Object(id);
} else {
return GoValue::Null;
Expand Down
26 changes: 5 additions & 21 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -99,10 +98,6 @@ type InboxReader struct {

// Atomic
lastSeenBatchCount uint64

// Behind the mutex
lastReadMutex sync.RWMutex
lastReadBlock uint64
lastReadBatchCount uint64
}

Expand Down Expand Up @@ -396,10 +391,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
Expand Down Expand Up @@ -531,10 +523,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
r.lastReadMutex.Lock()
r.lastReadBlock = to.Uint64()
r.lastReadBatchCount = sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
storeSeenBatchCount()
}
}
Expand All @@ -561,10 +550,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
storeSeenBatchCount()
}
}
Expand Down Expand Up @@ -635,10 +621,8 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, metadata.ParentChainBlock, seenBatches)
}

func (r *InboxReader) GetLastReadBlockAndBatchCount() (uint64, uint64) {
r.lastReadMutex.RLock()
defer r.lastReadMutex.RUnlock()
return r.lastReadBlock, r.lastReadBatchCount
// GetLastReadBatchCount returns the batch count as of the last batches the
// inbox reader read in from L1. Read atomically; safe to call concurrently.
func (r *InboxReader) GetLastReadBatchCount() uint64 {
	return atomic.LoadUint64(&r.lastReadBatchCount)
}

// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
Expand Down
53 changes: 53 additions & 0 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ func (t *InboxTracker) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex
return metadata.MessageCount, err
}

// GetBatchParentChainBlock is a convenience function wrapping GetBatchMetadata,
// returning only the parent chain (L1) block number recorded for batch seqNum.
// On error the returned block number is the zero value and must not be used.
func (t *InboxTracker) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
	metadata, err := t.GetBatchMetadata(seqNum)
	return metadata.ParentChainBlock, err
}

// GetBatchAcc is a convenience function wrapping GetBatchMetadata
func (t *InboxTracker) GetBatchAcc(seqNum uint64) (common.Hash, error) {
metadata, err := t.GetBatchMetadata(seqNum)
Expand All @@ -223,6 +228,54 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) {
return count, nil
}

// FindInboxBatchContainingMessage binary-searches batch metadata for the
// sequencer batch containing message pos.
//
// The returned bool is false if the batch was not found (meaning the message
// has not yet been posted in a batch); err reports only unexpected/internal
// failures, not "not found".
func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) {
	batchCount, err := t.GetBatchCount()
	if err != nil {
		return 0, false, err
	}
	if batchCount == 0 {
		// No batches exist yet, so no batch can contain the message.
		// Without this guard, batchCount-1 below would underflow to
		// MaxUint64 and the metadata lookup would fail with an error
		// instead of reporting "not found".
		return 0, false, nil
	}
	low := uint64(0)
	high := batchCount - 1
	lastBatchMessageCount, err := t.GetBatchMessageCount(high)
	if err != nil {
		return 0, false, err
	}
	if lastBatchMessageCount <= pos {
		// The message lies beyond the last batch posted so far.
		return 0, false, nil
	}
	// Iteration preconditions:
	// - high >= low
	// - msgCount(low - 1) <= pos implies low <= target
	// - msgCount(high) > pos implies high >= target
	// Therefore, if low == high, then low == high == target
	for {
		// Due to integer rounding, mid >= low && mid < high
		mid := (low + high) / 2
		count, err := t.GetBatchMessageCount(mid)
		if err != nil {
			return 0, false, err
		}
		if count < pos {
			// Must narrow as mid >= low, therefore mid + 1 > low, therefore newLow > oldLow
			// Keeps low precondition as msgCount(mid) < pos
			low = mid + 1
		} else if count == pos {
			// pos is the first message after batch mid, i.e. in batch mid+1.
			return mid + 1, true, nil
		} else if count == pos+1 || mid == low { // implied: count > pos
			return mid, true, nil
		} else {
			// implied: count > pos + 1
			// Must narrow as mid < high, therefore newHigh < oldHigh
			// Keeps high precondition as msgCount(mid) > pos
			high = mid
		}
		if high == low {
			return high, true, nil
		}
	}
}

func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error {
batchCount, err := t.GetBatchCount()
if err != nil {
Expand Down
87 changes: 72 additions & 15 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
Expand Down Expand Up @@ -196,6 +197,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.BatchPoster.Enable = false
config.SeqCoordinator.Enable = false
config.BlockValidator = staker.TestBlockValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
Expand All @@ -213,6 +215,7 @@ func ConfigDefaultL2Test() *Config {
config.SeqCoordinator.Signer.ECDSA.AcceptSequencer = false
config.SeqCoordinator.Signer.ECDSA.Dangerous.AcceptMissing = true
config.Staker = staker.TestL1ValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig
Expand Down Expand Up @@ -265,7 +268,6 @@ type Node struct {
SeqCoordinator *SeqCoordinator
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
ClassicOutboxRetriever *ClassicOutboxRetriever
SyncMonitor *SyncMonitor
configFetcher ConfigFetcher
ctx context.Context
Expand Down Expand Up @@ -381,17 +383,10 @@ func createNodeImpl(

l2ChainId := l2Config.ChainID.Uint64()

syncMonitor := NewSyncMonitor(&config.SyncMonitor)
var classicOutbox *ClassicOutboxRetriever
classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "", true)
if err != nil {
if l2Config.ArbitrumChainParams.GenesisBlockNum > 0 {
log.Warn("Classic Msg Database not found", "err", err)
}
classicOutbox = nil
} else {
classicOutbox = NewClassicOutboxRetriever(classicMsgDb)
syncConfigFetcher := func() *SyncMonitorConfig {
return &configFetcher.Get().SyncMonitor
}
syncMonitor := NewSyncMonitor(syncConfigFetcher)

var l1Reader *headerreader.HeaderReader
if config.ParentChainReader.Enable {
Expand Down Expand Up @@ -488,7 +483,6 @@ func createNodeImpl(
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
ClassicOutboxRetriever: classicOutbox,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
Expand Down Expand Up @@ -706,7 +700,6 @@ func createNodeImpl(
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
ClassicOutboxRetriever: classicOutbox,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
Expand Down Expand Up @@ -763,22 +756,36 @@ func CreateNode(
return currentNode, nil
}

// CacheL1PriceDataOfMsg forwards the calldata units and L1 gas charged for
// the message at pos to the transaction streamer's cache.
func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
	n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged)
}

// BacklogL1GasCharged returns the transaction streamer's backlog of L1 gas charged.
func (n *Node) BacklogL1GasCharged() uint64 {
	return n.TxStreamer.BacklogL1GasCharged()
}
// BacklogCallDataUnits returns the transaction streamer's backlog of calldata units.
func (n *Node) BacklogCallDataUnits() uint64 {
	return n.TxStreamer.BacklogCallDataUnits()
}

func (n *Node) Start(ctx context.Context) error {
execClient, ok := n.Execution.(*gethexec.ExecutionNode)
if !ok {
execClient = nil
}
if execClient != nil {
err := execClient.Initialize(ctx, n, n.SyncMonitor)
err := execClient.Initialize(ctx)
if err != nil {
return fmt.Errorf("error initializing exec client: %w", err)
}
}
n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator, n.Execution)
n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator)
err := n.Stack.Start()
if err != nil {
return fmt.Errorf("error starting geth stack: %w", err)
}
if execClient != nil {
execClient.SetConsensusClient(n)
}
err = n.Execution.Start(ctx)
if err != nil {
return fmt.Errorf("error starting exec client: %w", err)
Expand Down Expand Up @@ -891,6 +898,7 @@ func (n *Node) Start(ctx context.Context) error {
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
n.SyncMonitor.Start(ctx)
return nil
}

Expand Down Expand Up @@ -944,6 +952,7 @@ func (n *Node) StopAndWait() {
// Just stops the redis client (most other stuff was stopped earlier)
n.SeqCoordinator.StopAndWait()
}
n.SyncMonitor.StopAndWait()
if n.DASLifecycleManager != nil {
n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second)
}
Expand All @@ -954,3 +963,51 @@ func (n *Node) StopAndWait() {
log.Error("error on stack close", "err", err)
}
}

// FetchBatch retrieves the raw sequencer message bytes and the batch's block
// hash for batchNum via the inbox reader.
func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) {
	return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
}

// FindInboxBatchContainingMessage delegates to the inbox tracker; the bool is
// false if no batch contains the message yet.
func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) {
	return n.InboxTracker.FindInboxBatchContainingMessage(message)
}

// GetBatchParentChainBlock returns the parent chain block recorded for batch
// seqNum, delegating to the inbox tracker.
func (n *Node) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
	return n.InboxTracker.GetBatchParentChainBlock(seqNum)
}

// FullSyncProgressMap returns the sync monitor's full progress map.
func (n *Node) FullSyncProgressMap() map[string]interface{} {
	return n.SyncMonitor.FullSyncProgressMap()
}

// Synced reports whether the sync monitor currently considers the node synced.
func (n *Node) Synced() bool {
	return n.SyncMonitor.Synced()
}

// SyncTargetMessageCount returns the sync monitor's current target message count.
func (n *Node) SyncTargetMessageCount() arbutil.MessageIndex {
	return n.SyncMonitor.SyncTargetMessageCount()
}

// TODO: switch from pulling to pushing safe/finalized
// GetSafeMsgCount returns the message count at the parent chain's safe block,
// delegating to the inbox reader.
func (n *Node) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
	return n.InboxReader.GetSafeMsgCount(ctx)
}

// GetFinalizedMsgCount returns the message count at the parent chain's
// finalized block, delegating to the inbox reader.
func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
	return n.InboxReader.GetFinalizedMsgCount(ctx)
}

// WriteMessageFromSequencer forwards a sequencer-produced message at pos to
// the transaction streamer.
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
	return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
}

// ExpectChosenSequencer delegates to the transaction streamer's check that
// this node is the chosen sequencer.
func (n *Node) ExpectChosenSequencer() error {
	return n.TxStreamer.ExpectChosenSequencer()
}

// ValidatedMessageCount returns the block validator's validated message count.
// Returns an error if the node was configured without a block validator.
func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) {
	if n.BlockValidator == nil {
		return 0, errors.New("validator not set up")
	}
	return n.BlockValidator.GetValidated(), nil
}
Loading

0 comments on commit 4ad776c

Please sign in to comment.