From 45e8bff1bddf42f85722ce0fbab42a5b7cbb92b4 Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Tue, 21 May 2024 18:11:27 -0300 Subject: [PATCH] feat: add machine-advancer service --- internal/node/machineadvancer/advancer.go | 35 +++++++ internal/node/nodemachine/machine.go | 108 +++++++++++++++++++++ internal/node/nodemachine/pmutex/pmutex.go | 56 +++++++++++ 3 files changed, 199 insertions(+) create mode 100644 internal/node/machineadvancer/advancer.go create mode 100644 internal/node/nodemachine/machine.go create mode 100644 internal/node/nodemachine/pmutex/pmutex.go diff --git a/internal/node/machineadvancer/advancer.go b/internal/node/machineadvancer/advancer.go new file mode 100644 index 000000000..5b79c3e76 --- /dev/null +++ b/internal/node/machineadvancer/advancer.go @@ -0,0 +1,35 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package machineadvancer + +import "github.com/cartesi/rollups-node/internal/node/nodemachine" + +type Input = []byte +type Output = []byte +type Report = []byte +type Hash = [32]byte + +func GetInputs() []Input { + return []Input{} +} + +func Store(outputs []Output, reports []Report, outputsHash Hash, machineHash Hash) error { + return nil +} + +func StartAdvanceServer(machine *nodemachine.NodeMachine) { + for { + for _, input := range GetInputs() { + outputs, reports, outputsHash, machineHash, err := machine.Advance(input) + if err != nil { + panic("TODO") + } + + err = Store(outputs, reports, outputsHash, machineHash) + if err != nil { + panic("TODO") + } + } + } +} diff --git a/internal/node/nodemachine/machine.go b/internal/node/nodemachine/machine.go new file mode 100644 index 000000000..cb4405201 --- /dev/null +++ b/internal/node/nodemachine/machine.go @@ -0,0 +1,108 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package nodemachine + +import ( + "context" + + "github.com/cartesi/rollups-node/internal/node/nodemachine/pmutex" + "github.com/cartesi/rollups-node/pkg/model" + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + + "golang.org/x/sync/semaphore" +) + +type NodeMachine struct { + *rollupsmachine.RollupsMachine + + // Ensures advance/inspect mutual exclusion when accessing the inner RollupsMachine. + // Advances have a higher priority than Inspects to acquire the lock. + mutex *pmutex.PMutex + + // Controls how many inspects can be concurrently active. + inspects *semaphore.Weighted +} + +func New(machine *rollupsmachine.RollupsMachine, maxConcurrentInspects int8) *NodeMachine { + return &NodeMachine{ + RollupsMachine: machine, + mutex: pmutex.New(), + inspects: semaphore.NewWeighted(int64(maxConcurrentInspects)), + } +} + +func (machine *NodeMachine) Advance(input []byte) ( + outputs []rollupsmachine.Output, + reports []rollupsmachine.Report, + outputsHash model.Hash, + machineHash model.Hash, + err error) { + + var fork *rollupsmachine.RollupsMachine + + { // Forks the machine. + machine.mutex.HLock() + defer machine.mutex.Unlock() + fork, err = machine.Fork() + if err != nil { + return outputs, reports, outputsHash, machineHash, err + } + } + + // Sends the advance-state request. + outputs, reports, outputsHash, err = fork.Advance(input) + if err != nil { + return outputs, reports, outputsHash, machineHash, err + } + + // Gets the post-advance machine hash. + machineHash, err = fork.Hash() + if err != nil { + return outputs, reports, outputsHash, machineHash, err + } + + { // Destroys the old machine and updates the current one. + machine.mutex.HLock() + defer machine.mutex.Unlock() + err = machine.Destroy() + if err != nil { + return outputs, reports, outputsHash, machineHash, err + } + machine.RollupsMachine = fork + } + + return outputs, reports, outputsHash, machineHash, err +} + +func (machine *NodeMachine) Inspect(ctx context.Context, query []byte) ( + []rollupsmachine.Report, + error) { + + // Controls how many inspects can be concurrently active. + err := machine.inspects.Acquire(ctx, 1) + if err != nil { + return nil, err + } + defer machine.inspects.Release(1) + + // Forks the machine. + var forkedMachine *rollupsmachine.RollupsMachine + { + machine.mutex.LLock() + defer machine.mutex.Unlock() + forkedMachine, err = machine.Fork() + if err != nil { + return nil, err + } + } + + // Sends the inspect-state request. + reports, err := forkedMachine.Inspect(query) + if err != nil { + return nil, err + } + + // Destroys the forked machine and returns the reports. + return reports, forkedMachine.Destroy() +} diff --git a/internal/node/nodemachine/pmutex/pmutex.go b/internal/node/nodemachine/pmutex/pmutex.go new file mode 100644 index 000000000..1603a6869 --- /dev/null +++ b/internal/node/nodemachine/pmutex/pmutex.go @@ -0,0 +1,56 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package pmutex + +import ( + "sync" + "sync/atomic" +) + +// A PMutex is a mutual exclusion lock with priority capabilities. +// A call to HLock always acquires the mutex before LLock. +type PMutex struct { + // Main mutex. + mutex *sync.Mutex + + // Condition variable for the waiting low-priority threads. + waitingLow *sync.Cond + + // Quantity of high-priority threads waiting to acquire the lock. + waitingHigh *atomic.Int32 +} + +// New creates a new PMutex. +func New() *PMutex { + mutex := &sync.Mutex{} + return &PMutex{ + mutex: mutex, + waitingLow: sync.NewCond(mutex), + waitingHigh: &atomic.Int32{}, + } +} + +// HLock acquires the mutex for the high-priority threads. +func (lock *PMutex) HLock() { + lock.waitingHigh.Add(1) + lock.mutex.Lock() + lock.waitingHigh.Add(-1) +} + +// LLock acquires the mutex for the low-priority threads. +// It waits until there are no high-priority threads trying to acquire the lock. +func (lock *PMutex) LLock() { + lock.mutex.Lock() + for lock.waitingHigh.Load() != 0 { + // NOTE: a cond.Wait() releases the lock uppon being called + // and tries to acquire it after being awakened. + lock.waitingLow.Wait() + } +} + +// Unlock releases the mutex for both types of threads. +func (lock *PMutex) Unlock() { + lock.waitingLow.Broadcast() + lock.mutex.Unlock() +}