Skip to content

Commit

Permalink
Revert "InMem/Sync/IsMining replacement" (#12067)
Browse files Browse the repository at this point in the history
Reverts #12028

---------

Co-authored-by: JkLondon <[email protected]>
  • Loading branch information
JkLondon and JkLondon authored Sep 23, 2024
1 parent 7d26b48 commit 6d975a9
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 87 deletions.
5 changes: 2 additions & 3 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,9 +1486,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
recents = bor.Recents
signatures = bor.Signatures
}
stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)

miner := stagedsync.NewMiningState(&cfg.Miner)
miningCancel := make(chan struct{})
Expand Down Expand Up @@ -1527,7 +1527,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
stages.BlockProduction,
)

return engine, vmConfig, sync, miningSync, miner
Expand Down
24 changes: 11 additions & 13 deletions cmd/state/exec3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"context"
"sync"

"github.com/erigontech/erigon/eth/stagedsync/stages"

"golang.org/x/sync/errgroup"

"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -72,10 +70,10 @@ type Worker struct {

dirs datadir.Dirs

mode stages.Mode
isMining bool
}

func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker {
func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
w := &Worker{
lock: lock,
logger: logger,
Expand All @@ -96,7 +94,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro

dirs: dirs,

mode: mode,
isMining: isMining,
}
w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
Expand Down Expand Up @@ -132,18 +130,18 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {

func (rw *Worker) Run() error {
for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
rw.RunTxTask(txTask, rw.mode)
rw.RunTxTask(txTask, rw.isMining)
if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
return err
}
}
return nil
}

func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) {
func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.RunTxTaskNoLock(txTask, mode)
rw.RunTxTaskNoLock(txTask, isMining)
}

// Needed to set history reader when need to offset few txs from block beginning and does not break processing,
Expand All @@ -165,7 +163,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
}
}

func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
if txTask.HistoryExecution && !rw.historyMode {
// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
// from the beginning until committed txNum and only then disable history mode.
Expand Down Expand Up @@ -234,7 +232,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
}

if mode == stages.BlockProduction {
if isMining {
_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
} else {
_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
Expand Down Expand Up @@ -299,7 +297,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
}
}

func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
Expand All @@ -310,7 +308,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
reconWorkers[i].ResetState(rs, accumulator)
}
if background {
Expand All @@ -337,7 +335,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
//applyWorker.ResetTx(nil)
}
}
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)

return reconWorkers, applyWorker, rws, clear, wait
}
12 changes: 5 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"slices"
"time"

"github.com/erigontech/erigon/eth/stagedsync/stages"

"golang.org/x/crypto/sha3"

"github.com/erigontech/erigon-lib/chain"
Expand Down Expand Up @@ -172,7 +170,7 @@ func ExecuteBlockEphemerally(

if !vmConfig.ReadOnly {
txs := block.Transactions()
if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil {
if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -325,14 +323,14 @@ func FinalizeBlockExecution(
stateWriter state.StateWriter, cc *chain.Config,
ibs *state.IntraBlockState, receipts types.Receipts,
withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader,
mode stages.Mode,
isMining bool,
logger log.Logger,
) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
}

if mode == stages.BlockProduction {
if isMining {
newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
} else {
var rss types.Requests
Expand Down Expand Up @@ -369,7 +367,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
return nil
}

func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error {
func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
if gasUsed != h.GasUsed {
return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
Expand All @@ -385,7 +383,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
}
receiptHash := types.DeriveSha(receipts)
if receiptHash != h.ReceiptHash {
if mode == stages.BlockProduction {
if isMining {
h.ReceiptHash = receiptHash
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
logger, stages.BlockProduction)
logger)

var ethashApi *ethash.API
if casted, ok := backend.engine.(*ethash.Ethash); ok {
Expand Down Expand Up @@ -763,7 +763,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction)
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
return nil, err
Expand Down Expand Up @@ -916,7 +916,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
}

backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks)
backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)

hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)

Expand Down Expand Up @@ -944,7 +944,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

Expand Down
Loading

0 comments on commit 6d975a9

Please sign in to comment.