Skip to content

Commit

Permalink
Add a nitro option to stop syncing at a given block number
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Oct 22, 2024
1 parent d3fa9a7 commit 8a13df5
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 52 deletions.
4 changes: 2 additions & 2 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
err = streamer.Start(ctx, 0)
Require(t, err)
exec.Start(ctx)
exec.Start(ctx, 0)
init, err := streamer.GetMessage(0)
Require(t, err)

Expand Down
17 changes: 14 additions & 3 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,20 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error {
return d.sequenceWithoutLockout(ctx, lastBlockHeader)
}

func (d *DelayedSequencer) run(ctx context.Context) {
func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) {
headerChan, cancel := d.l1Reader.Subscribe(false)
defer cancel()

for {
delayedCount, err := d.inbox.GetDelayedCount()
if err != nil {
log.Warn("error reading delayed count", "err", err)
continue
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock)
return
}
select {
case nextHeader, ok := <-headerChan:
if !ok {
Expand All @@ -232,7 +241,9 @@ func (d *DelayedSequencer) run(ctx context.Context) {
}
}

func (d *DelayedSequencer) Start(ctxIn context.Context) {
func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) {
d.StopWaiter.Start(ctxIn, d)
d.LaunchThread(d.run)
d.LaunchThread(func(ctx context.Context) {
d.run(ctx, syncTillBlock)
})
}
36 changes: 27 additions & 9 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,36 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
}, nil
}

func (r *InboxReader) Start(ctxIn context.Context) error {
func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
r.LaunchThread(func(ctx context.Context) {
for {
delayedCount, err := r.tracker.GetDelayedCount()
if err != nil {
log.Warn("error reading delayed count", "err", err)
hadError = true
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock)
return
}
err = r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
}
interval := time.Second
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
return time.Second
})

// Ensure we read the init message before other things start up
Expand Down
4 changes: 2 additions & 2 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := inbox.Start(ctx)
err := inbox.Start(ctx, 0)
Require(t, err)
exec.Start(ctx)
exec.Start(ctx, 0)

maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei)
maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2))
Expand Down
15 changes: 9 additions & 6 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com

type Config struct {
Sequencer bool `koanf:"sequencer"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
Expand Down Expand Up @@ -145,6 +146,7 @@ func (c *Config) ValidatorRequired() bool {

func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) {
f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer")
f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block")
headerreader.AddOptions(prefix+".parent-chain-reader", f)
InboxReaderConfigAddOptions(prefix+".inbox-reader", f)
DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f)
Expand All @@ -163,6 +165,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed

var ConfigDefault = Config{
Sequencer: false,
SyncTillBlock: 0,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
Expand Down Expand Up @@ -839,7 +842,7 @@ func (n *Node) Start(ctx context.Context) error {
if execClient != nil {
execClient.SetConsensusClient(n)
}
err = n.Execution.Start(ctx)
err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting exec client: %w", err)
}
Expand Down Expand Up @@ -869,12 +872,12 @@ func (n *Node) Start(ctx context.Context) error {
return fmt.Errorf("error populating feed backlog on startup: %w", err)
}
}
err = n.TxStreamer.Start(ctx)
err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting transaction streamer: %w", err)
}
if n.InboxReader != nil {
err = n.InboxReader.Start(ctx)
err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting inbox reader: %w", err)
}
Expand All @@ -887,15 +890,15 @@ func (n *Node) Start(ctx context.Context) error {
}
}
if n.SeqCoordinator != nil {
n.SeqCoordinator.Start(ctx)
n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock)
} else {
n.Execution.Activate()
}
if n.MaintenanceRunner != nil {
n.MaintenanceRunner.Start(ctx)
}
if n.DelayedSequencer != nil {
n.DelayedSequencer.Start(ctx)
n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
}
if n.BatchPoster != nil {
n.BatchPoster.Start(ctx)
Expand Down Expand Up @@ -945,7 +948,7 @@ func (n *Node) Start(ctx context.Context) error {
return
}
}
n.BroadcastClients.Start(ctx)
n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock)
}()
}
if n.configFetcher != nil {
Expand Down
30 changes: 28 additions & 2 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) {
}
}

func (c *SeqCoordinator) Start(ctxIn context.Context) {
func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) {
c.StopWaiter.Start(ctxIn, c)
var newRedisCoordinator *redisutil.RedisCoordinator
if c.config.NewRedisUrl != "" {
Expand All @@ -865,7 +865,33 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) {
err, "newRedisUrl", c.config.NewRedisUrl)
}
}
c.CallIteratively(func(ctx context.Context) time.Duration { return c.chooseRedisAndUpdate(ctx, newRedisCoordinator) })

c.LaunchThread(func(ctx context.Context) {
for {
count, err := c.streamer.GetMessageCount()
if err != nil {
log.Warn("failed to get message count", "err", err)
}
if syncTillBlock > 0 && uint64(count) >= syncTillBlock {
log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock)
return
}
interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator)
if ctx.Err() != nil {
return
}
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
})
if c.config.ChosenHealthcheckAddr != "" {
c.StopWaiter.LaunchThread(c.launchHealthcheckServer)
}
Expand Down
29 changes: 27 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,32 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error {
s.StopWaiter.Start(ctxIn, s)
return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
return s.LaunchThreadSafe(func(ctx context.Context) {
var defaultVal struct{}
var val struct{}
for {
if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock {
log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock)
return
}
interval := s.executeMessages(ctx, val)
if ctx.Err() != nil {
return
}
val = defaultVal
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
case val = <-s.newMessageNotifier:
}
}
})
}
7 changes: 6 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewBroadcastClient(
}, err
}

func (bc *BroadcastClient) Start(ctxIn context.Context) {
func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) {
bc.StopWaiter.Start(ctxIn, bc)
if bc.StopWaiter.Stopped() {
log.Info("broadcast client has already been stopped, not starting")
Expand All @@ -185,6 +185,11 @@ func (bc *BroadcastClient) Start(ctxIn context.Context) {
bc.LaunchThread(func(ctx context.Context) {
backoffDuration := bc.config().ReconnectInitialBackoff
for {

if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock {
log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock)
return
}
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
if errors.Is(err, ErrMissingChainId) ||
errors.Is(err, ErrIncorrectChainId) ||
Expand Down
20 changes: 10 additions & 10 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestInvalidSignature(t *testing.T) {
&badSequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

go func() {
for i := 0; i < messageCount; i++ {
Expand Down Expand Up @@ -227,7 +227,7 @@ func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Co
sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)
messageCount := 0

wg.Add(1)
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestServerClientDisconnect(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestServerIncorrectChainId(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestServerMissingChainId(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -574,7 +574,7 @@ func TestServerIncorrectFeedServerVersion(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestServerMissingFeedServerVersion(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -684,7 +684,7 @@ func TestBroadcastClientReconnectsOnServerDisconnect(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)
defer broadcastClient.StopAndWait()

// Client set to timeout connection at 200 milliseconds, and server set to send ping every 50 seconds,
Expand Down Expand Up @@ -796,7 +796,7 @@ func connectAndGetCachedMessages(ctx context.Context, addr net.Addr, chainId uin
sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

go func() {
defer wg.Done()
Expand Down
Loading

0 comments on commit 8a13df5

Please sign in to comment.