Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution service 5: make execution-service interfaces asynchronous #1620

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
69e6e2b
Merge branch 'validation-rpc-improvements' into execution-service-5
tsahee Mar 30, 2023
85f3a5e
Merge branch 'validation-rpc-improvements' into execution-service-5
tsahee Mar 31, 2023
1c5f071
Merge branch 'validation-rpc-improvements' into execution-service-5
tsahee Mar 31, 2023
564a1ac
Merge branch 'validation-rpc-improvements' into execution-service-5
tsahee Mar 31, 2023
7d27b61
Merge branch 'execution-service-4' into execution-service-5
tsahee Apr 3, 2023
62b96bf
move execution interface to promises
tsahee Apr 4, 2023
0ba3e28
Merge branch 'validation-rpc-improvements' into execution-service-5
tsahee Apr 5, 2023
2c232e4
Merge branch 'execution-service-4' into execution-service-5
tsahee Apr 19, 2023
1cbcc8c
Merge branch 'execution-service-4' into execution-service-5
tsahee Apr 30, 2023
8eed5a9
Merge branch 'execution-service-4' into execution-service-5
tsahee May 9, 2023
baf3351
Merge branch 'execution-service-4' into execution-service-5
tsahee May 11, 2023
65e77ac
Merge branch 'execution-service-4' into execution-service-5
tsahee May 26, 2023
6ab83cd
Merge branch 'execution-service-4' into execution-service-5
tsahee Jun 15, 2023
2ddeec8
Merge branch 'execution-service-4' into execution-service-5
tsahee Jun 27, 2023
98fa1a1
fix merge errors
tsahee Jun 27, 2023
a677b04
Merge branch 'execution-service-4' into execution-service-5
tsahee Jun 27, 2023
e92e690
Merge branch 'execution-service-4' into execution-service-5
tsahee Jul 10, 2023
3e7b813
Merge branch 'execution-service-4' into execution-service-5
tsahee Nov 9, 2023
343d269
update headerreader's interface to promise
tsahee Nov 9, 2023
0507e12
fix execserver syncmonitor default config
tsahee Nov 9, 2023
ed20b75
Merge remote-tracking branch 'origin/master' into execution-service-5
tsahee Mar 28, 2024
06266b7
fix promise interfaces
tsahee Mar 28, 2024
6901661
execution: use exiting validation.BatchInfo
tsahee Mar 28, 2024
bfbd8b9
recorder: only use promise in the node itself
tsahee Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
var use4844 bool
config := b.config()
if config.Post4844Blobs && b.daWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))).Await(ctx)
if err != nil {
return false, err
}
Expand Down
4 changes: 2 additions & 2 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReade
}

func (d *DelayedSequencer) getDelayedMessagesRead() (uint64, error) {
return d.exec.NextDelayedMessageNumber()
return d.exec.NextDelayedMessageNumber().Await(d.GetContext())
}

func (d *DelayedSequencer) trySequence(ctx context.Context, lastBlockHeader *types.Header) error {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
return fmt.Errorf("inbox reader at delayed message %v db accumulator %v doesn't match delayed bridge accumulator %v at L1 block %v", pos-1, lastDelayedAcc, delayedBridgeAcc, finalized)
}
for i, msg := range messages {
err = d.exec.SequenceDelayedMessage(msg, startPos+uint64(i))
_, err = d.exec.SequenceDelayedMessage(msg, startPos+uint64(i)).Await(d.GetContext())
if err != nil {
return err
}
Expand Down
59 changes: 54 additions & 5 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbos/l2pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/statetransfer"
"github.com/pkg/errors"

"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/util/testhelpers"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,13 +34,58 @@ import (
)

type execClientWrapper struct {
*gethexec.ExecutionEngine
t *testing.T
ExecEngine *gethexec.ExecutionEngine
t *testing.T
}

func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
func (w *execClientWrapper) Pause() containers.PromiseInterface[struct{}] {
w.t.Error("not supported")
return containers.NewReadyPromise[struct{}](struct{}{}, errors.New("Pause not supported"))
}

func (w *execClientWrapper) Activate() containers.PromiseInterface[struct{}] {
w.t.Error("not supported")
return containers.NewReadyPromise[struct{}](struct{}{}, errors.New("Activate not supported"))
}

func (w *execClientWrapper) ForwardTo(url string) containers.PromiseInterface[struct{}] {
w.t.Error("not supported")
return containers.NewReadyPromise[struct{}](struct{}{}, errors.New("ForwardTo not supported"))
}

func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, w.ExecEngine.DigestMessage(num, msg, msgForPrefetch))
}

func (w *execClientWrapper) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[struct{}] {
return stopwaiter.LaunchPromiseThread(w.ExecEngine, func(ctx context.Context) (struct{}, error) {
return struct{}{}, w.ExecEngine.Reorg(count, newMessages, oldMessages)
})
}

