From 6d975a954ff66196730b5847fc8e9682c27e1761 Mon Sep 17 00:00:00 2001
From: Ilya Mikheev <54912776+JkLondon@users.noreply.github.com>
Date: Mon, 23 Sep 2024 14:51:34 +0200
Subject: [PATCH] Revert "InMem/Sync/IsMining replacement" (#12067)

Reverts erigontech/erigon#12028

---------

Co-authored-by: JkLondon
---
 cmd/integration/commands/stages.go       |  5 ++-
 cmd/state/exec3/state.go                 | 24 +++++++-------
 core/blockchain.go                       | 12 +++----
 eth/backend.go                           |  8 ++---
 eth/stagedsync/exec3.go                  | 42 +++++++++++++-----------
 eth/stagedsync/stage.go                  |  1 -
 eth/stagedsync/stage_execute.go          | 10 +++---
 eth/stagedsync/stage_mining_exec.go      |  4 +--
 eth/stagedsync/stagedsynctest/harness.go |  2 --
 eth/stagedsync/stages/stages.go          |  8 -----
 eth/stagedsync/sync.go                   | 10 +++---
 eth/stagedsync/sync_test.go              | 22 ++++++-------
 turbo/stages/mock/mock_sentry.go         |  6 ++--
 turbo/stages/stageloop.go                |  1 -
 14 files changed, 68 insertions(+), 87 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 223bee009fb..7429e06aa3e 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1486,9 +1486,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 		recents = bor.Recents
 		signatures = bor.Signatures
 	}
-	stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
+	stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
 		heimdallClient, recents, signatures, logger)
-	sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks)
+	sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
 
 	miner := stagedsync.NewMiningState(&cfg.Miner)
 	miningCancel := make(chan struct{})
@@ -1527,7 +1527,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
-		stages.BlockProduction,
 	)
 
 	return engine, vmConfig, sync, miningSync, miner
diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index 1db17933b05..db7a8272a6d 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -20,8 +20,6 @@ import (
 	"context"
 	"sync"
 
-	"github.com/erigontech/erigon/eth/stagedsync/stages"
-
 	"golang.org/x/sync/errgroup"
 
 	"github.com/erigontech/erigon-lib/log/v3"
@@ -72,10 +70,10 @@ type Worker struct {
 
 	dirs datadir.Dirs
 
-	mode stages.Mode
+	isMining bool
 }
 
-func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker {
+func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
 	w := &Worker{
 		lock:   lock,
 		logger: logger,
@@ -96,7 +94,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
 
 		dirs: dirs,
 
-		mode: mode,
+		isMining: isMining,
 	}
 	w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
 	w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
@@ -132,7 +130,7 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {
 
 func (rw *Worker) Run() error {
 	for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
-		rw.RunTxTask(txTask, rw.mode)
+		rw.RunTxTask(txTask, rw.isMining)
 		if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
 			return err
 		}
@@ -140,10 +138,10 @@ func (rw *Worker) Run() error {
 	return nil
 }
 
-func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) {
+func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
 	rw.lock.Lock()
 	defer rw.lock.Unlock()
-	rw.RunTxTaskNoLock(txTask, mode)
+	rw.RunTxTaskNoLock(txTask, isMining)
 }
 
 // Needed to set history reader when need to offset few txs from block beginning and does not break processing,
@@ -165,7 +163,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
 	}
 }
 
-func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
+func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
 	if txTask.HistoryExecution && !rw.historyMode {
 		// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
 		// from the beginning until committed txNum and only then disable history mode.
@@ -234,7 +232,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
 			return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
 		}
 
-		if mode == stages.BlockProduction {
+		if isMining {
 			_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
 		} else {
 			_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
@@ -299,7 +297,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
 	}
 }
 
-func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
+func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
 	reconWorkers = make([]*Worker, workerCount)
 
 	resultChSize := workerCount * 8
@@ -310,7 +308,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 	ctx, cancel := context.WithCancel(ctx)
 	g, ctx := errgroup.WithContext(ctx)
 	for i := 0; i < workerCount; i++ {
-		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
+		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
 		reconWorkers[i].ResetState(rs, accumulator)
 	}
 	if background {
@@ -337,7 +335,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 			//applyWorker.ResetTx(nil)
 		}
 	}
-	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
+	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
 
 	return reconWorkers, applyWorker, rws, clear, wait
 }
diff --git a/core/blockchain.go b/core/blockchain.go
index 19c3519d751..068de0b2c0e 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -28,8 +28,6 @@ import (
 	"slices"
 	"time"
 
-	"github.com/erigontech/erigon/eth/stagedsync/stages"
-
 	"golang.org/x/crypto/sha3"
 
 	"github.com/erigontech/erigon-lib/chain"
@@ -172,7 +170,7 @@ func ExecuteBlockEphemerally(
 
 	if !vmConfig.ReadOnly {
 		txs := block.Transactions()
-		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil {
+		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil {
 			return nil, err
 		}
 	}
@@ -325,14 +323,14 @@ func FinalizeBlockExecution(
 	stateWriter state.StateWriter, cc *chain.Config,
 	ibs *state.IntraBlockState, receipts types.Receipts,
 	withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader,
-	mode stages.Mode,
+	isMining bool,
 	logger log.Logger,
 ) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
 	syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
 		return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
 	}
 
-	if mode == stages.BlockProduction {
+	if isMining {
 		newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
 	} else {
 		var rss types.Requests
@@ -369,7 +367,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
 	return nil
 }
 
-func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error {
+func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
 	if gasUsed != h.GasUsed {
 		return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
 			gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
@@ -385,7 +383,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
 	}
 	receiptHash := types.DeriveSha(receipts)
 	if receiptHash != h.ReceiptHash {
-		if mode == stages.BlockProduction {
+		if isMining {
 			h.ReceiptHash = receiptHash
 			return nil
 		}
diff --git a/eth/backend.go b/eth/backend.go
index a8154b09de1..b9280e2476c 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -722,7 +722,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
 			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
 		), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
-		logger, stages.BlockProduction)
+		logger)
 
 	var ethashApi *ethash.API
 	if casted, ok := backend.engine.(*ethash.Ethash); ok {
@@ -763,7 +763,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			),
 			stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
 			stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
-			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction)
+			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
 		// We start the mining step
 		if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
 			return nil, err
@@ -916,7 +916,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 		backend.syncPruneOrder = stagedsync.DefaultPruneOrder
 	}
 
-	backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks)
+	backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
 
 	hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)
 
@@ -944,7 +944,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 
 		checkStateRoot := true
 		pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
-		backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
+		backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
 		backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
 		executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)
 
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 70fbd72f0cd..110a42727a1 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -189,6 +189,7 @@ func ExecV3(ctx context.Context,
 	maxBlockNum uint64,
 	logger log.Logger,
 	initialCycle bool,
+	isMining bool,
 ) error {
 	// TODO: e35 doesn't support parallel-exec yet
 	parallel = false //nolint
@@ -225,8 +226,9 @@ func ExecV3(ctx context.Context,
 	pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber
 
 	var err error
+	inMemExec := txc.Doms != nil
 	var doms *state2.SharedDomains
-	if execStage.CurrentSyncCycle.Mode == stages.ForkValidation {
+	if inMemExec {
 		doms = txc.Doms
 	} else {
 		var err error
@@ -378,10 +380,10 @@ func ExecV3(ctx context.Context,
 	rwsConsumed := make(chan struct{}, 1)
 	defer close(rwsConsumed)
 
-	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, execStage.CurrentSyncCycle.Mode)
+	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, isMining)
 	defer stopWorkers()
 	applyWorker := cfg.applyWorker
-	if execStage.CurrentSyncCycle.Mode == stages.BlockProduction {
+	if isMining {
 		applyWorker = cfg.applyWorkerMining
 	}
 	applyWorker.ResetState(rs, accumulator)
@@ -422,7 +424,7 @@ func ExecV3(ctx context.Context,
 					return err
 				}
 
-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, execStage.CurrentSyncCycle.Mode)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, isMining)
 				if err != nil {
 					return err
 				}
@@ -500,14 +502,14 @@ func ExecV3(ctx context.Context,
 						return err
 					}
 					ac.Close()
-					if execStage.CurrentSyncCycle.Mode != stages.ForkValidation {
+					if !inMemExec {
 						if err = doms.Flush(ctx, tx); err != nil {
 							return err
 						}
 					}
 					break
 				}
-				if execStage.CurrentSyncCycle.Mode == stages.ForkValidation {
+				if inMemExec {
 					break
 				}
 
@@ -523,7 +525,7 @@ func ExecV3(ctx context.Context,
 				rws.DrainNonBlocking()
 				applyWorker.ResetTx(tx)
 
-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, execStage.CurrentSyncCycle.Mode)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, isMining)
 				if err != nil {
 					return err
 				}
@@ -855,7 +857,7 @@ Loop:
 			if txTask.Error != nil {
 				break Loop
 			}
-			applyWorker.RunTxTaskNoLock(txTask, execStage.CurrentSyncCycle.Mode)
+			applyWorker.RunTxTaskNoLock(txTask, isMining)
 			if err := func() error {
 				if errors.Is(txTask.Error, context.Canceled) {
 					return err
@@ -874,12 +876,12 @@ Loop:
 				blobGasUsed += txTask.Tx.GetBlobGas()
 			}
 			if txTask.Final {
-				if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks && !execStage.CurrentSyncCycle.IsInitialCycle {
+				if !isMining && !inMemExec && !execStage.CurrentSyncCycle.IsInitialCycle {
 					cfg.notifications.RecentLogs.Add(receipts)
 				}
-				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.CurrentSyncCycle.Mode != stages.BlockProduction
+				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && !isMining
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
-					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, execStage.CurrentSyncCycle.Mode); err != nil {
+					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, isMining); err != nil {
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 					}
 				}
@@ -936,7 +938,7 @@ Loop:
 					ts += time.Since(start)
 					aggTx.RestrictSubsetFileDeletions(false)
 					doms.SavePastChangesetAccumulator(b.Hash(), blockNum, changeset)
-					if execStage.CurrentSyncCycle.Mode != stages.ForkValidation {
+					if !inMemExec {
 						if err := state2.WriteDiffSet(applyTx, blockNum, b.Hash(), changeset); err != nil {
 							return err
 						}
@@ -951,7 +953,7 @@ Loop:
 
 		// MA commitTx
 		if !parallel {
-			if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks {
+			if !inMemExec && !isMining {
 				metrics2.UpdateBlockConsumerPostExecutionDelay(b.Time(), blockNum, logger)
 			}
 
@@ -959,7 +961,7 @@ Loop:
 
 			select {
 			case <-logEvery.C:
-				if execStage.CurrentSyncCycle.Mode == stages.ForkValidation || execStage.CurrentSyncCycle.Mode == stages.BlockProduction {
+				if inMemExec || isMining {
 					break
 				}
 
@@ -990,7 +992,7 @@ Loop:
 				t1, t2, t3 time.Duration
 			)
 
-			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode); err != nil {
+			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec); err != nil {
 				return err
 			} else if !ok {
 				break Loop
@@ -1076,7 +1078,7 @@ Loop:
 
 	if u != nil && !u.HasUnwindPoint() {
 		if b != nil {
-			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode)
+			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec)
 			if err != nil {
 				return err
 			}
@@ -1150,7 +1152,7 @@ func dumpPlainStateDebug(tx kv.RwTx, doms *state2.SharedDomains) {
 }
 
 // flushAndCheckCommitmentV3 - does write state to db and then check commitment
-func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, mode stages.Mode) (bool, error) {
+func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, inMemExec bool) (bool, error) {
 
 	// E2 state root check was in another stage - means we did flush state even if state root will not match
 	// And Unwind expecting it
@@ -1183,7 +1185,7 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
 		return true, nil
 	}
 	if bytes.Equal(rh, header.Root.Bytes()) {
-		if mode != stages.ForkValidation {
+		if !inMemExec {
 			if err := doms.Flush(ctx, applyTx); err != nil {
 				return false, err
 			}
@@ -1254,7 +1256,7 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser
 	return b, err
 }
 
-func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, mode stages.Mode) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
+func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, isMining bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
 	rwsIt := rws.Iter()
 	defer rwsIt.Close()
 
@@ -1272,7 +1274,7 @@ func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *stat
 		}
 
 		// resolve first conflict right here: it's faster and conflict-free
-		applyWorker.RunTxTask(txTask, mode)
+		applyWorker.RunTxTask(txTask, isMining)
 		if txTask.Error != nil {
 			return outputTxNum, conflicts, triggers, processedBlockNum, false, fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, txTask.Error)
 		}
diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go
index 7f88ec867ab..ce651474132 100644
--- a/eth/stagedsync/stage.go
+++ b/eth/stagedsync/stage.go
@@ -59,7 +59,6 @@ type Stage struct {
 type CurrentSyncCycleInfo struct {
 	IsInitialCycle bool // means: not-on-chain-tip. can be several sync cycle in this mode.
 	IsFirstCycle   bool // means: first cycle
-	Mode           stages.Mode
 }
 
 // StageState is the state of the stage.
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 1d0b1920826..9a6bdda9dd9 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -131,15 +131,15 @@ func StageExecuteBlocksCfg(
 		historyV3:         true,
 		syncCfg:           syncCfg,
 		silkworm:          silkworm,
+		applyWorker:       exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, false),
+		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, true),
 		keepAllChangesets: keepAllChangesets,
-		applyWorker:       exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.ApplyingBlocks),
-		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.BlockProduction),
 	}
 }
 
 // ================ Erigon3 ================
 
-func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger) (err error) {
+func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error) {
 	workersCount := cfg.syncCfg.ExecWorkerCount
 	if !initialCycle {
 		workersCount = 1
@@ -159,7 +159,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
 	}
 
 	parallel := txc.Tx == nil
-	if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil {
+	if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle, isMining); err != nil {
 		return err
 	}
 	return nil
@@ -252,7 +252,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
 	if dbg.StagesOnlyBlocks {
 		return nil
 	}
-	if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger); err != nil {
+	if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger, false); err != nil {
 		return err
 	}
 	return nil
diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go
index 160b25a36bd..a98387dac6d 100644
--- a/eth/stagedsync/stage_mining_exec.go
+++ b/eth/stagedsync/stage_mining_exec.go
@@ -200,7 +200,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg
 
 	var err error
 	var block *types.Block
-	block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, s.state.mode, logger)
+	block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, true, logger)
 	if err != nil {
 		return fmt.Errorf("cannot finalize block execution: %s", err)
 	}
@@ -238,7 +238,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg
 	// This flag will skip checking the state root
 	execCfg.blockProduction = true
 	execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1}
-	if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger); err != nil {
+	if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil {
 		logger.Error("cannot execute block execution", "err", err)
 		return err
 	}
diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go
index 6bcaec8a82d..71803cf0b32 100644
--- a/eth/stagedsync/stagedsynctest/harness.go
+++ b/eth/stagedsync/stagedsynctest/harness.go
@@ -86,7 +86,6 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 		logger,
-		stages.ApplyingBlocks,
 	)
 	miningSyncStages := stagedsync.MiningStages(
 		ctx,
@@ -103,7 +102,6 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
-		stages.BlockProduction,
 	)
 	validatorKey, err := crypto.GenerateKey()
 	require.NoError(t, err)
diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go
index 4183fb19c04..a6d70c5b8b2 100644
--- a/eth/stagedsync/stages/stages.go
+++ b/eth/stagedsync/stages/stages.go
@@ -115,11 +115,3 @@ func encodeBigEndian(n uint64) []byte {
 	binary.BigEndian.PutUint64(v[:], n)
 	return v[:]
 }
-
-type Mode int
-
-const (
-	BlockProduction = Mode(iota)
-	ForkValidation
-	ApplyingBlocks
-)
diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go
index aa9985145f6..5f3b20ce3ed 100644
--- a/eth/stagedsync/sync.go
+++ b/eth/stagedsync/sync.go
@@ -49,7 +49,6 @@ type Sync struct {
 	logPrefixes   []string
 	logger        log.Logger
 	stagesIdsList []string
-	mode          stages.Mode
 }
 
 type Timing struct {
@@ -78,7 +77,7 @@ func (s *Sync) PrevUnwindPoint() *uint64 {
 }
 
 func (s *Sync) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64, initialCycle, firstCycle bool) *UnwindState {
-	return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}
+	return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle}}
 }
 
 // Get the current prune status from the DB
@@ -103,7 +102,7 @@ func (s *Sync) PruneStageState(id stages.SyncStage, forwardProgress uint64, tx k
 		}
 	}
 
-	return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false, s.mode}}, nil
+	return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false}}, nil
 }
 
 func (s *Sync) NextStage() {
@@ -208,7 +207,7 @@ func (s *Sync) SetCurrentStage(id stages.SyncStage) error {
 	return fmt.Errorf("stage not found with id: %v", id)
 }
 
-func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger, mode stages.Mode) *Sync {
+func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync {
 	unwindStages := make([]*Stage, len(stagesList))
 	for i, stageIndex := range unwindOrder {
 		for _, s := range stagesList {
@@ -244,7 +243,6 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
 		logPrefixes:   logPrefixes,
 		logger:        logger,
 		stagesIdsList: stagesIdsList,
-		mode:          mode,
 	}
 }
 
@@ -269,7 +267,7 @@ func (s *Sync) StageState(stage stages.SyncStage, tx kv.Tx, db kv.RoDB, initialC
 		}
 	}
 
-	return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}, nil
+	return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle}}, nil
 }
 
 func (s *Sync) RunUnwind(db kv.RwDB, txc wrap.TxContainer) error {
diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go
index f86c4c3f223..ed9c8db253a 100644
--- a/eth/stagedsync/sync_test.go
+++ b/eth/stagedsync/sync_test.go
@@ -59,7 +59,7 @@ func TestStagesSuccess(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -99,7 +99,7 @@ func TestDisabledStages(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -139,7 +139,7 @@ func TestErroredStage(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)
@@ -207,7 +207,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -285,7 +285,7 @@ func TestUnwind(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -374,7 +374,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -430,12 +430,12 @@ func TestSyncDoTwice(t *testing.T) {
 		},
 	}
 
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
 
-	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
 
@@ -488,14 +488,14 @@ func TestStateSyncInterruptRestart(t *testing.T) {
 		},
 	}
 
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)
 
 	expectedErr = nil
 
-	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
+	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
 	_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
 
@@ -567,7 +567,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Error(t, errInterrupted, err)
diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go
index 00f8857ae18..d31ee10e72c 100644
--- a/turbo/stages/mock/mock_sentry.go
+++ b/turbo/stages/mock/mock_sentry.go
@@ -504,7 +504,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 			stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockReader),
 			stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel, mock.BlockReader, latestBlockBuiltStore),
 		), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
-		logger, stages.BlockProduction)
+		logger)
 	// We start the mining step
 	if err := stages2.MiningStep(ctx, mock.DB, proposingSync, tmpdir, logger); err != nil {
 		return nil, err
@@ -541,13 +541,12 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 		logger,
-		stages.ApplyingBlocks,
 	)
 
 	cfg.Genesis = gspec
 	pipelineStages := stages2.NewPipelineStages(mock.Ctx, db, &cfg, p2p.Config{}, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot)
-	mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
+	mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
 
 	mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)
 
@@ -583,7 +582,6 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
-		stages.BlockProduction,
 	)
 
 	cfg.Genesis = gspec
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index d965fa3af55..52bc1f1c571 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -714,7 +714,6 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
 		stagedsync.StateUnwindOrder,
 		nil, /* pruneOrder */
 		logger,
-		stages.ForkValidation,
 	)
 }
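
Note on the shape of this revert: #12028 had folded two independent execution flags into a single three-valued stages.Mode enum, threaded through Sync and CurrentSyncCycleInfo; this patch restores the flags. isMining becomes an explicit parameter again (through ExecV3, FinalizeBlockExecution, and the worker pool), while in-memory fork execution is re-derived inside ExecV3 from the transaction container (inMemExec := txc.Doms != nil) rather than carried as stages.ForkValidation. The standalone Go sketch below condenses that mapping; the Mode values are copied from the removed eth/stagedsync/stages/stages.go, but the file itself and the fromMode helper are hypothetical, written only for illustration:

	package main

	import "fmt"

	// Mode is the enum removed by this revert (formerly declared in
	// eth/stagedsync/stages/stages.go).
	type Mode int

	const (
		BlockProduction = Mode(iota)
		ForkValidation
		ApplyingBlocks
	)

	// fromMode maps each Mode value onto the two booleans the revert restores.
	// This helper is hypothetical; it exists only for this sketch.
	func fromMode(m Mode) (isMining, inMemExec bool) {
		switch m {
		case BlockProduction:
			return true, false // mining path: engine.FinalizeAndAssemble
		case ForkValidation:
			return false, true // in-memory exec: ExecV3 sees txc.Doms != nil
		default: // ApplyingBlocks
			return false, false // normal sync: engine.Finalize
		}
	}

	func main() {
		for _, m := range []Mode{BlockProduction, ForkValidation, ApplyingBlocks} {
			isMining, inMemExec := fromMode(m)
			// A guard like `mode == stages.ApplyingBlocks` in exec3.go reads
			// `!isMining && !inMemExec` after this revert.
			fmt.Printf("mode=%d isMining=%v inMemExec=%v applying=%v\n",
				m, isMining, inMemExec, !isMining && !inMemExec)
		}
	}

The consequence visible in the hunks above is that each three-way comparison collapses into a boolean guard: mode != stages.ForkValidation becomes !inMemExec around doms.Flush and WriteDiffSet, and mode == stages.ApplyingBlocks becomes !isMining && !inMemExec around the RecentLogs notification and the block-consumer delay metric.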