From 5accbc3863fccfc5b7178ec8f70e300b95a4eb7e Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Wed, 12 Jul 2023 10:14:19 -0600 Subject: [PATCH 1/7] Cache the start key for message pruning --- arbnode/message_pruner.go | 46 +++++++++++++++++++++------------- arbnode/message_pruner_test.go | 38 +++++++++++++++------------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index aeee07ca73..a0aa86050f 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -23,11 +23,13 @@ import ( type MessagePruner struct { stopwaiter.StopWaiter - transactionStreamer *TransactionStreamer - inboxTracker *InboxTracker - config MessagePrunerConfigFetcher - pruningLock sync.Mutex - lastPruneDone time.Time + transactionStreamer *TransactionStreamer + inboxTracker *InboxTracker + config MessagePrunerConfigFetcher + pruningLock sync.Mutex + lastPruneDone time.Time + cachedPrunedMessages uint64 + cachedPrunedDelayedMessages uint64 } type MessagePrunerConfig struct { @@ -108,11 +110,11 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g msgCount := endBatchMetadata.MessageCount delayedCount := endBatchMetadata.DelayedMessageCount - return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db) + return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount) } -func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error { - prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount)) +func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error { + prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount)) if err != nil { return fmt.Errorf("error deleting last batch messages: %w", err) } @@ -120,7 +122,7 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) } - prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount) + prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount) if err != nil { return fmt.Errorf("error deleting last batch delayed messages: %w", err) } @@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd return nil } -func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) { - startIter := db.NewIterator(prefix, uint64ToKey(1)) - if !startIter.Next() { +// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key +// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful. 
+func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) { + startMinKey := *cachedStartMinKey + if startMinKey == 0 { + startIter := db.NewIterator(prefix, uint64ToKey(1)) + if !startIter.Next() { + return nil, nil + } + startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix)) + startIter.Release() + } + if endMinKey <= startMinKey { + *cachedStartMinKey = startMinKey return nil, nil } - startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix)) - startIter.Release() - if endMinKey > startMinKey { - return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1) + keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1) + if err == nil { + *cachedStartMinKey = endMinKey - 1 } - return nil, nil + return keys, err } diff --git a/arbnode/message_pruner_test.go b/arbnode/message_pruner_test.go index c0cb2cb4fe..0212ed2364 100644 --- a/arbnode/message_pruner_test.go +++ b/arbnode/message_pruner_test.go @@ -17,8 +17,8 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) { defer cancel() messagesCount := uint64(2 * 100 * 1024) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024) - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024) + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix) @@ -26,22 +26,21 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) { } -func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) { +func TestMessagePrunerTwoHalves(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount) - // In first iteration message till messagesCount are tried to be deleted. - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + _, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount) + // In first iteration message till messagesCount/2 are tried to be deleted. + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2) Require(t, err) - // After first iteration messagesCount/2 is reinserted in inbox db - err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{}) + // In first iteration all the message till messagesCount/2 are deleted. + checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix) + // In second iteration message till messagesCount are tried to be deleted. + err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) - // In second iteration message till messagesCount are again tried to be deleted. - err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) - Require(t, err) - // In second iteration all the message till messagesCount are deleted again. + // In second iteration all the message till messagesCount are deleted. 
checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix) } @@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) { defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20) err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9)) Require(t, err) - err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount)) Require(t, err) @@ -67,8 +66,8 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) { defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount) - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount) + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix) @@ -76,7 +75,7 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) { } -func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) { +func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) { transactionStreamerDb := rawdb.NewMemoryDatabase() for i := uint64(0); i < uint64(messageCount); i++ { @@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd Require(t, err) } - return inboxTrackerDb, transactionStreamerDb + return inboxTrackerDb, transactionStreamerDb, &MessagePruner{ + transactionStreamer: &TransactionStreamer{db: transactionStreamerDb}, + inboxTracker: &InboxTracker{db: inboxTrackerDb}, + } } func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) { From 65a852972ff9a0b50446815f7f6ae15e58005ceb Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Wed, 12 Jul 2023 10:49:03 -0600 Subject: [PATCH 2/7] Use latest confirmed instead of latest staked for message pruner --- arbnode/message_pruner.go | 2 +- arbnode/node.go | 6 +- staker/staker.go | 102 +++++++++++++++++++++------- staker/stateless_block_validator.go | 1 + system_tests/staker_test.go | 3 + util/headerreader/header_reader.go | 4 ++ 6 files changed, 91 insertions(+), 27 deletions(-) diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index a0aa86050f..b469ecdbef 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -64,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) { m.StopWaiter.Start(ctxIn, m) } -func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) { +func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) { locked := m.pruningLock.TryLock() if !locked { return diff --git a/arbnode/node.go b/arbnode/node.go index 8a4f38f28c..bd5605346b 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -825,13 +825,13 @@ func createNodeImpl( } } - notifiers := 
make([]staker.LatestStakedNotifier, 0) + var confirmedNotifiers []staker.LatestConfirmedNotifier if config.MessagePruner.Enable && !config.Caching.Archive { messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner }) - notifiers = append(notifiers, messagePruner) + confirmedNotifiers = append(confirmedNotifiers, messagePruner) } - stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan) + stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan) if err != nil { return nil, err } diff --git a/staker/staker.go b/staker/staker.go index 09a05daad2..f360a60a7d 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" flag "github.com/spf13/pflag" "github.com/offchainlabs/nitro/arbutil" @@ -30,6 +31,7 @@ var ( stakerBalanceGauge = metrics.NewRegisteredGaugeFloat64("arb/staker/balance", nil) stakerAmountStakedGauge = metrics.NewRegisteredGauge("arb/staker/amount_staked", nil) stakerLatestStakedNodeGauge = metrics.NewRegisteredGauge("arb/staker/staked_node", nil) + stakerLatestConfirmedNodeGauge = metrics.NewRegisteredGauge("arb/staker/confirmed_node", nil) stakerLastSuccessfulActionGauge = metrics.NewRegisteredGauge("arb/staker/action/last_success", nil) stakerActionSuccessCounter = metrics.NewRegisteredCounter("arb/staker/action/success", nil) stakerActionFailureCounter = metrics.NewRegisteredCounter("arb/staker/action/failure", nil) @@ -195,11 +197,16 @@ type LatestStakedNotifier interface { UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) } +type LatestConfirmedNotifier interface { + UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) +} + type Staker struct { *L1Validator stopwaiter.StopWaiter l1Reader L1ReaderInterface - notifiers []LatestStakedNotifier + stakedNotifiers []LatestStakedNotifier + confirmedNotifiers []LatestConfirmedNotifier activeChallenge *ChallengeManager baseCallOpts bind.CallOpts config L1ValidatorConfig @@ -219,7 +226,8 @@ func NewStaker( config L1ValidatorConfig, blockValidator *BlockValidator, statelessBlockValidator *StatelessBlockValidator, - notifiers []LatestStakedNotifier, + stakedNotifiers []LatestStakedNotifier, + confirmedNotifiers []LatestConfirmedNotifier, validatorUtilsAddress common.Address, fatalErr chan<- error, ) (*Staker, error) { @@ -235,12 +243,13 @@ func NewStaker( } stakerLastSuccessfulActionGauge.Update(time.Now().Unix()) if config.StartFromStaked { - notifiers = append(notifiers, blockValidator) + stakedNotifiers = append(stakedNotifiers, blockValidator) } return &Staker{ L1Validator: val, l1Reader: l1Reader, - notifiers: notifiers, + stakedNotifiers: stakedNotifiers, + confirmedNotifiers: confirmedNotifiers, baseCallOpts: callOpts, config: config, highGasBlocksBuffer: big.NewInt(config.L1PostingStrategy.HighGasDelayBlocks), @@ -280,8 +289,42 @@ func (s *Staker) Initialize(ctx context.Context) error { return nil } +func (s *Staker) latestNodeDetailsForUpdate(ctx context.Context, description string, node uint64) (arbutil.MessageIndex, *validator.GoGlobalState, error) { + 
stakedInfo, err := s.rollup.LookupNode(ctx, node) + if err != nil { + return 0, nil, fmt.Errorf("couldn't look up latest %v assertion %v: %w", description, node, err) + } + + globalState := stakedInfo.AfterState().GlobalState + caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState) + if err != nil { + if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil { + fatal := fmt.Errorf("latest %v assertion %v not in chain: %w", description, node, err) + s.fatalErr <- fatal + } + return 0, nil, fmt.Errorf("latest %v assertion %v: %w", description, node, err) + } + + if !caughtUp { + log.Info(fmt.Sprintf("latest %v assertion not yet in our node", description), "assertion", node, "state", globalState) + return 0, nil, nil + } + + processedCount, err := s.txStreamer.GetProcessedMessageCount() + if err != nil { + return 0, nil, err + } + + if processedCount < count { + log.Info("execution catching up to rollup", "lookingFor", description, "rollupCount", count, "processedCount", processedCount) + return 0, nil, nil + } + + return count, &globalState, nil +} + func (s *Staker) checkLatestStaked(ctx context.Context) error { - latestStaked, _, err := s.validatorUtils.LatestStaked(&s.baseCallOpts, s.rollupAddress, s.wallet.AddressOrZero()) + latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, s.wallet.AddressOrZero()) if err != nil { return fmt.Errorf("couldn't get LatestStaked: %w", err) } @@ -290,38 +333,44 @@ func (s *Staker) checkLatestStaked(ctx context.Context) error { return nil } - stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked) + count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "staked", latestStaked) if err != nil { - return fmt.Errorf("couldn't look up latest node: %w", err) + return err + } + if globalState == nil { + return nil } - stakedGlobalState := stakedInfo.AfterState().GlobalState - caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, stakedGlobalState) - if err != nil { - if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil { - fatal := fmt.Errorf("latest staked not in chain: %w", err) - s.fatalErr <- fatal - } - return fmt.Errorf("staker: latest staked %w", err) + for _, notifier := range s.stakedNotifiers { + notifier.UpdateLatestStaked(count, *globalState) } + return nil +} - if !caughtUp { - log.Info("latest valid not yet in our node", "staked", stakedGlobalState) +func (s *Staker) checkLatestConfirmed(ctx context.Context) error { + callOpts := s.getCallOpts(ctx) + if s.l1Reader.UseFinalityData() { + callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber)) + } + latestConfirmed, err := s.rollup.LatestConfirmed(callOpts) + if err != nil { + return fmt.Errorf("couldn't get LatestConfirmed: %w", err) + } + stakerLatestConfirmedNodeGauge.Update(int64(latestConfirmed)) + if latestConfirmed == 0 { return nil } - processedCount, err := s.txStreamer.GetProcessedMessageCount() + count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "confirmed", latestConfirmed) if err != nil { return err } - - if processedCount < count { - log.Info("execution catching up to last validated", "validatedCount", count, "processedCount", processedCount) + if globalState == nil { return nil } - for _, notifier := range s.notifiers { - notifier.UpdateLatestStaked(count, stakedGlobalState) + for _, notifier := range s.confirmedNotifiers { + notifier.UpdateLatestConfirmed(count, *globalState) } return nil } @@ -387,6 +436,13 @@ func (s *Staker) Start(ctxIn 
context.Context) { } return s.config.StakerInterval }) + s.CallIteratively(func(ctx context.Context) time.Duration { + err := s.checkLatestConfirmed(ctx) + if err != nil && ctx.Err() == nil { + log.Error("staker: error checking latest confirmed", "err", err) + } + return s.config.StakerInterval + }) } func (s *Staker) IsWhitelisted(ctx context.Context) (bool, error) { diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 0242daa3c7..7add3e258d 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -84,6 +84,7 @@ type L1ReaderInterface interface { Client() arbutil.L1Interface Subscribe(bool) (<-chan *types.Header, func()) WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) + UseFinalityData() bool } type GlobalStatePosition struct { diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 7a3ae41814..8a8ef29bf3 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -166,6 +166,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessA, nil, + nil, l2nodeA.DeployInfo.ValidatorUtils, nil, ) @@ -201,6 +202,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessB, nil, + nil, l2nodeB.DeployInfo.ValidatorUtils, nil, ) @@ -223,6 +225,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessA, nil, + nil, l2nodeA.DeployInfo.ValidatorUtils, nil, ) diff --git a/util/headerreader/header_reader.go b/util/headerreader/header_reader.go index f25fed02d9..1d52e8f78d 100644 --- a/util/headerreader/header_reader.go +++ b/util/headerreader/header_reader.go @@ -434,6 +434,10 @@ func (s *HeaderReader) Client() arbutil.L1Interface { return s.client } +func (s *HeaderReader) UseFinalityData() bool { + return s.config().UseFinalityData +} + func (s *HeaderReader) Start(ctxIn context.Context) { s.StopWaiter.Start(ctxIn, s) s.LaunchThread(s.broadcastLoop) From 3eb03409fde8da61b14b287dcebdef8f4fae21f4 Mon Sep 17 00:00:00 2001 From: Nodar Date: Thu, 13 Jul 2023 14:39:31 +0200 Subject: [PATCH 3/7] Run pprof on a separate port than metrics --- cmd/genericconf/server.go | 5 ++++- cmd/nitro/nitro.go | 41 ++++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 17c4a7a872..88d0750e1c 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -190,13 +190,15 @@ type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` Pprof bool `koanf:"pprof"` + PprofPort int `koanf:"pprof-port"` UpdateInterval time.Duration `koanf:"update-interval"` } var MetricsServerConfigDefault = MetricsServerConfig{ Addr: "127.0.0.1", Port: 6070, - Pprof: false, + Pprof: true, + PprofPort: 6071, UpdateInterval: 3 * time.Second, } @@ -204,5 +206,6 @@ func MetricsServerAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".addr", MetricsServerConfigDefault.Addr, "metrics server address") f.Int(prefix+".port", MetricsServerConfigDefault.Port, "metrics server port") f.Bool(prefix+".pprof", MetricsServerConfigDefault.Pprof, "enable profiling for Go") + f.Int(prefix+".pprof-port", MetricsServerConfigDefault.PprofPort, "pprof server port") f.Duration(prefix+".update-interval", MetricsServerConfigDefault.UpdateInterval, "metrics server update interval") } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 
97fa67512e..863c563a06 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -122,6 +122,30 @@ func main() { os.Exit(mainImpl()) } +// Runs metrics server at address:port specified by config if metrics flag is +// enabled. Additionally if pprof at specified pprof port when it's enabled. +func mustRunMetrics(cfg *NodeConfig) { + if !cfg.Metrics { + if cfg.MetricsServer.Pprof { + log.Warn("Metrics must be enabled in order to use pprof with the metrics server") + } + log.Debug("Metrics are disabled") + return + } + if cfg.MetricsServer.Addr == "" { + log.Crit("Metrics are enabled but server address is not specified") + } + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + + if cfg.MetricsServer.Pprof { + if cfg.MetricsServer.Port == cfg.MetricsServer.PprofPort { + log.Crit("Cannot use same port for metrics and pprof servers", "port", cfg.MetricsServer.Port) + } + genericconf.StartPprof(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.PprofPort)) + } + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) +} + // Returns the exit code func mainImpl() int { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -379,22 +403,7 @@ func mainImpl() int { return 1 } - if nodeConfig.Metrics { - go metrics.CollectProcessMetrics(nodeConfig.MetricsServer.UpdateInterval) - - if nodeConfig.MetricsServer.Addr != "" { - address := fmt.Sprintf("%v:%v", nodeConfig.MetricsServer.Addr, nodeConfig.MetricsServer.Port) - if nodeConfig.MetricsServer.Pprof { - genericconf.StartPprof(address) - } else { - exp.Setup(address) - } - } - } else if nodeConfig.MetricsServer.Pprof { - flag.Usage() - log.Error("--metrics must be enabled in order to use pprof with the metrics server") - return 1 - } + mustRunMetrics(nodeConfig) fatalErrChan := make(chan error, 10) From c0019495c74b33d2c2610673b07850d9a312a049 Mon Sep 17 00:00:00 2001 From: Nodar Date: Thu, 13 Jul 2023 20:11:35 +0200 Subject: [PATCH 4/7] Separate PProf flag from metrics flag, disable by default --- cmd/daserver/daserver.go | 25 ++++++++++++++++++++++++ cmd/genericconf/server.go | 21 ++++++++++++++------ cmd/nitro/nitro.go | 41 ++++++++++++++++++++------------------- cmd/relay/relay.go | 28 +++++++++++++++++++------- relay/relay.go | 6 ++++++ 5 files changed, 88 insertions(+), 33 deletions(-) diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go index 7b6b504e40..059c19d7ca 100644 --- a/cmd/daserver/daserver.go +++ b/cmd/daserver/daserver.go @@ -45,6 +45,8 @@ type DAServerConfig struct { Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` } var DefaultDAServerConfig = DAServerConfig{ @@ -60,6 +62,8 @@ var DefaultDAServerConfig = DAServerConfig{ ConfConfig: genericconf.ConfConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, LogLevel: 3, } @@ -89,6 +93,9 @@ func parseDAServer(args []string) (*DAServerConfig, error) { f.Bool("metrics", DefaultDAServerConfig.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", DefaultDAServerConfig.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) + f.Int("log-level", int(log.LvlInfo), "log level; 1: ERROR, 2: WARN, 3: INFO, 4: DEBUG, 5: TRACE") das.DataAvailabilityConfigAddDaserverOptions("data-availability", f) genericconf.ConfConfigAddOptions("conf", f) @@ -135,6 
+142,24 @@ func (c *L1ReaderCloser) String() string { return "l1 reader closer" } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. +func mustRunMetrics(cfg *DAServerConfig) { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } +} + func startup() error { // Some different defaults to DAS config in a node. das.DefaultDataAvailabilityConfig.Enable = true diff --git a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 88d0750e1c..b99429191e 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -189,23 +189,32 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) { type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` - Pprof bool `koanf:"pprof"` - PprofPort int `koanf:"pprof-port"` UpdateInterval time.Duration `koanf:"update-interval"` } var MetricsServerConfigDefault = MetricsServerConfig{ Addr: "127.0.0.1", Port: 6070, - Pprof: true, - PprofPort: 6071, UpdateInterval: 3 * time.Second, } +type PProf struct { + Addr string `koanf:"addr"` + Port int `koanf:"port"` +} + +var PProfDefault = PProf{ + Addr: "127.0.0.1", + Port: 6071, +} + func MetricsServerAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".addr", MetricsServerConfigDefault.Addr, "metrics server address") f.Int(prefix+".port", MetricsServerConfigDefault.Port, "metrics server port") - f.Bool(prefix+".pprof", MetricsServerConfigDefault.Pprof, "enable profiling for Go") - f.Int(prefix+".pprof-port", MetricsServerConfigDefault.PprofPort, "pprof server port") f.Duration(prefix+".update-interval", MetricsServerConfigDefault.UpdateInterval, "metrics server update interval") } + +func PProfAddOptions(prefix string, f *flag.FlagSet) { + f.String(prefix+".addr", PProfDefault.Addr, "pprof server address") + f.Int(prefix+".port", PProfDefault.Port, "pprof server port") +} diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 863c563a06..0f3a65db44 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -122,28 +122,22 @@ func main() { os.Exit(mainImpl()) } -// Runs metrics server at address:port specified by config if metrics flag is -// enabled. Additionally if pprof at specified pprof port when it's enabled. +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
func mustRunMetrics(cfg *NodeConfig) { - if !cfg.Metrics { - if cfg.MetricsServer.Pprof { - log.Warn("Metrics must be enabled in order to use pprof with the metrics server") - } - log.Debug("Metrics are disabled") - return + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) } - if cfg.MetricsServer.Addr == "" { - log.Crit("Metrics are enabled but server address is not specified") + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) } - go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) - - if cfg.MetricsServer.Pprof { - if cfg.MetricsServer.Port == cfg.MetricsServer.PprofPort { - log.Crit("Cannot use same port for metrics and pprof servers", "port", cfg.MetricsServer.Port) - } - genericconf.StartPprof(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.PprofPort)) + if cfg.PProf { + genericconf.StartPprof(pAddr) } - exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) } // Returns the exit code @@ -374,6 +368,8 @@ func mainImpl() int { } } + mustRunMetrics(nodeConfig) + chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.L2.ChainID), execution.DefaultCacheConfigFor(stack, &nodeConfig.Node.Caching), l1Client, rollupAddrs) defer closeDb(chainDb, "chainDb") if l2BlockChain != nil { @@ -403,8 +399,6 @@ func mainImpl() int { return 1 } - mustRunMetrics(nodeConfig) - fatalErrChan := make(chan error, 10) var valNode *valnode.ValidationNode @@ -534,6 +528,8 @@ type NodeConfig struct { GraphQL genericconf.GraphQLConfig `koanf:"graphql"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Init InitConfig `koanf:"init"` Rpc genericconf.RpcConfig `koanf:"rpc"` } @@ -551,6 +547,8 @@ var NodeConfigDefault = NodeConfig{ IPC: genericconf.IPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, } func NodeConfigAddOptions(f *flag.FlagSet) { @@ -570,6 +568,9 @@ func NodeConfigAddOptions(f *flag.FlagSet) { genericconf.GraphQLConfigAddOptions("graphql", f) f.Bool("metrics", NodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", NodeConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) + InitConfigAddOptions("init", f) genericconf.RpcConfigAddOptions("rpc", f) } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index 9f5669454f..2d466d835a 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -37,6 +37,24 @@ func printSampleUsage(progname string) { fmt.Printf("Sample usage: %s --node.feed.input.url= --chain.id= \n", progname) } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
+func mustRunMetrics(cfg *relay.Config) { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } +} + func startup() error { ctx := context.Background() @@ -68,18 +86,14 @@ func startup() error { if err != nil { return err } + + mustRunMetrics(relayConfig) + err = newRelay.Start(ctx) if err != nil { return err } - if relayConfig.Metrics && relayConfig.MetricsServer.Addr != "" { - go metrics.CollectProcessMetrics(relayConfig.MetricsServer.UpdateInterval) - - address := fmt.Sprintf("%v:%v", relayConfig.MetricsServer.Addr, relayConfig.MetricsServer.Port) - exp.Setup(address) - } - select { case <-sigint: log.Info("shutting down because of sigint") diff --git a/relay/relay.go b/relay/relay.go index b9d70c513b..f4fc33d9e3 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -146,6 +146,8 @@ type Config struct { LogType string `koanf:"log-type"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Node NodeConfig `koanf:"node"` Queue int `koanf:"queue"` } @@ -157,6 +159,8 @@ var ConfigDefault = Config{ LogType: "plaintext", Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, Node: NodeConfigDefault, Queue: 1024, } @@ -168,6 +172,8 @@ func ConfigAddOptions(f *flag.FlagSet) { f.String("log-type", ConfigDefault.LogType, "log type") f.Bool("metrics", ConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", ConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) NodeConfigAddOptions("node", f) f.Int("queue", ConfigDefault.Queue, "size of relay queue") } From 65c79942b1303848e0e85d0481fb47f9cc0a9b51 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 13 Jul 2023 21:14:12 -0600 Subject: [PATCH 5/7] Unify latest staked and latest confirmed checking --- staker/staker.go | 117 +++++++++++++++---------------------- staker/validator_wallet.go | 28 +++++---- 2 files changed, 63 insertions(+), 82 deletions(-) diff --git a/staker/staker.go b/staker/staker.go index f360a60a7d..230c381d05 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -242,7 +242,7 @@ func NewStaker( return nil, err } stakerLastSuccessfulActionGauge.Update(time.Now().Unix()) - if config.StartFromStaked { + if config.StartFromStaked && blockValidator != nil { stakedNotifiers = append(stakedNotifiers, blockValidator) } return &Staker{ @@ -289,90 +289,50 @@ func (s *Staker) Initialize(ctx context.Context) error { return nil } -func (s *Staker) latestNodeDetailsForUpdate(ctx context.Context, description string, node uint64) (arbutil.MessageIndex, *validator.GoGlobalState, error) { - stakedInfo, err := s.rollup.LookupNode(ctx, node) +func (s *Staker) getLatestStakedState(ctx context.Context, staker common.Address) (uint64, arbutil.MessageIndex, *validator.GoGlobalState, error) { + callOpts := s.getCallOpts(ctx) + if s.l1Reader.UseFinalityData() { + callOpts.BlockNumber = 
big.NewInt(int64(rpc.FinalizedBlockNumber)) + } + latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, staker) + if err != nil { + return 0, 0, nil, fmt.Errorf("couldn't get LatestStaked(%v): %w", staker, err) + } + if latestStaked == 0 { + return latestStaked, 0, nil, nil + } + + stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked) if err != nil { - return 0, nil, fmt.Errorf("couldn't look up latest %v assertion %v: %w", description, node, err) + return 0, 0, nil, fmt.Errorf("couldn't look up latest assertion of %v (%v): %w", staker, latestStaked, err) } globalState := stakedInfo.AfterState().GlobalState caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState) if err != nil { if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil { - fatal := fmt.Errorf("latest %v assertion %v not in chain: %w", description, node, err) + fatal := fmt.Errorf("latest assertion of %v (%v) not in chain: %w", staker, latestStaked, err) s.fatalErr <- fatal } - return 0, nil, fmt.Errorf("latest %v assertion %v: %w", description, node, err) + return 0, 0, nil, fmt.Errorf("latest assertion of %v (%v): %w", staker, latestStaked, err) } if !caughtUp { - log.Info(fmt.Sprintf("latest %v assertion not yet in our node", description), "assertion", node, "state", globalState) - return 0, nil, nil + log.Info("latest assertion not yet in our node", "staker", staker, "assertion", latestStaked, "state", globalState) + return latestStaked, 0, nil, nil } processedCount, err := s.txStreamer.GetProcessedMessageCount() if err != nil { - return 0, nil, err + return 0, 0, nil, err } if processedCount < count { - log.Info("execution catching up to rollup", "lookingFor", description, "rollupCount", count, "processedCount", processedCount) - return 0, nil, nil + log.Info("execution catching up to rollup", "staker", staker, "rollupCount", count, "processedCount", processedCount) + return latestStaked, 0, nil, nil } - return count, &globalState, nil -} - -func (s *Staker) checkLatestStaked(ctx context.Context) error { - latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, s.wallet.AddressOrZero()) - if err != nil { - return fmt.Errorf("couldn't get LatestStaked: %w", err) - } - stakerLatestStakedNodeGauge.Update(int64(latestStaked)) - if latestStaked == 0 { - return nil - } - - count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "staked", latestStaked) - if err != nil { - return err - } - if globalState == nil { - return nil - } - - for _, notifier := range s.stakedNotifiers { - notifier.UpdateLatestStaked(count, *globalState) - } - return nil -} - -func (s *Staker) checkLatestConfirmed(ctx context.Context) error { - callOpts := s.getCallOpts(ctx) - if s.l1Reader.UseFinalityData() { - callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber)) - } - latestConfirmed, err := s.rollup.LatestConfirmed(callOpts) - if err != nil { - return fmt.Errorf("couldn't get LatestConfirmed: %w", err) - } - stakerLatestConfirmedNodeGauge.Update(int64(latestConfirmed)) - if latestConfirmed == 0 { - return nil - } - - count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "confirmed", latestConfirmed) - if err != nil { - return err - } - if globalState == nil { - return nil - } - - for _, notifier := range s.confirmedNotifiers { - notifier.UpdateLatestConfirmed(count, *globalState) - } - return nil + return latestStaked, count, &globalState, nil } func (s *Staker) Start(ctxIn context.Context) { @@ -430,16 +390,31 
@@ func (s *Staker) Start(ctxIn context.Context) { return backoff }) s.CallIteratively(func(ctx context.Context) time.Duration { - err := s.checkLatestStaked(ctx) + wallet := s.wallet.AddressOrZero() + staked, stakedMsgCount, stakedGlobalState, err := s.getLatestStakedState(ctx, wallet) if err != nil && ctx.Err() == nil { log.Error("staker: error checking latest staked", "err", err) } - return s.config.StakerInterval - }) - s.CallIteratively(func(ctx context.Context) time.Duration { - err := s.checkLatestConfirmed(ctx) - if err != nil && ctx.Err() == nil { - log.Error("staker: error checking latest confirmed", "err", err) + stakerLatestStakedNodeGauge.Update(int64(staked)) + if stakedGlobalState != nil { + for _, notifier := range s.stakedNotifiers { + notifier.UpdateLatestStaked(stakedMsgCount, *stakedGlobalState) + } + } + confirmed := staked + confirmedMsgCount := stakedMsgCount + confirmedGlobalState := stakedGlobalState + if wallet != (common.Address{}) { + confirmed, confirmedMsgCount, confirmedGlobalState, err = s.getLatestStakedState(ctx, common.Address{}) + if err != nil && ctx.Err() == nil { + log.Error("staker: error checking latest confirmed", "err", err) + } + } + stakerLatestConfirmedNodeGauge.Update(int64(confirmed)) + if confirmedGlobalState != nil { + for _, notifier := range s.confirmedNotifiers { + notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState) + } } return s.config.StakerInterval }) diff --git a/staker/validator_wallet.go b/staker/validator_wallet.go index c36efa7b61..d878749f35 100644 --- a/staker/validator_wallet.go +++ b/staker/validator_wallet.go @@ -8,6 +8,7 @@ import ( "errors" "math/big" "strings" + "sync/atomic" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" @@ -38,7 +39,9 @@ func init() { type ValidatorWalletInterface interface { Initialize(context.Context) error + // Address must be able to be called concurrently with other functions Address() *common.Address + // Address must be able to be called concurrently with other functions AddressOrZero() common.Address TxSenderAddress() *common.Address RollupAddress() common.Address @@ -53,7 +56,7 @@ type ValidatorWalletInterface interface { type ContractValidatorWallet struct { con *rollupgen.ValidatorWallet - address *common.Address + address atomic.Pointer[common.Address] onWalletCreated func(common.Address) l1Reader L1ReaderInterface auth *bind.TransactOpts @@ -79,9 +82,8 @@ func NewContractValidatorWallet(address *common.Address, walletFactoryAddr, roll if err != nil { return nil, err } - return &ContractValidatorWallet{ + wallet := &ContractValidatorWallet{ con: con, - address: address, onWalletCreated: onWalletCreated, l1Reader: l1Reader, auth: auth, @@ -89,7 +91,10 @@ func NewContractValidatorWallet(address *common.Address, walletFactoryAddr, roll rollupAddress: rollupAddress, rollup: rollup, rollupFromBlock: rollupFromBlock, - }, nil + } + // Go complains if we make an address variable before wallet and copy it in + wallet.address.Store(address) + return wallet, nil } func (v *ContractValidatorWallet) validateWallet(ctx context.Context) error { @@ -127,15 +132,16 @@ func (v *ContractValidatorWallet) Initialize(ctx context.Context) error { // May be the nil if the wallet hasn't been deployed yet func (v *ContractValidatorWallet) Address() *common.Address { - return v.address + return v.address.Load() } // May be zero if the wallet hasn't been deployed yet func (v *ContractValidatorWallet) AddressOrZero() common.Address { - if v.address == nil { + addr 
:= v.address.Load() + if addr == nil { return common.Address{} } - return *v.address + return *addr } func (v *ContractValidatorWallet) TxSenderAddress() *common.Address { @@ -183,7 +189,7 @@ func (v *ContractValidatorWallet) populateWallet(ctx context.Context, createIfMi } return nil } - if v.address == nil { + if v.address.Load() == nil { auth, err := v.getAuth(ctx, nil) if err != nil { return err @@ -195,12 +201,12 @@ func (v *ContractValidatorWallet) populateWallet(ctx context.Context, createIfMi if addr == nil { return nil } - v.address = addr + v.address.Store(addr) if v.onWalletCreated != nil { v.onWalletCreated(*addr) } } - con, err := rollupgen.NewValidatorWallet(*v.address, v.l1Reader.Client()) + con, err := rollupgen.NewValidatorWallet(*v.Address(), v.l1Reader.Client()) if err != nil { return err } @@ -260,7 +266,7 @@ func (v *ContractValidatorWallet) ExecuteTransactions(ctx context.Context, build totalAmount = totalAmount.Add(totalAmount, tx.Value()) } - balanceInContract, err := v.l1Reader.Client().BalanceAt(ctx, *v.address, nil) + balanceInContract, err := v.l1Reader.Client().BalanceAt(ctx, *v.Address(), nil) if err != nil { return nil, err } From 741a9c843a7aed0c131eecea1a9ae06c9bff64e3 Mon Sep 17 00:00:00 2001 From: Nodar Date: Fri, 14 Jul 2023 12:30:53 +0200 Subject: [PATCH 6/7] Rename mustRunMetrics to startMetrics and return error instead --- cmd/daserver/daserver.go | 17 +++++------------ cmd/nitro-val/config.go | 6 ++++++ cmd/nitro-val/nitro_val.go | 35 +++++++++++++++++++++-------------- cmd/nitro/nitro.go | 10 +++++++--- cmd/relay/relay.go | 12 +++++++----- 5 files changed, 46 insertions(+), 34 deletions(-) diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go index 059c19d7ca..1a587d6847 100644 --- a/cmd/daserver/daserver.go +++ b/cmd/daserver/daserver.go @@ -145,11 +145,11 @@ func (c *L1ReaderCloser) String() string { // Checks metrics and PProf flag, runs them if enabled. // Note: they are separate so one can enable/disable them as they wish, the only // requirement is that they can't run on the same address and port. 
-func mustRunMetrics(cfg *DAServerConfig) { +func startMetrics(cfg *DAServerConfig) error { mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) if cfg.Metrics && cfg.PProf && mAddr == pAddr { - log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) } if cfg.Metrics { go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) @@ -158,6 +158,7 @@ func mustRunMetrics(cfg *DAServerConfig) { if cfg.PProf { genericconf.StartPprof(pAddr) } + return nil } func startup() error { @@ -176,16 +177,8 @@ func startup() error { glogger.Verbosity(log.Lvl(serverConfig.LogLevel)) log.Root().SetHandler(glogger) - if serverConfig.Metrics { - if len(serverConfig.MetricsServer.Addr) == 0 { - fmt.Printf("Metrics is enabled, but missing --metrics-server.addr") - return nil - } - - go metrics.CollectProcessMetrics(serverConfig.MetricsServer.UpdateInterval) - - address := fmt.Sprintf("%v:%v", serverConfig.MetricsServer.Addr, serverConfig.MetricsServer.Port) - exp.Setup(address) + if err := startMetrics(serverConfig); err != nil { + return err } sigint := make(chan os.Signal, 1) diff --git a/cmd/nitro-val/config.go b/cmd/nitro-val/config.go index 5ab1521f96..12a359cfa4 100644 --- a/cmd/nitro-val/config.go +++ b/cmd/nitro-val/config.go @@ -30,6 +30,8 @@ type ValidationNodeConfig struct { AuthRPC genericconf.AuthRPCConfig `koanf:"auth"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Workdir string `koanf:"workdir" reload:"hot"` } @@ -67,6 +69,8 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{ AuthRPC: genericconf.AuthRPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, Workdir: "", } @@ -83,6 +87,8 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) { genericconf.AuthRPCConfigAddOptions("auth", f) f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) f.String("workdir", ValidationNodeConfigDefault.Workdir, "path used for purpose of resolving relative paths (ia. jwt secret file, log files), if empty then current working directory will be used.") } diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 40d9fce5b6..e6b7bd882f 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -32,6 +32,25 @@ func main() { os.Exit(mainImpl()) } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
+func startMetrics(cfg *ValidationNodeConfig) error { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } + return nil +} + // Returns the exit code func mainImpl() int { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -96,20 +115,8 @@ func mainImpl() int { log.Crit("failed to initialize geth stack", "err", err) } - if nodeConfig.Metrics { - go metrics.CollectProcessMetrics(nodeConfig.MetricsServer.UpdateInterval) - - if nodeConfig.MetricsServer.Addr != "" { - address := fmt.Sprintf("%v:%v", nodeConfig.MetricsServer.Addr, nodeConfig.MetricsServer.Port) - if nodeConfig.MetricsServer.Pprof { - genericconf.StartPprof(address) - } else { - exp.Setup(address) - } - } - } else if nodeConfig.MetricsServer.Pprof { - flag.Usage() - log.Error("--metrics must be enabled in order to use pprof with the metrics server") + if err := startMetrics(nodeConfig); err != nil { + log.Error("Starting metrics: %v", err) return 1 } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 0f3a65db44..f1af1388cf 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -125,11 +125,11 @@ func main() { // Checks metrics and PProf flag, runs them if enabled. // Note: they are separate so one can enable/disable them as they wish, the only // requirement is that they can't run on the same address and port. -func mustRunMetrics(cfg *NodeConfig) { +func startMetrics(cfg *NodeConfig) error { mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) if cfg.Metrics && cfg.PProf && mAddr == pAddr { - log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) } if cfg.Metrics { go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) @@ -138,6 +138,7 @@ func mustRunMetrics(cfg *NodeConfig) { if cfg.PProf { genericconf.StartPprof(pAddr) } + return nil } // Returns the exit code @@ -368,7 +369,10 @@ func mainImpl() int { } } - mustRunMetrics(nodeConfig) + if err := startMetrics(nodeConfig); err != nil { + log.Error("Starting metrics: %v", err) + return 1 + } chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.L2.ChainID), execution.DefaultCacheConfigFor(stack, &nodeConfig.Node.Caching), l1Client, rollupAddrs) defer closeDb(chainDb, "chainDb") diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index 2d466d835a..57831c3f59 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -40,11 +40,11 @@ func printSampleUsage(progname string) { // Checks metrics and PProf flag, runs them if enabled. // Note: they are separate so one can enable/disable them as they wish, the only // requirement is that they can't run on the same address and port. 
-func mustRunMetrics(cfg *relay.Config) { +func startMetrics(cfg *relay.Config) error { mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) if cfg.Metrics && cfg.PProf && mAddr == pAddr { - log.Crit("Metrics and pprof cannot be enabled on the same address:port", "addr", mAddr) + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) } if cfg.Metrics { go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) @@ -53,6 +53,7 @@ func mustRunMetrics(cfg *relay.Config) { if cfg.PProf { genericconf.StartPprof(pAddr) } + return nil } func startup() error { @@ -87,10 +88,11 @@ func startup() error { return err } - mustRunMetrics(relayConfig) + if err := startMetrics(relayConfig); err != nil { + return err + } - err = newRelay.Start(ctx) - if err != nil { + if err := newRelay.Start(ctx); err != nil { return err } From 38f9b3213ac1a56c067c0f2ee6f2c0b00482efa5 Mon Sep 17 00:00:00 2001 From: Nodar Date: Mon, 17 Jul 2023 14:09:35 +0200 Subject: [PATCH 7/7] Add comment explaining where pprof handlers are registered --- cmd/genericconf/pprof.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/genericconf/pprof.go b/cmd/genericconf/pprof.go index 8f756bbf45..e55bfddd32 100644 --- a/cmd/genericconf/pprof.go +++ b/cmd/genericconf/pprof.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" + // Blank import pprof registers its HTTP handlers. _ "net/http/pprof" // #nosec G108 "github.com/ethereum/go-ethereum/log"
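Taken together, the metrics/pprof patches above split profiling out of the metrics server: pprof gets its own --pprof flag and pprof-cfg address/port (default 127.0.0.1:6071, next to the metrics default of 127.0.0.1:6070), the old metrics-server.pprof / metrics-server.pprof-port options are removed, and startup fails if both servers are pointed at the same address:port. The last patch documents the piece that makes this work: the blank import of net/http/pprof registers the /debug/pprof/* handlers on http.DefaultServeMux. Below is a minimal standalone sketch of that layout using only the standard library in place of Nitro's genericconf/exp helpers; the addresses and the placeholder metrics handler are illustrative, not Nitro's actual wiring.

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // blank import: registers /debug/pprof/* on http.DefaultServeMux
)

func main() {
	// Defaults mirroring the patches' MetricsServerConfigDefault and PProfDefault.
	metricsAddr := "127.0.0.1:6070"
	pprofAddr := "127.0.0.1:6071"

	// The patches refuse to run both servers on one address:port; do the same here.
	if metricsAddr == pprofAddr {
		log.Fatalf("metrics and pprof cannot be enabled on the same address:port: %s", metricsAddr)
	}

	// Stand-in for the metrics endpoint (Nitro uses go-ethereum's exp.Setup here).
	go func() {
		mux := http.NewServeMux()
		mux.HandleFunc("/debug/metrics", func(w http.ResponseWriter, _ *http.Request) {
			_, _ = w.Write([]byte("metrics placeholder\n"))
		})
		log.Println(http.ListenAndServe(metricsAddr, mux))
	}()

	// pprof is served from the default mux, where the blank import registered its handlers.
	log.Println(http.ListenAndServe(pprofAddr, nil))
}

With the new flags, the equivalent node invocation is --metrics --pprof, optionally overriding --metrics-server.addr / --metrics-server.port and --pprof-cfg.addr / --pprof-cfg.port.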