func (w *execClientWrapper) HeadMessageNumber() containers.PromiseInterface[arbutil.MessageIndex] {
return containers.NewReadyPromise(w.ExecEngine.HeadMessageNumber())
}

func (w *execClientWrapper) NextDelayedMessageNumber() containers.PromiseInterface[uint64] {
return containers.NewReadyPromise(w.ExecEngine.NextDelayedMessageNumber())
}

func (w *execClientWrapper) SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) containers.PromiseInterface[struct{}] {
return stopwaiter.LaunchPromiseThread(w.ExecEngine, func(ctx context.Context) (struct{}, error) {
return struct{}{}, w.ExecEngine.SequenceDelayedMessage(ctx, message, delayedSeqNum)
})
}

func (w *execClientWrapper) ResultAtPos(pos arbutil.MessageIndex) containers.PromiseInterface[*execution.MessageResult] {
return containers.NewReadyPromise(w.ExecEngine.ResultAtPos(pos))
}

func (w *execClientWrapper) ArbOSVersionForMessageNumber(messageNum arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return stopwaiter.LaunchPromiseThread(w.ExecEngine, func(ctx context.Context) (uint64, error) {
return w.ExecEngine.ArbOSVersionForMessageNumber(messageNum)
})
}

func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
chainConfig := params.ArbitrumDevTestChainConfig()
Expand Down
9 changes: 5 additions & 4 deletions arbnode/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (mr *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Durat

if mr.seqCoordinator == nil {
mr.lastMaintenance = now
mr.runMaintenance()
mr.runMaintenance(ctx)
return time.Minute
}

Expand All @@ -161,14 +161,14 @@ func (mr *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Durat
// Avoid lockout for the sequencer and try to handoff.
if mr.seqCoordinator.AvoidLockout(ctx) && mr.seqCoordinator.TryToHandoffChosenOne(ctx) {
mr.lastMaintenance = now
mr.runMaintenance()
mr.runMaintenance(ctx)
}
defer mr.seqCoordinator.SeekLockout(ctx) // needs called even if c.Zombify returns false

return time.Minute
}

func (mr *MaintenanceRunner) runMaintenance() {
func (mr *MaintenanceRunner) runMaintenance(ctx context.Context) {
log.Info("Compacting databases (this may take a while...)")
results := make(chan error, len(mr.dbs))
expected := 0
Expand All @@ -181,7 +181,8 @@ func (mr *MaintenanceRunner) runMaintenance() {
}
expected++
go func() {
results <- mr.exec.Maintenance()
_, err := mr.exec.Maintenance().Await(ctx)
results <- err
}()
for i := 0; i < expected; i++ {
err := <-results
Expand Down
65 changes: 42 additions & 23 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ import (
"github.com/offchainlabs/nitro/solgen/go/rollupgen"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/staker/validatorwallet"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/wsbroadcastserver"
)

Expand Down Expand Up @@ -953,50 +956,66 @@ func (n *Node) StopAndWait() {
}
}

func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) {
return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
func (n *Node) FetchBatch(batchNum uint64) containers.PromiseInterface[validator.BatchInfo] {
return stopwaiter.LaunchPromiseThread(&n.InboxReader.StopWaiterSafe,
func(ctx context.Context) (validator.BatchInfo, error) {
data, hash, err := n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
if err != nil {
return validator.BatchInfo{}, err
}
return validator.BatchInfo{
Number: batchNum,
Data: data,
BlockHash: hash,
}, nil
})
}

func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) {
return n.InboxTracker.FindInboxBatchContainingMessage(message)
func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) containers.PromiseInterface[*uint64] {
batch, found, err := n.InboxTracker.FindInboxBatchContainingMessage(message)
var batchPtr *uint64
if err == nil && found {
batchPtr = &batch
}
return containers.NewReadyPromise[*uint64](batchPtr, err)
}

func (n *Node) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
return n.InboxTracker.GetBatchParentChainBlock(seqNum)
func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterface[uint64] {
return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum))
}

func (n *Node) FullSyncProgressMap() map[string]interface{} {
return n.SyncMonitor.FullSyncProgressMap()
func (n *Node) FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] {
return containers.NewReadyPromise(n.SyncMonitor.FullSyncProgressMap(), nil)
}

