diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index 09468559..8f46eecd 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -1,6 +1,6 @@ use anyhow::Result; use clap::Parser; -use kona_derive::online::*; +use kona_derive::{online::*, types::StageError}; use std::sync::Arc; use tracing::{debug, error, info, warn}; @@ -90,6 +90,7 @@ async fn sync(cli: cli::Cli) -> Result<()> { // Reset the failed payload derivation metric to 0 so it can be queried. metrics::FAILED_PAYLOAD_DERIVATION.reset(); + metrics::DRIFT_WALKBACK.set(0); // Continuously step on the pipeline and validate payloads. let mut advance_cursor_flag = false; @@ -99,6 +100,29 @@ async fn sync(cli: cli::Cli) -> Result<()> { Ok(latest) => { let prev = metrics::REFERENCE_L2_HEAD.get(); metrics::REFERENCE_L2_HEAD.set(latest as i64); + + // Check if we have drift - walk back in case of a re-org. + let drift = latest as i64 - cursor.block_info.number as i64; + if drift > 200 && cursor.block_info.number as i64 > metrics::DRIFT_WALKBACK.get() { + metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64); + warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift); + cursor = + l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await?; + advance_cursor_flag = false; + let timestamp = match std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|s| s.as_secs()) + { + Ok(time) => time, + Err(e) => { + error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e); + continue; + } + }; + metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64); + } + + // Update the timestamp if latest as i64 > prev { let time = match std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -132,8 +156,7 @@ async fn sync(cli: cli::Cli) -> Result<()> { } } } - info!(target: LOG_TARGET, "Validated payload attributes number {}", metrics::DERIVED_ATTRIBUTES_COUNT.get()); - info!(target: LOG_TARGET, "Pending l2 safe head num: {}", cursor.block_info.number); + info!(target: LOG_TARGET, "Stepping on cursor block number: {}", cursor.block_info.number); match pipeline.step(cursor).await { StepResult::PreparedAttributes => { metrics::PIPELINE_STEPS.with_label_values(&["success"]).inc(); @@ -147,10 +170,16 @@ async fn sync(cli: cli::Cli) -> Result<()> { metrics::PIPELINE_STEPS.with_label_values(&["origin_advance_failure"]).inc(); warn!(target: "loop", "Could not advance origin: {:?}", e); } - StepResult::StepFailed(e) => { - metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc(); - error!(target: "loop", "Error stepping derivation pipeline: {:?}", e); - } + StepResult::StepFailed(e) => match e { + StageError::NotEnoughData => { + metrics::PIPELINE_STEPS.with_label_values(&["not_enough_data"]).inc(); + info!(target: "loop", "Not enough data to step derivation pipeline"); + } + _ => { + metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc(); + error!(target: "loop", "Error stepping derivation pipeline: {:?}", e); + } + }, } // Peek at the next prepared attributes and validate them. diff --git a/examples/trusted-sync/src/metrics.rs b/examples/trusted-sync/src/metrics.rs index c93db32c..1cbb157c 100644 --- a/examples/trusted-sync/src/metrics.rs +++ b/examples/trusted-sync/src/metrics.rs @@ -44,6 +44,14 @@ lazy_static! { pub static ref REFERENCE_L2_HEAD: IntGauge = register_int_gauge!("trusted_sync_reference_l2_head", "Reference L2 head").expect("Failed to register reference L2 head metric"); + /// Tracks the block number when a drift walkback last happened. + pub static ref DRIFT_WALKBACK: IntGauge = + register_int_gauge!("trusted_sync_drift_walkback", "Latest drift walkback").expect("Failed to register drift walkback metric"); + + /// Tracks the timestamp of the last drift walkback. + pub static ref DRIFT_WALKBACK_TIMESTAMP: IntGauge = + register_int_gauge!("trusted_sync_drift_walkback_timestamp", "Timestamp of the last drift walkback").expect("Failed to register drift walkback timestamp metric"); + /// Tracks the latest reference l2 safe head update. pub static ref LATEST_REF_SAFE_HEAD_UPDATE: IntGauge = register_int_gauge!( "trusted_sync_latest_ref_safe_head_update",