Merge branch 'master' into test-cases-contractTxType
anodar committed Jul 17, 2023
2 parents 522eb88 + 676bef3 commit 87563a0
Showing 16 changed files with 265 additions and 126 deletions.
48 changes: 30 additions & 18 deletions arbnode/message_pruner.go
@@ -23,11 +23,13 @@ import (

 type MessagePruner struct {
 	stopwaiter.StopWaiter
-	transactionStreamer *TransactionStreamer
-	inboxTracker        *InboxTracker
-	config              MessagePrunerConfigFetcher
-	pruningLock         sync.Mutex
-	lastPruneDone       time.Time
+	transactionStreamer         *TransactionStreamer
+	inboxTracker                *InboxTracker
+	config                      MessagePrunerConfigFetcher
+	pruningLock                 sync.Mutex
+	lastPruneDone               time.Time
+	cachedPrunedMessages        uint64
+	cachedPrunedDelayedMessages uint64
 }

 type MessagePrunerConfig struct {
@@ -62,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) {
 	m.StopWaiter.Start(ctxIn, m)
 }

-func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
+func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
 	locked := m.pruningLock.TryLock()
 	if !locked {
 		return
@@ -108,19 +110,19 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, globalState validator.GoGlobalState) error {
 	msgCount := endBatchMetadata.MessageCount
 	delayedCount := endBatchMetadata.DelayedMessageCount

-	return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db)
+	return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
 }

-func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error {
-	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount))
+func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
+	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
 	if err != nil {
 		return fmt.Errorf("error deleting last batch messages: %w", err)
 	}
 	if len(prunedKeysRange) > 0 {
 		log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
 	}

-	prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount)
+	prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
 	if err != nil {
 		return fmt.Errorf("error deleting last batch delayed messages: %w", err)
 	}
@@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error {
 	return nil
 }

-func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) {
-	startIter := db.NewIterator(prefix, uint64ToKey(1))
-	if !startIter.Next() {
+// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key.
+// cachedStartMinKey must not be nil. It is set to the new start key at the end of this function if successful.
+func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
+	startMinKey := *cachedStartMinKey
+	if startMinKey == 0 {
+		startIter := db.NewIterator(prefix, uint64ToKey(1))
+		if !startIter.Next() {
+			return nil, nil
+		}
+		startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
+		startIter.Release()
+	}
+	if endMinKey <= startMinKey {
+		*cachedStartMinKey = startMinKey
 		return nil, nil
 	}
-	startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
-	startIter.Release()
-	if endMinKey > startMinKey {
-		return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
+	keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
+	if err == nil {
+		*cachedStartMinKey = endMinKey - 1
 	}
-	return nil, nil
+	return keys, err
 }
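The core change above converts deleteOldMessageFromDB into a method and threads a cached start key through deleteFromLastPrunedUptoEndKey, so each pruning pass resumes where the previous one stopped instead of re-seeking an iterator from the start of the prefixed keyspace. A minimal, self-contained sketch of the same caching pattern (illustrative names; a sorted slice stands in for the prefixed ethdb key range):

package main

import "fmt"

// pruner mimics MessagePruner's caching of the lowest unpruned key for one
// key prefix; a sorted slice stands in for the prefixed ethdb keyspace.
type pruner struct {
	keys           []uint64 // sorted ascending
	cachedStartKey uint64   // lowest key that may still exist; 0 means "unknown"
}

// pruneUpTo deletes keys in [start, endMinKey) and remembers where it stopped,
// so the next call does not re-scan ranges that are already empty.
func (p *pruner) pruneUpTo(endMinKey uint64) []uint64 {
	start := p.cachedStartKey
	if start == 0 { // cold cache: find the first live key (the iterator seek above)
		if len(p.keys) == 0 {
			return nil
		}
		start = p.keys[0]
	}
	if endMinKey <= start {
		p.cachedStartKey = start
		return nil
	}
	var pruned, kept []uint64
	for _, k := range p.keys {
		if k >= start && k < endMinKey {
			pruned = append(pruned, k)
		} else {
			kept = append(kept, k)
		}
	}
	p.keys = kept
	// Mirror the diff: cache endMinKey-1, the last key handed to deleteFromRange.
	p.cachedStartKey = endMinKey - 1
	return pruned
}

func main() {
	p := &pruner{keys: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}
	fmt.Println(p.pruneUpTo(5))  // [1 2 3 4]
	fmt.Println(p.pruneUpTo(10)) // [5 6 7 8 9]; the scan resumed near key 4
}

As in the diff, the cache stores endMinKey - 1 rather than endMinKey, so consecutive passes overlap by one already-deleted key — an apparently harmless overlap that keeps the bookkeeping simple.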
38 changes: 20 additions & 18 deletions arbnode/message_pruner_test.go
@@ -17,31 +17,30 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) {
 	defer cancel()

 	messagesCount := uint64(2 * 100 * 1024)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024)
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024)
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
 	Require(t, err)

 	checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
 	checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

 }

-func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) {
+func TestMessagePrunerTwoHalves(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

 	messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
-	// In first iteration message till messagesCount are tried to be deleted.
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
-	Require(t, err)
-	// After first iteration messagesCount/2 is reinserted in inbox db
-	err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{})
+	_, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
+	// The first iteration attempts to delete messages up to messagesCount/2.
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2)
 	Require(t, err)
-	// In second iteration message till messagesCount are again tried to be deleted.
-	err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	// The first iteration deletes everything up to messagesCount/2.
+	checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix)
+	// The second iteration attempts to delete messages up to messagesCount.
+	err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
 	Require(t, err)
-	// In second iteration all the message till messagesCount are deleted again.
+	// The second iteration deletes everything up to messagesCount.
 	checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
 }

@@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) {
 	defer cancel()

 	messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20)
 	err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9))
 	Require(t, err)
-	err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
 	Require(t, err)
 	hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount))
 	Require(t, err)
@@ -67,16 +66,16 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) {
 	defer cancel()

 	messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
 	Require(t, err)

 	checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix)
 	checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

 }

-func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) {
+func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {

 	transactionStreamerDb := rawdb.NewMemoryDatabase()
 	for i := uint64(0); i < uint64(messageCount); i++ {
@@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {
 		Require(t, err)
 	}

-	return inboxTrackerDb, transactionStreamerDb
+	return inboxTrackerDb, transactionStreamerDb, &MessagePruner{
+		transactionStreamer: &TransactionStreamer{db: transactionStreamerDb},
+		inboxTracker:        &InboxTracker{db: inboxTrackerDb},
+	}
 }

 func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) {
6 changes: 3 additions & 3 deletions arbnode/node.go
@@ -825,13 +825,13 @@ func createNodeImpl(
 		}
 	}

-	notifiers := make([]staker.LatestStakedNotifier, 0)
+	var confirmedNotifiers []staker.LatestConfirmedNotifier
 	if config.MessagePruner.Enable && !config.Caching.Archive {
 		messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
-		notifiers = append(notifiers, messagePruner)
+		confirmedNotifiers = append(confirmedNotifiers, messagePruner)
 	}

-	stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan)
+	stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan)
 	if err != nil {
 		return nil, err
 	}
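The staker wiring above reflects the new split between staked and confirmed notifications: the message pruner now registers as a confirmed notifier (its hook was renamed to UpdateLatestConfirmed in message_pruner.go above), so pruning follows assertions confirmed on L1 rather than merely staked ones. A sketch of the two notifier roles as this diff implies them — signatures are inferred from the renamed method, and the authoritative declarations live in the staker package:

package staker

import (
	"github.com/offchainlabs/nitro/arbutil"
	"github.com/offchainlabs/nitro/validator"
)

// LatestStakedNotifier is notified as the local stake advances
// (the NewStaker argument that node.go now passes as nil).
type LatestStakedNotifier interface {
	UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

// LatestConfirmedNotifier is notified when an assertion is confirmed on L1;
// MessagePruner registers here so it only prunes messages behind the
// confirmed state.
type LatestConfirmedNotifier interface {
	UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}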
38 changes: 28 additions & 10 deletions cmd/daserver/daserver.go
@@ -45,6 +45,8 @@ type DAServerConfig struct {

 	Metrics       bool                            `koanf:"metrics"`
 	MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"`
+	PProf         bool                            `koanf:"pprof"`
+	PprofCfg      genericconf.PProf               `koanf:"pprof-cfg"`
 }

 var DefaultDAServerConfig = DAServerConfig{
@@ -60,6 +62,8 @@ var DefaultDAServerConfig = DAServerConfig{
 	ConfConfig:    genericconf.ConfConfigDefault,
 	Metrics:       false,
 	MetricsServer: genericconf.MetricsServerConfigDefault,
+	PProf:         false,
+	PprofCfg:      genericconf.PProfDefault,
 	LogLevel:      3,
 }

@@ -89,6 +93,9 @@ func parseDAServer(args []string) (*DAServerConfig, error) {
 	f.Bool("metrics", DefaultDAServerConfig.Metrics, "enable metrics")
 	genericconf.MetricsServerAddOptions("metrics-server", f)

+	f.Bool("pprof", DefaultDAServerConfig.PProf, "enable pprof")
+	genericconf.PProfAddOptions("pprof-cfg", f)
+
 	f.Int("log-level", int(log.LvlInfo), "log level; 1: ERROR, 2: WARN, 3: INFO, 4: DEBUG, 5: TRACE")
 	das.DataAvailabilityConfigAddDaserverOptions("data-availability", f)
 	genericconf.ConfConfigAddOptions("conf", f)
@@ -135,6 +142,25 @@ func (c *L1ReaderCloser) String() string {
 	return "l1 reader closer"
 }

+// startMetrics checks the metrics and pprof flags and starts the corresponding
+// servers if enabled. They are configured separately so each can be toggled on
+// its own; the only requirement is that they cannot share an address and port.
+func startMetrics(cfg *DAServerConfig) error {
+	mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)
+	pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port)
+	if cfg.Metrics && cfg.PProf && mAddr == pAddr {
+		return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr)
+	}
+	if cfg.Metrics {
+		go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval)
+		exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port))
+	}
+	if cfg.PProf {
+		genericconf.StartPprof(pAddr)
+	}
+	return nil
+}
+
 func startup() error {
 	// Some different defaults to DAS config in a node.
 	das.DefaultDataAvailabilityConfig.Enable = true
@@ -151,16 +177,8 @@ func startup() error {
 	glogger.Verbosity(log.Lvl(serverConfig.LogLevel))
 	log.Root().SetHandler(glogger)

-	if serverConfig.Metrics {
-		if len(serverConfig.MetricsServer.Addr) == 0 {
-			fmt.Printf("Metrics is enabled, but missing --metrics-server.addr")
-			return nil
-		}
-
-		go metrics.CollectProcessMetrics(serverConfig.MetricsServer.UpdateInterval)
-
-		address := fmt.Sprintf("%v:%v", serverConfig.MetricsServer.Addr, serverConfig.MetricsServer.Port)
-		exp.Setup(address)
+	if err := startMetrics(serverConfig); err != nil {
+		return err
 	}

 	sigint := make(chan os.Signal, 1)
1 change: 1 addition & 0 deletions cmd/genericconf/pprof.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net/http"

+	// The blank import of net/http/pprof registers its HTTP handlers on the default mux.
 	_ "net/http/pprof" // #nosec G108

 	"github.com/ethereum/go-ethereum/log"
18 changes: 15 additions & 3 deletions cmd/genericconf/server.go
@@ -189,20 +189,32 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) {
 type MetricsServerConfig struct {
 	Addr           string        `koanf:"addr"`
 	Port           int           `koanf:"port"`
-	Pprof          bool          `koanf:"pprof"`
 	UpdateInterval time.Duration `koanf:"update-interval"`
 }

 var MetricsServerConfigDefault = MetricsServerConfig{
 	Addr:           "127.0.0.1",
 	Port:           6070,
-	Pprof:          false,
 	UpdateInterval: 3 * time.Second,
 }

+type PProf struct {
+	Addr string `koanf:"addr"`
+	Port int    `koanf:"port"`
+}
+
+var PProfDefault = PProf{
+	Addr: "127.0.0.1",
+	Port: 6071,
+}
+
 func MetricsServerAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".addr", MetricsServerConfigDefault.Addr, "metrics server address")
 	f.Int(prefix+".port", MetricsServerConfigDefault.Port, "metrics server port")
-	f.Bool(prefix+".pprof", MetricsServerConfigDefault.Pprof, "enable profiling for Go")
 	f.Duration(prefix+".update-interval", MetricsServerConfigDefault.UpdateInterval, "metrics server update interval")
 }
+
+func PProfAddOptions(prefix string, f *flag.FlagSet) {
+	f.String(prefix+".addr", PProfDefault.Addr, "pprof server address")
+	f.Int(prefix+".port", PProfDefault.Port, "pprof server port")
+}
6 changes: 6 additions & 0 deletions cmd/nitro-val/config.go
@@ -30,6 +30,8 @@ type ValidationNodeConfig struct {
 	AuthRPC       genericconf.AuthRPCConfig       `koanf:"auth"`
 	Metrics       bool                            `koanf:"metrics"`
 	MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"`
+	PProf         bool                            `koanf:"pprof"`
+	PprofCfg      genericconf.PProf               `koanf:"pprof-cfg"`
 	Workdir       string                          `koanf:"workdir" reload:"hot"`
 }

@@ -67,6 +69,8 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{
 	AuthRPC:       genericconf.AuthRPCConfigDefault,
 	Metrics:       false,
 	MetricsServer: genericconf.MetricsServerConfigDefault,
+	PProf:         false,
+	PprofCfg:      genericconf.PProfDefault,
 	Workdir:       "",
 }

@@ -83,6 +87,8 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) {
 	genericconf.AuthRPCConfigAddOptions("auth", f)
 	f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics")
 	genericconf.MetricsServerAddOptions("metrics-server", f)
+	f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof")
+	genericconf.PProfAddOptions("pprof-cfg", f)
 	f.String("workdir", ValidationNodeConfigDefault.Workdir, "path used for purpose of resolving relative paths (ia. jwt secret file, log files), if empty then current working directory will be used.")
 }

35 changes: 21 additions & 14 deletions cmd/nitro-val/nitro_val.go
@@ -32,6 +32,25 @@ func main() {
 	os.Exit(mainImpl())
 }

+// startMetrics checks the metrics and pprof flags and starts the corresponding
+// servers if enabled. They are configured separately so each can be toggled on
+// its own; the only requirement is that they cannot share an address and port.
+func startMetrics(cfg *ValidationNodeConfig) error {
+	mAddr := fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port)
+	pAddr := fmt.Sprintf("%v:%v", cfg.PprofCfg.Addr, cfg.PprofCfg.Port)
+	if cfg.Metrics && cfg.PProf && mAddr == pAddr {
+		return fmt.Errorf("metrics and pprof cannot be enabled on the same address:port: %s", mAddr)
+	}
+	if cfg.Metrics {
+		go metrics.CollectProcessMetrics(cfg.MetricsServer.UpdateInterval)
+		exp.Setup(fmt.Sprintf("%v:%v", cfg.MetricsServer.Addr, cfg.MetricsServer.Port))
+	}
+	if cfg.PProf {
+		genericconf.StartPprof(pAddr)
+	}
+	return nil
+}
+
 // Returns the exit code
 func mainImpl() int {
 	ctx, cancelFunc := context.WithCancel(context.Background())
@@ -96,20 +115,8 @@ func mainImpl() int {
 		log.Crit("failed to initialize geth stack", "err", err)
 	}

-	if nodeConfig.Metrics {
-		go metrics.CollectProcessMetrics(nodeConfig.MetricsServer.UpdateInterval)
-
-		if nodeConfig.MetricsServer.Addr != "" {
-			address := fmt.Sprintf("%v:%v", nodeConfig.MetricsServer.Addr, nodeConfig.MetricsServer.Port)
-			if nodeConfig.MetricsServer.Pprof {
-				genericconf.StartPprof(address)
-			} else {
-				exp.Setup(address)
-			}
-		}
-	} else if nodeConfig.MetricsServer.Pprof {
-		flag.Usage()
-		log.Error("--metrics must be enabled in order to use pprof with the metrics server")
+	if err := startMetrics(nodeConfig); err != nil {
+		log.Error("Error starting metrics", "error", err)
 		return 1
 	}