Skip to content

Commit

Permalink
refactor(engine): always connect buffered blocks on r/w hook finish (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 20, 2023
1 parent afbe88f commit 6016da7
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 50 deletions.
38 changes: 23 additions & 15 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use crate::hooks::{EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHooks};
use crate::hooks::{
EngineContext, EngineHook, EngineHookAction, EngineHookDBAccessLevel, EngineHookError,
EngineHookEvent, EngineHooks,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
};
use tracing::debug;

#[derive(Debug)]
pub(crate) struct PolledHook {
pub(crate) event: EngineHookEvent,
pub(crate) action: Option<EngineHookAction>,
pub(crate) db_access_level: EngineHookDBAccessLevel,
}

/// Manages hooks under the control of the engine.
///
/// This type polls the initialized hooks one by one, respecting the DB access level
Expand Down Expand Up @@ -41,28 +51,27 @@ impl EngineHooksController {
&mut self,
cx: &mut Context<'_>,
args: EngineContext,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending };

match hook.poll(cx, args) {
Poll::Ready((event, action)) => {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };

debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
?result,
"Polled running hook with db write access"
);

if !event.is_finished() {
if !result.event.is_finished() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}

if let Some(action) = action {
return Poll::Ready(Ok(action))
}
return Poll::Ready(Ok(result))
}
Poll::Pending => {
self.running_hook_with_db_write = Some(hook);
Expand All @@ -89,7 +98,7 @@ impl EngineHooksController {
cx: &mut Context<'_>,
args: EngineContext,
db_write_active: bool,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };

// Hook with DB write access level is not allowed to run due to already running hook with DB
Expand All @@ -101,23 +110,22 @@ impl EngineHooksController {
}

if let Poll::Ready((event, action)) = hook.poll(cx, args) {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };

debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
?result,
"Polled next hook"
);

if event.is_started() && hook.db_access_level().is_read_write() {
if result.event.is_started() && result.db_access_level.is_read_write() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}

if let Some(action) = action {
return Poll::Ready(Ok(action))
}
return Poll::Ready(Ok(result))
} else {
self.hooks.push_back(hook);
}
Expand Down
5 changes: 2 additions & 3 deletions crates/consensus/beacon/src/engine/hooks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

mod controller;
pub(crate) use controller::EngineHooksController;
pub(crate) use controller::{EngineHooksController, PolledHook};

mod prune;
pub use prune::PruneHook;
Expand Down Expand Up @@ -88,8 +88,6 @@ impl EngineHookEvent {
pub enum EngineHookAction {
/// Notify about a [SyncState] update.
UpdateSyncState(SyncState),
/// Connect blocks buffered during the hook execution to canonical hashes.
ConnectBufferedBlocks,
}

/// An error returned by [hook][`EngineHook`].
Expand All @@ -107,6 +105,7 @@ pub enum EngineHookError {
}

/// Level of database access the hook needs for execution.
#[derive(Debug, Copy, Clone)]
pub enum EngineHookDBAccessLevel {
/// Read-only database access.
ReadOnly,
Expand Down
8 changes: 1 addition & 7 deletions crates/consensus/beacon/src/engine/hooks/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
}
};

let action = if matches!(event, EngineHookEvent::Finished(Ok(_))) {
Some(EngineHookAction::ConnectBufferedBlocks)
} else {
None
};

Poll::Ready((event, action))
Poll::Ready((event, None))
}

/// This will try to spawn the pruner if it is idle:
Expand Down
50 changes: 25 additions & 25 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
task::{Context, Poll},
time::Instant,
};
use tokio::sync::{
Expand Down Expand Up @@ -69,7 +69,7 @@ mod handle;
pub use handle::BeaconConsensusEngineHandle;

mod forkchoice;
use crate::hooks::EngineHooks;
use crate::hooks::{EngineHooks, PolledHook};
pub use forkchoice::ForkchoiceStatus;

mod metrics;
Expand Down Expand Up @@ -1683,21 +1683,24 @@ where
None
}

fn on_hook_action(&self, action: EngineHookAction) -> Result<(), BeaconConsensusEngineError> {
match action {
EngineHookAction::UpdateSyncState(state) => {
self.sync_state_updater.update_sync_state(state)
}
// TODO(alexey): always connect buffered blocks if hook had the
// `EngineHookDBAccessLevel::ReadWrite`
EngineHookAction::ConnectBufferedBlocks => {
if let Err(error) = self.blockchain.connect_buffered_blocks_to_canonical_hashes() {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Err(error.into())
fn on_hook_result(&self, result: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if let Some(action) = result.action {
match action {
EngineHookAction::UpdateSyncState(state) => {
self.sync_state_updater.update_sync_state(state)
}
}
}

if result.event.is_finished() && result.db_access_level.is_read_write() {
// If the hook had read-write access to the database,
// it means that the engine may have accumulated some buffered blocks.
if let Err(error) = self.blockchain.connect_buffered_blocks_to_canonical_hashes() {
error!(target: "consensus::engine", ?error, "Error connecting buffered blocks to canonical hashes on hook result");
return Err(error.into())
}
}

Ok(())
}
}
Expand Down Expand Up @@ -1734,10 +1737,8 @@ where
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
cx,
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
) {
if let Err(err) = this.on_hook_action(result?) {
return Poll::Ready(Err(err))
}
)? {
this.on_hook_result(result)?;
}

// Process all incoming messages from the CL, these can affect the state of the
Expand Down Expand Up @@ -1793,18 +1794,17 @@ where
// 1. Engine and sync messages are fully drained (both pending)
// 2. Latest FCU status is not INVALID
if !this.forkchoice_state_tracker.is_latest_invalid() {
let action = ready!(this.hooks.poll_next_hook(
if let Poll::Ready(result) = this.hooks.poll_next_hook(
cx,
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
this.sync.is_pipeline_active(),
))?;
if let Err(err) = this.on_hook_action(action) {
return Poll::Ready(Err(err))
}
)? {
this.on_hook_result(result)?;

// ensure we're polling until pending while also checking for new engine messages
// before polling the next hook
continue 'main
// ensure we're polling until pending while also checking for new engine
// messages before polling the next hook
continue 'main
}
}

// incoming engine messages and sync events are drained, so we can yield back
Expand Down

0 comments on commit 6016da7

Please sign in to comment.