Skip to content

Commit

Permalink
Merge pull request #2071 from OffchainLabs/support-multiple-validatio…
Browse files Browse the repository at this point in the history
…n-rpcs
  • Loading branch information
ganeshvanahalli authored Feb 14, 2024
2 parents b8d35e7 + 1e1f96c commit d6251df
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 53 deletions.
7 changes: 4 additions & 3 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/wsbroadcastserver"
)
Expand Down Expand Up @@ -198,7 +199,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.BlockValidator = staker.TestBlockValidatorConfig
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServer.URL = ""
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}

return &config
}
Expand All @@ -213,7 +214,7 @@ func ConfigDefaultL2Test() *Config {
config.SeqCoordinator.Signer.ECDSA.Dangerous.AcceptMissing = true
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServer.URL = ""
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig

return &config
Expand Down Expand Up @@ -541,7 +542,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.ValidationServer.URL != "" {
if config.BlockValidator.ValidationServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func mainImpl() int {
}

var sameProcessValidationNodeEnabled bool
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ValidationServer.URL == "self" || nodeConfig.Node.BlockValidator.ValidationServer.URL == "self-auth") {
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self" || nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self-auth") {
sameProcessValidationNodeEnabled = true
valnode.EnsureValidationExposedViaAuthRPC(&stackConf)
}
Expand Down
100 changes: 65 additions & 35 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package staker

import (
"context"
"encoding/json"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -82,16 +83,18 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`

memoryFreeLimit int
}
Expand All @@ -106,7 +109,26 @@ func (c *BlockValidatorConfig) Validate() error {
}
c.memoryFreeLimit = limit
}
return c.ValidationServer.Validate()
if c.ValidationServerConfigs == nil {
if c.ValidationServerConfigsList == "default" {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
} else {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
if err := serverConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err)
}
}
return nil
}

type BlockValidatorDangerousConfig struct {
Expand All @@ -118,6 +140,7 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig
func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
Expand All @@ -133,21 +156,23 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) {
}

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.DefaultClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand Down Expand Up @@ -667,14 +692,12 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
room := 100 // even if there is more room then that it's fine
for _, spawner := range v.validationSpawners {
rooms := make([]int, len(v.validationSpawners))
currentSpawnerIndex := 0
for i, spawner := range v.validationSpawners {
here := spawner.Room() / len(wasmRoots)
if here <= 0 {
room = 0
}
if here < room {
room = here
if here > 0 {
rooms[i] = here
}
}
pos := v.validated() - 1 // to reverse the first +1 in the loop
Expand Down Expand Up @@ -745,7 +768,13 @@ validationsLoop:
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
if room == 0 {
for currentSpawnerIndex < len(rooms) {
if rooms[currentSpawnerIndex] > 0 {
break
}
currentSpawnerIndex++
}
if currentSpawnerIndex == len(rooms) {
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
Expand All @@ -772,11 +801,9 @@ validationsLoop:
defer validatorPendingValidationsGauge.Dec(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
for i, spawner := range v.validationSpawners {
run := spawner.Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", i)
runs = append(runs, run)
}
run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex)
runs = append(runs, run)
}
validationCtx, cancel := context.WithCancel(ctx)
validationStatus.Runs = runs
Expand All @@ -798,7 +825,10 @@ validationsLoop:
}
nonBlockingTrigger(v.progressValidationsChan)
})
room--
rooms[currentSpawnerIndex]--
if rooms[currentSpawnerIndex] == 0 {
currentSpawnerIndex++
}
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,18 @@ func NewStatelessBlockValidator(
config func() *BlockValidatorConfig,
stack *node.Node,
) (*StatelessBlockValidator, error) {
valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServer }
valClient := server_api.NewValidationClient(valConfFetcher, stack)
validationSpawners := make([]validator.ValidationSpawner, len(config().ValidationServerConfigs))
for i, serverConfig := range config().ValidationServerConfigs {
valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig }
validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack)
}
valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] }
execClient := server_api.NewExecutionClient(valConfFetcher, stack)
validator := &StatelessBlockValidator{
config: config(),
execSpawner: execClient,
recorder: recorder,
validationSpawners: []validator.ValidationSpawner{valClient},
validationSpawners: validationSpawners,
inboxReader: inboxReader,
inboxTracker: inbox,
streamer: streamer,
Expand Down
7 changes: 4 additions & 3 deletions system_tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,15 +541,15 @@ func StaticFetcherFrom[T any](t *testing.T, config *T) func() *T {
}

func configByValidationNode(t *testing.T, clientConfig *arbnode.Config, valStack *node.Node) {
clientConfig.BlockValidator.ValidationServer.URL = valStack.WSEndpoint()
clientConfig.BlockValidator.ValidationServer.JWTSecret = ""
clientConfig.BlockValidator.ValidationServerConfigs[0].URL = valStack.WSEndpoint()
clientConfig.BlockValidator.ValidationServerConfigs[0].JWTSecret = ""
}

func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Config, useJit bool) {
if !nodeConfig.ValidatorRequired() {
return
}
if nodeConfig.BlockValidator.ValidationServer.URL != "" {
if nodeConfig.BlockValidator.ValidationServerConfigs[0].URL != "" {
return
}
conf := valnode.TestValidationConfig
Expand Down Expand Up @@ -925,6 +925,7 @@ func Create2ndNodeWithConfig(
AddDefaultValNode(t, ctx, nodeConfig, true)

Require(t, execConfig.Validate())
Require(t, nodeConfig.Validate())
configFetcher := func() *gethexec.Config { return execConfig }
currentExec, err := gethexec.CreateExecutionNode(ctx, l2stack, l2chainDb, l2blockchain, l1client, configFetcher)
Require(t, err)
Expand Down
16 changes: 8 additions & 8 deletions util/rpcclient/rpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
)

type ClientConfig struct {
URL string `koanf:"url"`
JWTSecret string `koanf:"jwtsecret"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
Retries uint `koanf:"retries" reload:"hot"`
ConnectionWait time.Duration `koanf:"connection-wait"`
ArgLogLimit uint `koanf:"arg-log-limit" reload:"hot"`
RetryErrors string `koanf:"retry-errors" reload:"hot"`
RetryDelay time.Duration `koanf:"retry-delay"`
URL string `json:"url,omitempty" koanf:"url"`
JWTSecret string `json:"jwtsecret,omitempty" koanf:"jwtsecret"`
Timeout time.Duration `json:"timeout,omitempty" koanf:"timeout" reload:"hot"`
Retries uint `json:"retries,omitempty" koanf:"retries" reload:"hot"`
ConnectionWait time.Duration `json:"connection-wait,omitempty" koanf:"connection-wait"`
ArgLogLimit uint `json:"arg-log-limit,omitempty" koanf:"arg-log-limit" reload:"hot"`
RetryErrors string `json:"retry-errors,omitempty" koanf:"retry-errors" reload:"hot"`
RetryDelay time.Duration `json:"retry-delay,omitempty" koanf:"retry-delay"`

retryErrors *regexp.Regexp
}
Expand Down

0 comments on commit d6251df

Please sign in to comment.