diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index aeee07ca73..b469ecdbef 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -23,11 +23,13 @@ import ( type MessagePruner struct { stopwaiter.StopWaiter - transactionStreamer *TransactionStreamer - inboxTracker *InboxTracker - config MessagePrunerConfigFetcher - pruningLock sync.Mutex - lastPruneDone time.Time + transactionStreamer *TransactionStreamer + inboxTracker *InboxTracker + config MessagePrunerConfigFetcher + pruningLock sync.Mutex + lastPruneDone time.Time + cachedPrunedMessages uint64 + cachedPrunedDelayedMessages uint64 } type MessagePrunerConfig struct { @@ -62,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) { m.StopWaiter.Start(ctxIn, m) } -func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) { +func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) { locked := m.pruningLock.TryLock() if !locked { return @@ -108,11 +110,11 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g msgCount := endBatchMetadata.MessageCount delayedCount := endBatchMetadata.DelayedMessageCount - return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db) + return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount) } -func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error { - prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount)) +func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error { + prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount)) if err != nil { return fmt.Errorf("error deleting last batch messages: %w", err) } @@ -120,7 +122,7 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) } - prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount) + prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount) if err != nil { return fmt.Errorf("error deleting last batch delayed messages: %w", err) } @@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd return nil } -func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) { - startIter := db.NewIterator(prefix, uint64ToKey(1)) - if !startIter.Next() { +// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key +// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful. 
+func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) { + startMinKey := *cachedStartMinKey + if startMinKey == 0 { + startIter := db.NewIterator(prefix, uint64ToKey(1)) + if !startIter.Next() { + return nil, nil + } + startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix)) + startIter.Release() + } + if endMinKey <= startMinKey { + *cachedStartMinKey = startMinKey return nil, nil } - startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix)) - startIter.Release() - if endMinKey > startMinKey { - return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1) + keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1) + if err == nil { + *cachedStartMinKey = endMinKey - 1 } - return nil, nil + return keys, err } diff --git a/arbnode/message_pruner_test.go b/arbnode/message_pruner_test.go index c0cb2cb4fe..0212ed2364 100644 --- a/arbnode/message_pruner_test.go +++ b/arbnode/message_pruner_test.go @@ -17,8 +17,8 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) { defer cancel() messagesCount := uint64(2 * 100 * 1024) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024) - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024) + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix) @@ -26,22 +26,21 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) { } -func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) { +func TestMessagePrunerTwoHalves(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount) - // In first iteration message till messagesCount are tried to be deleted. - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + _, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount) + // In first iteration message till messagesCount/2 are tried to be deleted. + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2) Require(t, err) - // After first iteration messagesCount/2 is reinserted in inbox db - err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{}) + // In first iteration all the message till messagesCount/2 are deleted. + checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix) + // In second iteration message till messagesCount are tried to be deleted. + err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) - // In second iteration message till messagesCount are again tried to be deleted. - err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) - Require(t, err) - // In second iteration all the message till messagesCount are deleted again. + // In second iteration all the message till messagesCount are deleted. 
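The cached start key introduced above means each pruning pass resumes where the previous one stopped instead of re-iterating the database from key 1. Below is a minimal, self-contained sketch of that pattern, assuming only go-ethereum's rawdb/ethdb packages; pruneUpTo and the "x" prefix are made-up stand-ins for deleteFromLastPrunedUptoEndKey and messagePrefix, and the real code delegates the deletion loop to deleteFromRange.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// dbKey mirrors the prefix + big-endian uint64 key layout the pruner uses.
func dbKey(prefix []byte, pos uint64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, pos)
	return append(append([]byte{}, prefix...), k...)
}

// pruneUpTo deletes keys in [cachedStart, end) under prefix and advances the
// cached lower bound, so the next call resumes instead of re-scanning from key 1.
func pruneUpTo(db ethdb.Database, prefix []byte, cachedStart *uint64, end uint64) error {
	start := *cachedStart
	if start == 0 {
		it := db.NewIterator(prefix, dbKey(nil, 1))
		found := it.Next()
		if found {
			start = binary.BigEndian.Uint64(it.Key()[len(prefix):])
		}
		it.Release()
		if !found {
			return nil // nothing stored under this prefix yet
		}
	}
	if end <= start {
		*cachedStart = start
		return nil
	}
	for i := start; i < end; i++ {
		if err := db.Delete(dbKey(prefix, i)); err != nil {
			return err
		}
	}
	*cachedStart = end - 1
	return nil
}

func main() {
	db := rawdb.NewMemoryDatabase()
	prefix := []byte("x")
	for i := uint64(1); i <= 10; i++ {
		_ = db.Put(dbKey(prefix, i), []byte{0})
	}
	var cached uint64
	_ = pruneUpTo(db, prefix, &cached, 5)  // first half: scans once for the start key, deletes 1-4
	_ = pruneUpTo(db, prefix, &cached, 10) // second half: resumes from the cached bound
	kept, _ := db.Has(dbKey(prefix, 10))
	fmt.Println("cached bound:", cached, "key 10 kept:", kept)
}

As in the diff, the cached bound is advanced to end-1 on success, so a later call with the same or a smaller end key is a cheap no-op, which is exactly what TestMessagePrunerTwoHalves exercises.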
checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix) } @@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) { defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20) err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9)) Require(t, err) - err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount)) Require(t, err) @@ -67,8 +66,8 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) { defer cancel() messagesCount := uint64(10) - inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount) - err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb) + inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount) + err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount) Require(t, err) checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix) @@ -76,7 +75,7 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) { } -func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) { +func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) { transactionStreamerDb := rawdb.NewMemoryDatabase() for i := uint64(0); i < uint64(messageCount); i++ { @@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd Require(t, err) } - return inboxTrackerDb, transactionStreamerDb + return inboxTrackerDb, transactionStreamerDb, &MessagePruner{ + transactionStreamer: &TransactionStreamer{db: transactionStreamerDb}, + inboxTracker: &InboxTracker{db: inboxTrackerDb}, + } } func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) { diff --git a/arbnode/node.go b/arbnode/node.go index 8a4f38f28c..bd5605346b 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -825,13 +825,13 @@ func createNodeImpl( } } - notifiers := make([]staker.LatestStakedNotifier, 0) + var confirmedNotifiers []staker.LatestConfirmedNotifier if config.MessagePruner.Enable && !config.Caching.Archive { messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner }) - notifiers = append(notifiers, messagePruner) + confirmedNotifiers = append(confirmedNotifiers, messagePruner) } - stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan) + stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan) if err != nil { return nil, err } diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go index 7b6b504e40..1a587d6847 100644 --- a/cmd/daserver/daserver.go +++ b/cmd/daserver/daserver.go @@ -45,6 +45,8 @@ type DAServerConfig struct { Metrics bool `koanf:"metrics"` MetricsServer 
genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` } var DefaultDAServerConfig = DAServerConfig{ @@ -60,6 +62,8 @@ var DefaultDAServerConfig = DAServerConfig{ ConfConfig: genericconf.ConfConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, LogLevel: 3, } @@ -89,6 +93,9 @@ func parseDAServer(args []string) (*DAServerConfig, error) { f.Bool("metrics", DefaultDAServerConfig.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", DefaultDAServerConfig.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) + f.Int("log-level", int(log.LvlInfo), "log level; 1: ERROR, 2: WARN, 3: INFO, 4: DEBUG, 5: TRACE") das.DataAvailabilityConfigAddDaserverOptions("data-availability", f) genericconf.ConfConfigAddOptions("conf", f) @@ -135,6 +142,25 @@ func (c *L1ReaderCloser) String() string { return "l1 reader closer" } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. +func startMetrics(cfg *DAServerConfig) error { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } + return nil +} + func startup() error { // Some different defaults to DAS config in a node. das.DefaultDataAvailabilityConfig.Enable = true @@ -151,16 +177,8 @@ func startup() error { glogger.Verbosity(log.Lvl(serverConfig.LogLevel)) log.Root().SetHandler(glogger) - if serverConfig.Metrics { - if len(serverConfig.MetricsServer.Addr) == 0 { - fmt.Printf("Metrics is enabled, but missing --metrics-server.addr") - return nil - } - - go metrics.CollectProcessMetrics(serverConfig.MetricsServer.UpdateInterval) - - address := fmt.Sprintf("%v:%v", serverConfig.MetricsServer.Addr, serverConfig.MetricsServer.Port) - exp.Setup(address) + if err := startMetrics(serverConfig); err != nil { + return err } sigint := make(chan os.Signal, 1) diff --git a/cmd/genericconf/pprof.go b/cmd/genericconf/pprof.go index 8f756bbf45..e55bfddd32 100644 --- a/cmd/genericconf/pprof.go +++ b/cmd/genericconf/pprof.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" + // Blank import pprof registers its HTTP handlers. 
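For context on the blank import above: net/http/pprof registers its profiling handlers on http.DefaultServeMux as a side effect, so a pprof endpoint only needs to serve that mux on the configured address. A hedged sketch of what genericconf.StartPprof presumably amounts to; the actual implementation in cmd/genericconf/pprof.go may differ in details such as logging and mux choice.

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* on http.DefaultServeMux
)

// startPprofSketch serves the default mux, and with it the pprof handlers,
// on the given address in a background goroutine.
func startPprofSketch(addr string) {
	go func() {
		if err := http.ListenAndServe(addr, http.DefaultServeMux); err != nil {
			log.Printf("pprof server stopped: %v", err)
		}
	}()
}

func main() {
	startPprofSketch("127.0.0.1:6071") // PProfDefault from the diff
	select {}
}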
_ "net/http/pprof" // #nosec G108 "github.com/ethereum/go-ethereum/log" diff --git a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 17c4a7a872..b99429191e 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -189,20 +189,32 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) { type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` - Pprof bool `koanf:"pprof"` UpdateInterval time.Duration `koanf:"update-interval"` } var MetricsServerConfigDefault = MetricsServerConfig{ Addr: "127.0.0.1", Port: 6070, - Pprof: false, UpdateInterval: 3 * time.Second, } +type PProf struct { + Addr string `koanf:"addr"` + Port int `koanf:"port"` +} + +var PProfDefault = PProf{ + Addr: "127.0.0.1", + Port: 6071, +} + func MetricsServerAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".addr", MetricsServerConfigDefault.Addr, "metrics server address") f.Int(prefix+".port", MetricsServerConfigDefault.Port, "metrics server port") - f.Bool(prefix+".pprof", MetricsServerConfigDefault.Pprof, "enable profiling for Go") f.Duration(prefix+".update-interval", MetricsServerConfigDefault.UpdateInterval, "metrics server update interval") } + +func PProfAddOptions(prefix string, f *flag.FlagSet) { + f.String(prefix+".addr", PProfDefault.Addr, "pprof server address") + f.Int(prefix+".port", PProfDefault.Port, "pprof server port") +} diff --git a/cmd/nitro-val/config.go b/cmd/nitro-val/config.go index 5ab1521f96..12a359cfa4 100644 --- a/cmd/nitro-val/config.go +++ b/cmd/nitro-val/config.go @@ -30,6 +30,8 @@ type ValidationNodeConfig struct { AuthRPC genericconf.AuthRPCConfig `koanf:"auth"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Workdir string `koanf:"workdir" reload:"hot"` } @@ -67,6 +69,8 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{ AuthRPC: genericconf.AuthRPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, Workdir: "", } @@ -83,6 +87,8 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) { genericconf.AuthRPCConfigAddOptions("auth", f) f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) f.String("workdir", ValidationNodeConfigDefault.Workdir, "path used for purpose of resolving relative paths (ia. jwt secret file, log files), if empty then current working directory will be used.") } diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 40d9fce5b6..e6b7bd882f 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -32,6 +32,25 @@ func main() { os.Exit(mainImpl()) } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
+func startMetrics(cfg *ValidationNodeConfig) error { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } + return nil +} + // Returns the exit code func mainImpl() int { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -96,20 +115,8 @@ func mainImpl() int { log.Crit("failed to initialize geth stack", "err", err) } - if nodeConfig.Metrics { - go metrics.CollectProcessMetrics(nodeConfig.MetricsServer.UpdateInterval) - - if nodeConfig.MetricsServer.Addr != "" { - address := fmt.Sprintf("%v:%v", nodeConfig.MetricsServer.Addr, nodeConfig.MetricsServer.Port) - if nodeConfig.MetricsServer.Pprof { - genericconf.StartPprof(address) - } else { - exp.Setup(address) - } - } - } else if nodeConfig.MetricsServer.Pprof { - flag.Usage() - log.Error("--metrics must be enabled in order to use pprof with the metrics server") + if err := startMetrics(nodeConfig); err != nil { + log.Error("Starting metrics: %v", err) return 1 } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 97fa67512e..f1af1388cf 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -122,6 +122,25 @@ func main() { os.Exit(mainImpl()) } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
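The four startMetrics helpers share the same guard: metrics and pprof can be toggled independently, but they refuse to bind to the same address:port, since exp.Setup and StartPprof each start their own HTTP server. A stripped-down sketch of just that check; the endpoints struct and checkEndpoints name are hypothetical, not part of the diff.

package main

import (
	"errors"
	"fmt"
)

// endpoints is a hypothetical stand-in for the relevant config fields.
type endpoints struct {
	Metrics     bool
	MetricsAddr string
	PProf       bool
	PprofAddr   string
}

func checkEndpoints(cfg endpoints) error {
	if cfg.Metrics && cfg.PProf && cfg.MetricsAddr == cfg.PprofAddr {
		return errors.New("metrics and pprof cannot be enabled on the same address:port: " + cfg.MetricsAddr)
	}
	return nil
}

func main() {
	clash := endpoints{Metrics: true, MetricsAddr: "127.0.0.1:6070", PProf: true, PprofAddr: "127.0.0.1:6070"}
	ok := endpoints{Metrics: true, MetricsAddr: "127.0.0.1:6070", PProf: true, PprofAddr: "127.0.0.1:6071"}
	fmt.Println(checkEndpoints(clash)) // error: same address:port
	fmt.Println(checkEndpoints(ok))    // <nil>
}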
+func startMetrics(cfg *NodeConfig) error { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } + return nil +} + // Returns the exit code func mainImpl() int { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -350,6 +369,11 @@ func mainImpl() int { } } + if err := startMetrics(nodeConfig); err != nil { + log.Error("Starting metrics: %v", err) + return 1 + } + chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.L2.ChainID), execution.DefaultCacheConfigFor(stack, &nodeConfig.Node.Caching), l1Client, rollupAddrs) defer closeDb(chainDb, "chainDb") if l2BlockChain != nil { @@ -379,23 +403,6 @@ func mainImpl() int { return 1 } - if nodeConfig.Metrics { - go metrics.CollectProcessMetrics(nodeConfig.MetricsServer.UpdateInterval) - - if nodeConfig.MetricsServer.Addr != "" { - address := fmt.Sprintf("%v:%v", nodeConfig.MetricsServer.Addr, nodeConfig.MetricsServer.Port) - if nodeConfig.MetricsServer.Pprof { - genericconf.StartPprof(address) - } else { - exp.Setup(address) - } - } - } else if nodeConfig.MetricsServer.Pprof { - flag.Usage() - log.Error("--metrics must be enabled in order to use pprof with the metrics server") - return 1 - } - fatalErrChan := make(chan error, 10) var valNode *valnode.ValidationNode @@ -525,6 +532,8 @@ type NodeConfig struct { GraphQL genericconf.GraphQLConfig `koanf:"graphql"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Init InitConfig `koanf:"init"` Rpc genericconf.RpcConfig `koanf:"rpc"` } @@ -542,6 +551,8 @@ var NodeConfigDefault = NodeConfig{ IPC: genericconf.IPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, } func NodeConfigAddOptions(f *flag.FlagSet) { @@ -561,6 +572,9 @@ func NodeConfigAddOptions(f *flag.FlagSet) { genericconf.GraphQLConfigAddOptions("graphql", f) f.Bool("metrics", NodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", NodeConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) + InitConfigAddOptions("init", f) genericconf.RpcConfigAddOptions("rpc", f) } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index 9f5669454f..57831c3f59 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -37,6 +37,25 @@ func printSampleUsage(progname string) { fmt.Printf("Sample usage: %s --node.feed.input.url= --chain.id= \n", progname) } +// Checks metrics and PProf flag, runs them if enabled. +// Note: they are separate so one can enable/disable them as they wish, the only +// requirement is that they can't run on the same address and port. 
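When only metrics are enabled, the helper reduces to the two go-ethereum calls below: CollectProcessMetrics samples process and runtime stats on the configured interval, and exp.Setup serves them over HTTP (e.g. under /debug/metrics). A minimal sketch using the defaults from the diff (port 6070, 3-second update interval).

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/metrics/exp"
)

func main() {
	// Blocking sampler, so run it in a goroutine as the diff does.
	go metrics.CollectProcessMetrics(3 * time.Second)
	// Serve the collected metrics on the default metrics-server address.
	exp.Setup("127.0.0.1:6070")
	select {}
}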
+func startMetrics(cfg *relay.Config) error { + mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port) + pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port) + if cfg.Metrics && cfg.PProf && mAddr == pAddr { + return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr) + } + if cfg.Metrics { + go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval) + exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)) + } + if cfg.PProf { + genericconf.StartPprof(pAddr) + } + return nil +} + func startup() error { ctx := context.Background() @@ -68,16 +87,13 @@ func startup() error { if err != nil { return err } - err = newRelay.Start(ctx) - if err != nil { + + if err := startMetrics(relayConfig); err != nil { return err } - if relayConfig.Metrics && relayConfig.MetricsServer.Addr != "" { - go metrics.CollectProcessMetrics(relayConfig.MetricsServer.UpdateInterval) - - address := fmt.Sprintf("%v:%v", relayConfig.MetricsServer.Addr, relayConfig.MetricsServer.Port) - exp.Setup(address) + if err := newRelay.Start(ctx); err != nil { + return err } select { diff --git a/relay/relay.go b/relay/relay.go index b9d70c513b..f4fc33d9e3 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -146,6 +146,8 @@ type Config struct { LogType string `koanf:"log-type"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` + PProf bool `koanf:"pprof"` + PprofCfg genericconf.PProf `koanf:"pprof-cfg"` Node NodeConfig `koanf:"node"` Queue int `koanf:"queue"` } @@ -157,6 +159,8 @@ var ConfigDefault = Config{ LogType: "plaintext", Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, + PProf: false, + PprofCfg: genericconf.PProfDefault, Node: NodeConfigDefault, Queue: 1024, } @@ -168,6 +172,8 @@ func ConfigAddOptions(f *flag.FlagSet) { f.String("log-type", ConfigDefault.LogType, "log type") f.Bool("metrics", ConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) + f.Bool("pprof", ConfigDefault.PProf, "enable pprof") + genericconf.PProfAddOptions("pprof-cfg", f) NodeConfigAddOptions("node", f) f.Int("queue", ConfigDefault.Queue, "size of relay queue") } diff --git a/staker/staker.go b/staker/staker.go index 09a05daad2..230c381d05 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" flag "github.com/spf13/pflag" "github.com/offchainlabs/nitro/arbutil" @@ -30,6 +31,7 @@ var ( stakerBalanceGauge = metrics.NewRegisteredGaugeFloat64("arb/staker/balance", nil) stakerAmountStakedGauge = metrics.NewRegisteredGauge("arb/staker/amount_staked", nil) stakerLatestStakedNodeGauge = metrics.NewRegisteredGauge("arb/staker/staked_node", nil) + stakerLatestConfirmedNodeGauge = metrics.NewRegisteredGauge("arb/staker/confirmed_node", nil) stakerLastSuccessfulActionGauge = metrics.NewRegisteredGauge("arb/staker/action/last_success", nil) stakerActionSuccessCounter = metrics.NewRegisteredCounter("arb/staker/action/success", nil) stakerActionFailureCounter = metrics.NewRegisteredCounter("arb/staker/action/failure", nil) @@ -195,11 +197,16 @@ type LatestStakedNotifier interface { UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) } +type LatestConfirmedNotifier interface { + UpdateLatestConfirmed(count arbutil.MessageIndex, 
globalState validator.GoGlobalState) +} + type Staker struct { *L1Validator stopwaiter.StopWaiter l1Reader L1ReaderInterface - notifiers []LatestStakedNotifier + stakedNotifiers []LatestStakedNotifier + confirmedNotifiers []LatestConfirmedNotifier activeChallenge *ChallengeManager baseCallOpts bind.CallOpts config L1ValidatorConfig @@ -219,7 +226,8 @@ func NewStaker( config L1ValidatorConfig, blockValidator *BlockValidator, statelessBlockValidator *StatelessBlockValidator, - notifiers []LatestStakedNotifier, + stakedNotifiers []LatestStakedNotifier, + confirmedNotifiers []LatestConfirmedNotifier, validatorUtilsAddress common.Address, fatalErr chan<- error, ) (*Staker, error) { @@ -234,13 +242,14 @@ func NewStaker( return nil, err } stakerLastSuccessfulActionGauge.Update(time.Now().Unix()) - if config.StartFromStaked { - notifiers = append(notifiers, blockValidator) + if config.StartFromStaked && blockValidator != nil { + stakedNotifiers = append(stakedNotifiers, blockValidator) } return &Staker{ L1Validator: val, l1Reader: l1Reader, - notifiers: notifiers, + stakedNotifiers: stakedNotifiers, + confirmedNotifiers: confirmedNotifiers, baseCallOpts: callOpts, config: config, highGasBlocksBuffer: big.NewInt(config.L1PostingStrategy.HighGasDelayBlocks), @@ -280,50 +289,50 @@ func (s *Staker) Initialize(ctx context.Context) error { return nil } -func (s *Staker) checkLatestStaked(ctx context.Context) error { - latestStaked, _, err := s.validatorUtils.LatestStaked(&s.baseCallOpts, s.rollupAddress, s.wallet.AddressOrZero()) +func (s *Staker) getLatestStakedState(ctx context.Context, staker common.Address) (uint64, arbutil.MessageIndex, *validator.GoGlobalState, error) { + callOpts := s.getCallOpts(ctx) + if s.l1Reader.UseFinalityData() { + callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber)) + } + latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, staker) if err != nil { - return fmt.Errorf("couldn't get LatestStaked: %w", err) + return 0, 0, nil, fmt.Errorf("couldn't get LatestStaked(%v): %w", staker, err) } - stakerLatestStakedNodeGauge.Update(int64(latestStaked)) if latestStaked == 0 { - return nil + return latestStaked, 0, nil, nil } stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked) if err != nil { - return fmt.Errorf("couldn't look up latest node: %w", err) + return 0, 0, nil, fmt.Errorf("couldn't look up latest assertion of %v (%v): %w", staker, latestStaked, err) } - stakedGlobalState := stakedInfo.AfterState().GlobalState - caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, stakedGlobalState) + globalState := stakedInfo.AfterState().GlobalState + caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState) if err != nil { if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil { - fatal := fmt.Errorf("latest staked not in chain: %w", err) + fatal := fmt.Errorf("latest assertion of %v (%v) not in chain: %w", staker, latestStaked, err) s.fatalErr <- fatal } - return fmt.Errorf("staker: latest staked %w", err) + return 0, 0, nil, fmt.Errorf("latest assertion of %v (%v): %w", staker, latestStaked, err) } if !caughtUp { - log.Info("latest valid not yet in our node", "staked", stakedGlobalState) - return nil + log.Info("latest assertion not yet in our node", "staker", staker, "assertion", latestStaked, "state", globalState) + return latestStaked, 0, nil, nil } processedCount, err := s.txStreamer.GetProcessedMessageCount() if err != nil { - return err + return 0, 0, nil, err 
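Splitting the notifier list means UpdateLatestStaked subscribers (the block validator) and UpdateLatestConfirmed subscribers (the message pruner) are driven by different on-chain queries. Anything with the right method satisfies the new interface; here is a sketch with a made-up printNotifier, using the real arbutil and validator types.

package main

import (
	"fmt"

	"github.com/offchainlabs/nitro/arbutil"
	"github.com/offchainlabs/nitro/validator"
)

// printNotifier is a made-up LatestConfirmedNotifier implementation.
type printNotifier struct{}

func (printNotifier) UpdateLatestConfirmed(count arbutil.MessageIndex, gs validator.GoGlobalState) {
	// Called from the staker's polling loop with the message count and global
	// state of the latest confirmed assertion (queried for the zero address),
	// as opposed to the latest assertion staked by this validator's wallet.
	fmt.Printf("confirmed up to message %d (batch %d, block %s)\n", count, gs.Batch, gs.BlockHash)
}

func main() {
	printNotifier{}.UpdateLatestConfirmed(42, validator.GoGlobalState{Batch: 7})
}

In arbnode/node.go above, MessagePruner is appended to confirmedNotifiers and handed to NewStaker in exactly this way.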
} if processedCount < count { - log.Info("execution catching up to last validated", "validatedCount", count, "processedCount", processedCount) - return nil + log.Info("execution catching up to rollup", "staker", staker, "rollupCount", count, "processedCount", processedCount) + return latestStaked, 0, nil, nil } - for _, notifier := range s.notifiers { - notifier.UpdateLatestStaked(count, stakedGlobalState) - } - return nil + return latestStaked, count, &globalState, nil } func (s *Staker) Start(ctxIn context.Context) { @@ -381,10 +390,32 @@ func (s *Staker) Start(ctxIn context.Context) { return backoff }) s.CallIteratively(func(ctx context.Context) time.Duration { - err := s.checkLatestStaked(ctx) + wallet := s.wallet.AddressOrZero() + staked, stakedMsgCount, stakedGlobalState, err := s.getLatestStakedState(ctx, wallet) if err != nil && ctx.Err() == nil { log.Error("staker: error checking latest staked", "err", err) } + stakerLatestStakedNodeGauge.Update(int64(staked)) + if stakedGlobalState != nil { + for _, notifier := range s.stakedNotifiers { + notifier.UpdateLatestStaked(stakedMsgCount, *stakedGlobalState) + } + } + confirmed := staked + confirmedMsgCount := stakedMsgCount + confirmedGlobalState := stakedGlobalState + if wallet != (common.Address{}) { + confirmed, confirmedMsgCount, confirmedGlobalState, err = s.getLatestStakedState(ctx, common.Address{}) + if err != nil && ctx.Err() == nil { + log.Error("staker: error checking latest confirmed", "err", err) + } + } + stakerLatestConfirmedNodeGauge.Update(int64(confirmed)) + if confirmedGlobalState != nil { + for _, notifier := range s.confirmedNotifiers { + notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState) + } + } return s.config.StakerInterval }) } diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 0242daa3c7..7add3e258d 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -84,6 +84,7 @@ type L1ReaderInterface interface { Client() arbutil.L1Interface Subscribe(bool) (<-chan *types.Header, func()) WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) + UseFinalityData() bool } type GlobalStatePosition struct { diff --git a/staker/validator_wallet.go b/staker/validator_wallet.go index c36efa7b61..d878749f35 100644 --- a/staker/validator_wallet.go +++ b/staker/validator_wallet.go @@ -8,6 +8,7 @@ import ( "errors" "math/big" "strings" + "sync/atomic" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" @@ -38,7 +39,9 @@ func init() { type ValidatorWalletInterface interface { Initialize(context.Context) error + // Address must be able to be called concurrently with other functions Address() *common.Address + // Address must be able to be called concurrently with other functions AddressOrZero() common.Address TxSenderAddress() *common.Address RollupAddress() common.Address @@ -53,7 +56,7 @@ type ValidatorWalletInterface interface { type ContractValidatorWallet struct { con *rollupgen.ValidatorWallet - address *common.Address + address atomic.Pointer[common.Address] onWalletCreated func(common.Address) l1Reader L1ReaderInterface auth *bind.TransactOpts @@ -79,9 +82,8 @@ func NewContractValidatorWallet(address *common.Address, walletFactoryAddr, roll if err != nil { return nil, err } - return &ContractValidatorWallet{ + wallet := &ContractValidatorWallet{ con: con, - address: address, onWalletCreated: onWalletCreated, l1Reader: l1Reader, auth: auth, @@ -89,7 +91,10 @@ func 
NewContractValidatorWallet(address *common.Address, walletFactoryAddr, roll rollupAddress: rollupAddress, rollup: rollup, rollupFromBlock: rollupFromBlock, - }, nil + } + // Go complains if we make an address variable before wallet and copy it in + wallet.address.Store(address) + return wallet, nil } func (v *ContractValidatorWallet) validateWallet(ctx context.Context) error { @@ -127,15 +132,16 @@ func (v *ContractValidatorWallet) Initialize(ctx context.Context) error { // May be the nil if the wallet hasn't been deployed yet func (v *ContractValidatorWallet) Address() *common.Address { - return v.address + return v.address.Load() } // May be zero if the wallet hasn't been deployed yet func (v *ContractValidatorWallet) AddressOrZero() common.Address { - if v.address == nil { + addr := v.address.Load() + if addr == nil { return common.Address{} } - return *v.address + return *addr } func (v *ContractValidatorWallet) TxSenderAddress() *common.Address { @@ -183,7 +189,7 @@ func (v *ContractValidatorWallet) populateWallet(ctx context.Context, createIfMi } return nil } - if v.address == nil { + if v.address.Load() == nil { auth, err := v.getAuth(ctx, nil) if err != nil { return err @@ -195,12 +201,12 @@ func (v *ContractValidatorWallet) populateWallet(ctx context.Context, createIfMi if addr == nil { return nil } - v.address = addr + v.address.Store(addr) if v.onWalletCreated != nil { v.onWalletCreated(*addr) } } - con, err := rollupgen.NewValidatorWallet(*v.address, v.l1Reader.Client()) + con, err := rollupgen.NewValidatorWallet(*v.Address(), v.l1Reader.Client()) if err != nil { return err } @@ -260,7 +266,7 @@ func (v *ContractValidatorWallet) ExecuteTransactions(ctx context.Context, build totalAmount = totalAmount.Add(totalAmount, tx.Value()) } - balanceInContract, err := v.l1Reader.Client().BalanceAt(ctx, *v.address, nil) + balanceInContract, err := v.l1Reader.Client().BalanceAt(ctx, *v.Address(), nil) if err != nil { return nil, err } diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 7a3ae41814..8a8ef29bf3 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -166,6 +166,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessA, nil, + nil, l2nodeA.DeployInfo.ValidatorUtils, nil, ) @@ -201,6 +202,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessB, nil, + nil, l2nodeB.DeployInfo.ValidatorUtils, nil, ) @@ -223,6 +225,7 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, statelessA, nil, + nil, l2nodeA.DeployInfo.ValidatorUtils, nil, ) diff --git a/util/headerreader/header_reader.go b/util/headerreader/header_reader.go index f25fed02d9..1d52e8f78d 100644 --- a/util/headerreader/header_reader.go +++ b/util/headerreader/header_reader.go @@ -434,6 +434,10 @@ func (s *HeaderReader) Client() arbutil.L1Interface { return s.client } +func (s *HeaderReader) UseFinalityData() bool { + return s.config().UseFinalityData +} + func (s *HeaderReader) Start(ctxIn context.Context) { s.StopWaiter.Start(ctxIn, s) s.LaunchThread(s.broadcastLoop)
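The validator-wallet change swaps a plain *common.Address field for sync/atomic's generic Pointer, so Address and AddressOrZero can be read from the staker's polling loop while populateWallet stores the freshly created wallet address from another goroutine. A minimal sketch of the pattern; walletSketch is illustrative only, not the real ContractValidatorWallet.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
)

// walletSketch holds the address behind an atomic.Pointer so concurrent
// readers never observe a torn or half-written pointer.
type walletSketch struct {
	address atomic.Pointer[common.Address]
}

func (w *walletSketch) AddressOrZero() common.Address {
	if addr := w.address.Load(); addr != nil {
		return *addr
	}
	return common.Address{}
}

func main() {
	var w walletSketch
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer, e.g. populateWallet after the wallet contract is created
		defer wg.Done()
		addr := common.HexToAddress("0x00000000000000000000000000000000000000aa")
		w.address.Store(&addr)
	}()
	go func() { // reader, e.g. the staker loop calling AddressOrZero
		defer wg.Done()
		_ = w.AddressOrZero() // safe even if it races with the Store above
	}()
	wg.Wait()
	fmt.Println(w.AddressOrZero())
}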