diff --git a/crates/consensus/beacon/src/engine/hooks/controller.rs b/crates/consensus/beacon/src/engine/hooks/controller.rs index 1449629c70f8..30c0a969772e 100644 --- a/crates/consensus/beacon/src/engine/hooks/controller.rs +++ b/crates/consensus/beacon/src/engine/hooks/controller.rs @@ -1,10 +1,20 @@ -use crate::hooks::{EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHooks}; +use crate::hooks::{ + EngineContext, EngineHook, EngineHookAction, EngineHookDBAccessLevel, EngineHookError, + EngineHookEvent, EngineHooks, +}; use std::{ collections::VecDeque, task::{Context, Poll}, }; use tracing::debug; +#[derive(Debug)] +pub(crate) struct PolledHook { + pub(crate) event: EngineHookEvent, + pub(crate) action: Option, + pub(crate) db_access_level: EngineHookDBAccessLevel, +} + /// Manages hooks under the control of the engine. /// /// This type polls the initialized hooks one by one, respecting the DB access level @@ -41,28 +51,27 @@ impl EngineHooksController { &mut self, cx: &mut Context<'_>, args: EngineContext, - ) -> Poll> { + ) -> Poll> { let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending }; match hook.poll(cx, args) { Poll::Ready((event, action)) => { + let result = PolledHook { event, action, db_access_level: hook.db_access_level() }; + debug!( target: "consensus::engine::hooks", hook = hook.name(), - ?action, - ?event, + ?result, "Polled running hook with db write access" ); - if !event.is_finished() { + if !result.event.is_finished() { self.running_hook_with_db_write = Some(hook); } else { self.hooks.push_back(hook); } - if let Some(action) = action { - return Poll::Ready(Ok(action)) - } + return Poll::Ready(Ok(result)) } Poll::Pending => { self.running_hook_with_db_write = Some(hook); @@ -89,7 +98,7 @@ impl EngineHooksController { cx: &mut Context<'_>, args: EngineContext, db_write_active: bool, - ) -> Poll> { + ) -> Poll> { let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending }; // Hook with DB write access level is not allowed to run due to already running hook with DB @@ -101,23 +110,22 @@ impl EngineHooksController { } if let Poll::Ready((event, action)) = hook.poll(cx, args) { + let result = PolledHook { event, action, db_access_level: hook.db_access_level() }; + debug!( target: "consensus::engine::hooks", hook = hook.name(), - ?action, - ?event, + ?result, "Polled next hook" ); - if event.is_started() && hook.db_access_level().is_read_write() { + if result.event.is_started() && result.db_access_level.is_read_write() { self.running_hook_with_db_write = Some(hook); } else { self.hooks.push_back(hook); } - if let Some(action) = action { - return Poll::Ready(Ok(action)) - } + return Poll::Ready(Ok(result)) } else { self.hooks.push_back(hook); } diff --git a/crates/consensus/beacon/src/engine/hooks/mod.rs b/crates/consensus/beacon/src/engine/hooks/mod.rs index d2770a77c1c7..f031231b23a0 100644 --- a/crates/consensus/beacon/src/engine/hooks/mod.rs +++ b/crates/consensus/beacon/src/engine/hooks/mod.rs @@ -6,7 +6,7 @@ use std::{ }; mod controller; -pub(crate) use controller::EngineHooksController; +pub(crate) use controller::{EngineHooksController, PolledHook}; mod prune; pub use prune::PruneHook; @@ -88,8 +88,6 @@ impl EngineHookEvent { pub enum EngineHookAction { /// Notify about a [SyncState] update. UpdateSyncState(SyncState), - /// Connect blocks buffered during the hook execution to canonical hashes. - ConnectBufferedBlocks, } /// An error returned by [hook][`EngineHook`]. @@ -107,6 +105,7 @@ pub enum EngineHookError { } /// Level of database access the hook needs for execution. +#[derive(Debug, Copy, Clone)] pub enum EngineHookDBAccessLevel { /// Read-only database access. ReadOnly, diff --git a/crates/consensus/beacon/src/engine/hooks/prune.rs b/crates/consensus/beacon/src/engine/hooks/prune.rs index f720082ee3bc..f18c9eafdf6f 100644 --- a/crates/consensus/beacon/src/engine/hooks/prune.rs +++ b/crates/consensus/beacon/src/engine/hooks/prune.rs @@ -73,13 +73,7 @@ impl PruneHook { } }; - let action = if matches!(event, EngineHookEvent::Finished(Ok(_))) { - Some(EngineHookAction::ConnectBufferedBlocks) - } else { - None - }; - - Poll::Ready((event, action)) + Poll::Ready((event, None)) } /// This will try to spawn the pruner if it is idle: diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index cd772bf13253..f8d893d6b981 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -39,7 +39,7 @@ use reth_tasks::TaskSpawner; use std::{ pin::Pin, sync::Arc, - task::{ready, Context, Poll}, + task::{Context, Poll}, time::Instant, }; use tokio::sync::{ @@ -69,7 +69,7 @@ mod handle; pub use handle::BeaconConsensusEngineHandle; mod forkchoice; -use crate::hooks::EngineHooks; +use crate::hooks::{EngineHooks, PolledHook}; pub use forkchoice::ForkchoiceStatus; mod metrics; @@ -1683,21 +1683,24 @@ where None } - fn on_hook_action(&self, action: EngineHookAction) -> Result<(), BeaconConsensusEngineError> { - match action { - EngineHookAction::UpdateSyncState(state) => { - self.sync_state_updater.update_sync_state(state) - } - // TODO(alexey): always connect buffered blocks if hook had the - // `EngineHookDBAccessLevel::ReadWrite` - EngineHookAction::ConnectBufferedBlocks => { - if let Err(error) = self.blockchain.connect_buffered_blocks_to_canonical_hashes() { - error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state"); - return Err(error.into()) + fn on_hook_result(&self, result: PolledHook) -> Result<(), BeaconConsensusEngineError> { + if let Some(action) = result.action { + match action { + EngineHookAction::UpdateSyncState(state) => { + self.sync_state_updater.update_sync_state(state) } } } + if result.event.is_finished() && result.db_access_level.is_read_write() { + // If the hook had read-write access to the database, + // it means that the engine may have accumulated some buffered blocks. + if let Err(error) = self.blockchain.connect_buffered_blocks_to_canonical_hashes() { + error!(target: "consensus::engine", ?error, "Error connecting buffered blocks to canonical hashes on hook result"); + return Err(error.into()) + } + } + Ok(()) } } @@ -1734,10 +1737,8 @@ where if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write( cx, EngineContext { tip_block_number: this.blockchain.canonical_tip().number }, - ) { - if let Err(err) = this.on_hook_action(result?) { - return Poll::Ready(Err(err)) - } + )? { + this.on_hook_result(result)?; } // Process all incoming messages from the CL, these can affect the state of the @@ -1793,18 +1794,17 @@ where // 1. Engine and sync messages are fully drained (both pending) // 2. Latest FCU status is not INVALID if !this.forkchoice_state_tracker.is_latest_invalid() { - let action = ready!(this.hooks.poll_next_hook( + if let Poll::Ready(result) = this.hooks.poll_next_hook( cx, EngineContext { tip_block_number: this.blockchain.canonical_tip().number }, this.sync.is_pipeline_active(), - ))?; - if let Err(err) = this.on_hook_action(action) { - return Poll::Ready(Err(err)) - } + )? { + this.on_hook_result(result)?; - // ensure we're polling until pending while also checking for new engine messages - // before polling the next hook - continue 'main + // ensure we're polling until pending while also checking for new engine + // messages before polling the next hook + continue 'main + } } // incoming engine messages and sync events are drained, so we can yield back