From 6818ae66e9cae87e14231bfbb518cb00d8cbc3a4 Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Fri, 30 Aug 2024 10:31:51 -0300 Subject: [PATCH] feat: add the machines package --- docs/config.md | 51 +++++- internal/node/advancer/machines/machines.go | 167 ++++++++++++++++++++ internal/node/config/config.go | 18 +++ internal/node/config/generate/Config.toml | 48 +++++- internal/node/config/generate/code.go | 12 ++ internal/node/config/generated.go | 96 +++++++++++ internal/node/model/models.go | 1 + internal/nodemachine/machine.go | 11 +- 8 files changed, 397 insertions(+), 7 deletions(-) create mode 100644 internal/node/advancer/machines/machines.go diff --git a/docs/config.md b/docs/config.md index 67a4653ae..6fd5aa5d0 100644 --- a/docs/config.md +++ b/docs/config.md @@ -223,6 +223,13 @@ When enabled, will connect to postgres database via SSL. * **Type:** `bool` * **Default:** `"true"` +## `CARTESI_ADVANCER_POLLING_INTERVAL` + +How many seconds the node will wait before querying the database for new inputs. + +* **Type:** `Duration` +* **Default:** `"30"` + ## `CARTESI_EPOCH_LENGTH` Length of a rollups epoch in blocks. @@ -234,7 +241,7 @@ At the end of each epoch, the node will send claims to the blockchain. ## `CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY` -How seconds the retry policy will wait between retries. +How many seconds the retry policy will wait between retries. * **Type:** `Duration` * **Default:** `"3"` @@ -246,6 +253,13 @@ How many times some functions should be retried after an error. * **Type:** `uint64` * **Default:** `"3"` +## `CARTESI_MAX_CONCURRENT_INSPECTS` + +Maximum number of inspect-state requests that can be concurrently active. + +* **Type:** `uint8` +* **Default:** `"10"` + ## `CARTESI_VALIDATOR_POLLING_INTERVAL` How many seconds the node will wait before trying to finish epochs for all applications. @@ -258,3 +272,38 @@ How many seconds the node will wait before trying to finish epochs for all appli Path to the directory with the cartesi-machine snapshot that will be loaded by the node. * **Type:** `string` + +## `CARTESI_MACHINE_ADVANCE_TIMEOUT` + +TODO. + +* **Type:** `Duration` +* **Default:** `"60"` + +## `CARTESI_MACHINE_INC_CYCLES` + +TODO. + +* **Type:** `uint64` +* **Default:** `"50000000"` + +## `CARTESI_MACHINE_INSPECT_TIMEOUT` + +TODO. + +* **Type:** `Duration` +* **Default:** `"10"` + +## `CARTESI_MACHINE_MAX_CYCLES` + +TODO. + +* **Type:** `uint64` +* **Default:** `"5000000000"` + +## `CARTESI_MACHINE_SERVER_VERBOSITY` + +TODO. + +* **Type:** `string` +* **Default:** `"info"` diff --git a/internal/node/advancer/machines/machines.go b/internal/node/advancer/machines/machines.go new file mode 100644 index 000000000..00d35fb8d --- /dev/null +++ b/internal/node/advancer/machines/machines.go @@ -0,0 +1,167 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package machines + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "sync" + + "github.com/cartesi/rollups-node/internal/node/config" + "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/emulator" + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" +) + +type Repository interface { + GetAppData(context.Context) ([]*repository.AppData, error) +} + +type AdvanceMachine interface { + Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) +} + +type InspectMachine interface { + Inspect(_ context.Context, query []byte) (*nodemachine.InspectResult, error) +} + +type Machines struct { + mutex sync.RWMutex + machines map[model.Address]*nodemachine.NodeMachine +} + +func Load(ctx context.Context, config config.NodeConfig, repo Repository) (*Machines, error) { + appData, err := repo.GetAppData(ctx) + if err != nil { + return nil, err + } + + machines := map[model.Address]*nodemachine.NodeMachine{} + + maxConcurrentInspects := config.MaxConcurrentInspects + + serverVerbosity := config.MachineServerVerbosity + machineInc := config.MachineIncCycles + machineMax := config.MachineMaxCycles + machineAdvanceTimeout := config.MachineAdvanceTimeout + machineInspectTimeout := config.MachineInspectTimeout + + for _, appData := range appData { + appAddress := appData.AppAddress + snapshotPath := appData.SnapshotPath + snapshotInputIndex := appData.InputIndex + + address, err := cartesimachine.StartServer(serverVerbosity, 0, os.Stdout, os.Stderr) + if err != nil { + return nil, closeMachines(machines) + } + + config := &emulator.MachineRuntimeConfig{} + cartesiMachine, err := cartesimachine.Load(ctx, snapshotPath, address, config) + if err != nil { + err = errors.Join(err, cartesimachine.StopServer(address), closeMachines(machines)) + return nil, err + } + + rollupsMachine, err := rollupsmachine.New(ctx, cartesiMachine, machineInc, machineMax) + if err != nil { + err = errors.Join(err, cartesiMachine.Close(ctx), closeMachines(machines)) + return nil, err + } + + nodeMachine, err := nodemachine.New(rollupsMachine, + snapshotInputIndex, + machineAdvanceTimeout, + machineInspectTimeout, + maxConcurrentInspects) + if err != nil { + err = errors.Join(err, rollupsMachine.Close(ctx), closeMachines(machines)) + return nil, err + } + + machines[appAddress] = nodeMachine + } + + return &Machines{machines: machines}, nil +} + +func (m *Machines) GetAdvanceMachine(app model.Address) AdvanceMachine { + return m.get(app) +} + +func (m *Machines) GetInspectMachine(app model.Address) InspectMachine { + return m.get(app) +} + +func (m *Machines) Set(app model.Address, machine *nodemachine.NodeMachine) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + if _, ok := m.machines[app]; ok { + return false + } else { + m.machines[app] = machine + return true + } +} + +func (m *Machines) Remove(app model.Address) *nodemachine.NodeMachine { + m.mutex.Lock() + defer m.mutex.Unlock() + + if machine, ok := m.machines[app]; ok { + return nil + } else { + delete(m.machines, app) + return machine + } +} + +func (m *Machines) Keys() []model.Address { + m.mutex.RLock() + defer m.mutex.Unlock() + + keys := make([]model.Address, len(m.machines)) + i := 0 + for k := range m.machines { + keys[i] = k + i++ + } + return keys +} + +func (m *Machines) Close() error { + m.mutex.Lock() + defer m.mutex.Unlock() + + err := closeMachines(m.machines) + if err != nil { + slog.Error(fmt.Sprintf("failed to close some machines: %v", err)) + } + return err +} + +// ------------------------------------------------------------------------------------------------ + +func (m *Machines) get(app model.Address) *nodemachine.NodeMachine { + m.mutex.RLock() + defer m.mutex.Unlock() + return m.machines[app] +} + +func closeMachines(machines map[model.Address]*nodemachine.NodeMachine) (err error) { + for _, machine := range machines { + err = errors.Join(err, machine.Close()) + } + for app := range machines { + delete(machines, app) + } + return +} diff --git a/internal/node/config/config.go b/internal/node/config/config.go index a67323e49..b7f0c8887 100644 --- a/internal/node/config/config.go +++ b/internal/node/config/config.go @@ -8,6 +8,8 @@ package config import ( "fmt" "os" + + "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" ) // NodeConfig contains all the Node variables. @@ -39,7 +41,15 @@ type NodeConfig struct { ExperimentalSunodoValidatorEnabled bool ExperimentalSunodoValidatorRedisEndpoint string Auth Auth + MaxConcurrentInspects uint8 + AdvancerPollingInterval Duration ValidatorPollingInterval Duration + // Temporary + MachineServerVerbosity cartesimachine.ServerVerbosity + MachineIncCycles uint64 + MachineMaxCycles uint64 + MachineAdvanceTimeout Duration + MachineInspectTimeout Duration } // Auth is used to sign transactions. @@ -106,7 +116,15 @@ func FromEnv() NodeConfig { if getFeatureClaimerEnabled() && !getExperimentalSunodoValidatorEnabled() { config.Auth = authFromEnv() } + config.MaxConcurrentInspects = getMaxConcurrentInspects() + config.AdvancerPollingInterval = getAdvancerPollingInterval() config.ValidatorPollingInterval = getValidatorPollingInterval() + // Temporary. + config.MachineServerVerbosity = cartesimachine.ServerVerbosity(getMachineServerVerbosity()) + config.MachineIncCycles = getMachineIncCycles() + config.MachineMaxCycles = getMachineMaxCycles() + config.MachineAdvanceTimeout = getMachineAdvanceTimeout() + config.MachineInspectTimeout = getMachineInspectTimeout() return config } diff --git a/internal/node/config/generate/Config.toml b/internal/node/config/generate/Config.toml index a1b9dfe27..c29a98448 100644 --- a/internal/node/config/generate/Config.toml +++ b/internal/node/config/generate/Config.toml @@ -54,7 +54,19 @@ How many times some functions should be retried after an error.""" default = "3" go-type = "Duration" description = """ -How seconds the retry policy will wait between retries.""" +How many seconds the retry policy will wait between retries.""" + +[rollups.CARTESI_MAX_CONCURRENT_INSPECTS] +default = "10" +go-type = "uint8" +description = """ +Maximum number of inspect-state requests that can be concurrently active.""" + +[rollups.CARTESI_ADVANCER_POLLING_INTERVAL] +default = "30" +go-type = "Duration" +description = """ +How many seconds the node will wait before querying the database for new inputs.""" [rollups.CARTESI_VALIDATOR_POLLING_INTERVAL] default = "30" @@ -250,3 +262,37 @@ go-type = "bool" description = """ When enabled, prints server-manager output to stdout and stderr directly. All other log configurations are ignored.""" + +# +# Temporary +# + +[temp.CARTESI_MACHINE_SERVER_VERBOSITY] +default = "info" +go-type = "string" +description = """ +TODO.""" + +[temp.CARTESI_MACHINE_INC_CYCLES] +default = "50000000" +go-type = "uint64" +description = """ +TODO.""" + +[temp.CARTESI_MACHINE_MAX_CYCLES] +default = "5000000000" +go-type = "uint64" +description = """ +TODO.""" + +[temp.CARTESI_MACHINE_ADVANCE_TIMEOUT] +default = "60" +go-type = "Duration" +description = """ +TODO.""" + +[temp.CARTESI_MACHINE_INSPECT_TIMEOUT] +default = "10" +go-type = "Duration" +description = """ +TODO.""" diff --git a/internal/node/config/generate/code.go b/internal/node/config/generate/code.go index 33f30e900..b7dbd0969 100644 --- a/internal/node/config/generate/code.go +++ b/internal/node/config/generate/code.go @@ -100,6 +100,16 @@ func ToInt64FromString(s string) (int64, error) { return strconv.ParseInt(s, 10, 64) } +func ToUint8FromString(s string) (uint8, error) { + value, err := strconv.ParseUint(s, 10, 8) + return uint8(value), err +} + +func ToUint32FromString(s string) (uint32, error) { + value, err := strconv.ParseUint(s, 10, 32) + return uint32(value), err +} + func ToUint64FromString(s string) (uint64, error) { value, err := strconv.ParseUint(s, 10, 64) return value, err @@ -164,6 +174,8 @@ var ( toBool = strconv.ParseBool toInt = strconv.Atoi toInt64 = ToInt64FromString + toUint8 = ToUint8FromString + toUint32 = ToUint32FromString toUint64 = ToUint64FromString toString = ToStringFromString toDuration = ToDurationFromSeconds diff --git a/internal/node/config/generated.go b/internal/node/config/generated.go index 24c899e2e..73f25822a 100644 --- a/internal/node/config/generated.go +++ b/internal/node/config/generated.go @@ -44,6 +44,16 @@ func ToInt64FromString(s string) (int64, error) { return strconv.ParseInt(s, 10, 64) } +func ToUint8FromString(s string) (uint8, error) { + value, err := strconv.ParseUint(s, 10, 8) + return uint8(value), err +} + +func ToUint32FromString(s string) (uint32, error) { + value, err := strconv.ParseUint(s, 10, 32) + return uint32(value), err +} + func ToUint64FromString(s string) (uint64, error) { value, err := strconv.ParseUint(s, 10, 64) return value, err @@ -108,6 +118,8 @@ var ( toBool = strconv.ParseBool toInt = strconv.Atoi toInt64 = ToInt64FromString + toUint8 = ToUint8FromString + toUint32 = ToUint32FromString toUint64 = ToUint64FromString toString = ToStringFromString toDuration = ToDurationFromSeconds @@ -468,6 +480,18 @@ func getPostgresSslmodeEnabled() bool { return val } +func getAdvancerPollingInterval() Duration { + s, ok := os.LookupEnv("CARTESI_ADVANCER_POLLING_INTERVAL") + if !ok { + s = "30" + } + val, err := toDuration(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_ADVANCER_POLLING_INTERVAL: %v", err)) + } + return val +} + func getEpochLength() uint64 { s, ok := os.LookupEnv("CARTESI_EPOCH_LENGTH") if !ok { @@ -504,6 +528,18 @@ func getEvmReaderRetryPolicyMaxRetries() uint64 { return val } +func getMaxConcurrentInspects() uint8 { + s, ok := os.LookupEnv("CARTESI_MAX_CONCURRENT_INSPECTS") + if !ok { + s = "10" + } + val, err := toUint8(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MAX_CONCURRENT_INSPECTS: %v", err)) + } + return val +} + func getValidatorPollingInterval() Duration { s, ok := os.LookupEnv("CARTESI_VALIDATOR_POLLING_INTERVAL") if !ok { @@ -527,3 +563,63 @@ func getSnapshotDir() string { } return val } + +func getMachineAdvanceTimeout() Duration { + s, ok := os.LookupEnv("CARTESI_MACHINE_ADVANCE_TIMEOUT") + if !ok { + s = "60" + } + val, err := toDuration(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MACHINE_ADVANCE_TIMEOUT: %v", err)) + } + return val +} + +func getMachineIncCycles() uint64 { + s, ok := os.LookupEnv("CARTESI_MACHINE_INC_CYCLES") + if !ok { + s = "50000000" + } + val, err := toUint64(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MACHINE_INC_CYCLES: %v", err)) + } + return val +} + +func getMachineInspectTimeout() Duration { + s, ok := os.LookupEnv("CARTESI_MACHINE_INSPECT_TIMEOUT") + if !ok { + s = "10" + } + val, err := toDuration(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MACHINE_INSPECT_TIMEOUT: %v", err)) + } + return val +} + +func getMachineMaxCycles() uint64 { + s, ok := os.LookupEnv("CARTESI_MACHINE_MAX_CYCLES") + if !ok { + s = "5000000000" + } + val, err := toUint64(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MACHINE_MAX_CYCLES: %v", err)) + } + return val +} + +func getMachineServerVerbosity() string { + s, ok := os.LookupEnv("CARTESI_MACHINE_SERVER_VERBOSITY") + if !ok { + s = "info" + } + val, err := toString(s) + if err != nil { + panic(fmt.Sprintf("failed to parse CARTESI_MACHINE_SERVER_VERBOSITY: %v", err)) + } + return val +} diff --git a/internal/node/model/models.go b/internal/node/model/models.go index ab11d1d99..b0bd46c3d 100644 --- a/internal/node/model/models.go +++ b/internal/node/model/models.go @@ -65,6 +65,7 @@ type Application struct { Id uint64 ContractAddress Address TemplateHash Hash + TemplateUri string LastProcessedBlock uint64 Status ApplicationStatus IConsensusAddress Address diff --git a/internal/nodemachine/machine.go b/internal/nodemachine/machine.go index 4abf71da6..396d1a0b0 100644 --- a/internal/nodemachine/machine.go +++ b/internal/nodemachine/machine.go @@ -34,7 +34,7 @@ type AdvanceResult struct { } type InspectResult struct { - InputIndex uint64 + InputIndex *uint64 Accepted bool Reports [][]byte Error error @@ -44,7 +44,8 @@ type NodeMachine struct { inner rollupsmachine.RollupsMachine // Index of the last Input that was processed. - lastInputIndex uint64 + // Can be nil if no inputs were processed. + lastInputIndex *uint64 // How long a call to inner.Advance or inner.Inspect can take. advanceTimeout, inspectTimeout time.Duration @@ -68,7 +69,7 @@ type NodeMachine struct { func New( inner rollupsmachine.RollupsMachine, - inputIndex uint64, + inputIndex *uint64, advanceTimeout time.Duration, inspectTimeout time.Duration, maxConcurrentInspects uint8, @@ -148,14 +149,14 @@ func (machine *NodeMachine) Advance(ctx context.Context, // Replaces the current machine with the fork and updates lastInputIndex. machine.mutex.HLock() machine.inner = fork - machine.lastInputIndex = index + machine.lastInputIndex = &index machine.mutex.Unlock() } else { // Closes the forked machine. err = fork.Close(ctx) // Updates lastInputIndex. machine.mutex.HLock() - machine.lastInputIndex = index + machine.lastInputIndex = &index machine.mutex.Unlock() }