diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index aab8d753950..c972b86f9d2 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -30,7 +30,7 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Cancel { if (targetWorldState.ClearCache()) { - if (_logger.IsWarn) _logger.Warn("Cashes are not empty. Clearing them."); + if (_logger.IsWarn) _logger.Warn("Caches are not empty. Clearing them."); } if (!IsGenesisBlock(parentStateRoot) && Environment.ProcessorCount > 2 && !cancellationToken.IsCancellationRequested) @@ -54,9 +54,15 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot try { + var physicalCoreCount = RuntimeInformation.PhysicalCoreCount; + if (physicalCoreCount < 2) + { + if (_logger.IsDebug) _logger.Debug("Physical core count is less than 2. Skipping pre-warming."); + return; + } if (_logger.IsDebug) _logger.Debug($"Started pre-warming caches for block {suggestedBlock.Number}."); - ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = Math.Max(1, RuntimeInformation.PhysicalCoreCount - 2), CancellationToken = cancellationToken }; + ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken }; IReleaseSpec spec = specProvider.GetSpec(suggestedBlock.Header); WarmupTransactions(parallelOptions, spec, suggestedBlock, parentStateRoot); @@ -74,13 +80,18 @@ void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block if (parallelOptions.CancellationToken.IsCancellationRequested) return; if (spec.WithdrawalsEnabled && block.Withdrawals is not null) { + int progress = 0; Parallel.For(0, block.Withdrawals.Length, parallelOptions, - i => + _ => { IReadOnlyTxProcessorSource env = _envPool.Get(); + int i = 0; try { using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); + // Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For + // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 + i = Interlocked.Increment(ref progress) - 1; scope.WorldState.WarmUp(block.Withdrawals[i].Address); } catch (Exception ex) @@ -98,17 +109,23 @@ void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot) { if (parallelOptions.CancellationToken.IsCancellationRequested) return; - Parallel.For(0, block.Transactions.Length, parallelOptions, i => - { - // If the transaction has already been processed or being processed, exit early - if (block.TransactionProcessed >= i) return; + int progress = 0; + Parallel.For(0, block.Transactions.Length, parallelOptions, _ => + { using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); - Transaction tx = block.Transactions[i]; IReadOnlyTxProcessorSource env = _envPool.Get(); SystemTransaction systemTransaction = _systemTransactionPool.Get(); + Transaction? tx = null; try { + // Process transactions in sequential order, rather than partitioning scheme from Parallel.For + // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 + int i = Interlocked.Increment(ref progress) - 1; + // If the transaction has already been processed or being processed, exit early + if (block.TransactionProcessed > i) return; + + tx = block.Transactions[i]; tx.CopyTo(systemTransaction); using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); if (spec.UseTxAccessLists) @@ -116,11 +133,11 @@ void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Bloc scope.WorldState.WarmUp(tx.AccessList); // eip-2930 } TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance); - if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx {tx.Hash} with {result}"); + if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); } catch (Exception ex) { - if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx.Hash}", ex); + if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex); } finally {