func (n *Node) Synced() bool {
return n.SyncMonitor.Synced()
func (n *Node) Synced() containers.PromiseInterface[bool] {
return containers.NewReadyPromise(n.SyncMonitor.Synced(), nil)
}

func (n *Node) SyncTargetMessageCount() arbutil.MessageIndex {
return n.SyncMonitor.SyncTargetMessageCount()
func (n *Node) SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] {
return containers.NewReadyPromise(n.SyncMonitor.SyncTargetMessageCount(), nil)
}

// TODO: switch from pulling to pushing safe/finalized
func (n *Node) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
return n.InboxReader.GetSafeMsgCount(ctx)
func (n *Node) GetSafeMsgCount() containers.PromiseInterface[arbutil.MessageIndex] {
return stopwaiter.LaunchPromiseThread(n.InboxReader, n.InboxReader.GetSafeMsgCount)
}

func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
return n.InboxReader.GetFinalizedMsgCount(ctx)
func (n *Node) GetFinalizedMsgCount() containers.PromiseInterface[arbutil.MessageIndex] {
return stopwaiter.LaunchPromiseThread(n.InboxReader, n.InboxReader.GetFinalizedMsgCount)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta))
}

func (n *Node) ExpectChosenSequencer() error {
return n.TxStreamer.ExpectChosenSequencer()
func (n *Node) ExpectChosenSequencer() containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, n.TxStreamer.ExpectChosenSequencer())
}

func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) {
func (n *Node) ValidatedMessageCount() containers.PromiseInterface[arbutil.MessageIndex] {
if n.BlockValidator == nil {
return 0, errors.New("validator not set up")
return containers.NewReadyPromise(arbutil.MessageIndex(0), errors.New("validator not set up"))
}
return n.BlockValidator.GetValidated(), nil
return containers.NewReadyPromise(n.BlockValidator.GetValidated(), nil)
}
2 changes: 1 addition & 1 deletion arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
if chosenSeq != c.config.Url() && chosenSeq != c.prevChosenSequencer {
var err error
if c.sequencer != nil {
err = c.sequencer.ForwardTo(chosenSeq)
_, err = c.sequencer.ForwardTo(chosenSeq).Await(ctx)
}
if err == nil {
c.prevChosenSequencer = chosenSeq
Expand Down
10 changes: 5 additions & 5 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
s.reorgMutex.Lock()
defer s.reorgMutex.Unlock()

err = s.exec.Reorg(count, newMessages, oldMessages)
_, err = s.exec.Reorg(count, newMessages, oldMessages).Await(s.GetContext())
if err != nil {
return err
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex,
if err != nil {
return 0, err
}
digestedHead, err := s.exec.HeadMessageNumber()
digestedHead, err := s.exec.HeadMessageNumber().Await(s.GetContext())
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -941,7 +941,7 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut
if count == 0 {
return &execution.MessageResult{}, nil
}
return s.exec.ResultAtPos(count - 1)
return s.exec.ResultAtPos(count - 1).Await(s.GetContext())
}

// return value: true if should be called again immediately
Expand All @@ -960,7 +960,7 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
return false
}
s.execLastMsgCount = msgCount
pos, err := s.exec.HeadMessageNumber()
pos, err := s.exec.HeadMessageNumber().Await(ctx)
if err != nil {
log.Error("feedOneMsg failed to get exec engine message count", "err", err)
return false
Expand All @@ -983,7 +983,7 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
}
msgForPrefetch = msg
}
if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
if _, err = s.exec.DigestMessage(pos, msg, msgForPrefetch).Await(ctx); err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
logger = log.Debug
Expand Down
17 changes: 4 additions & 13 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,7 @@ func stateLogFunc(targetHeader, header *types.Header, hasState bool) {

// If msg is nil, this will record block creation up to the point where message would be accessed (for a "too far" proof)
// If keepreference == true, reference to state of prevHeader is added (no reference added if an error is returned)
func (r *BlockRecorder) RecordBlockCreation(
ctx context.Context,
pos arbutil.MessageIndex,
msg *arbostypes.MessageWithMetadata,
) (*execution.RecordResult, error) {

func (r *BlockRecorder) RecordBlockCreation(ctx context.Context, pos arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) (*execution.RecordResult, error) {
blockNum := r.execEngine.MessageIndexToBlockNumber(pos)

var prevHeader *types.Header
Expand Down Expand Up @@ -123,16 +118,12 @@ func (r *BlockRecorder) RecordBlockCreation(
var readBatchInfo []validator.BatchInfo
if msg != nil {
batchFetcher := func(batchNum uint64) ([]byte, error) {
data, blockHash, err := r.execEngine.consensus.FetchBatch(ctx, batchNum)
batchInfo, err := r.execEngine.consensus.FetchBatch(batchNum).Await(ctx)
if err != nil {
return nil, err
}
readBatchInfo = append(readBatchInfo, validator.BatchInfo{
Number: batchNum,
BlockHash: blockHash,
Data: data,
})
return data, nil
readBatchInfo = append(readBatchInfo, batchInfo)
return batchInfo.Data, nil
}
// Re-fetch the batch instead of using our cached cost,
// as the replay binary won't have the cache populated.
Expand Down
Loading