diff --git a/examples/trusted-sync/src/cli.rs b/examples/trusted-sync/src/cli.rs index ed2dc495..5607d663 100644 --- a/examples/trusted-sync/src/cli.rs +++ b/examples/trusted-sync/src/cli.rs @@ -43,6 +43,17 @@ pub struct Cli { /// Start blocks from tip. #[clap(long, help = "Number of blocks prior to tip to start from")] pub start_blocks_from_tip: Option, + /// Enable walking back if re-orgs occur and the pipeline gets stuck. + #[clap(long, help = "Enable walking back if re-orgs occur and the pipeline gets stuck")] + pub enable_reorg_walkback: bool, + /// Parameterized threshold amount of blocks to drift from before fast-forwarding or walking + /// back. + #[clap( + long, + default_value_t = 500, + help = "Parameterized threshold amount of blocks to drift from before fast-forwarding or walking back" + )] + pub drift_threshold: u64, } impl Cli { diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index dac7ff90..f8668b72 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -66,7 +66,7 @@ async fn sync(cli: cli::Cli) -> Result<()> { info!(target: LOG_TARGET, "Starting {} blocks from tip at L2 block number: {}", blocks, start); } metrics::START_L2_BLOCK.inc_by(start); - println!("Starting from L2 block number: {}", metrics::START_L2_BLOCK.get()); + info!(target: LOG_TARGET, "Starting from L2 block number: {}", metrics::START_L2_BLOCK.get()); let mut l2_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), cfg.clone()); let attributes = @@ -85,12 +85,15 @@ async fn sync(cli: cli::Cli) -> Result<()> { .await .expect("Failed to fetch genesis L1 block info for pipeline tip"); let validator = validation::OnlineValidator::new_http(l2_rpc_url.clone(), &cfg); + let genesis_l2_block_number = cfg.genesis.l2.number; let mut pipeline = new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip); - // Reset the failed payload derivation metric to 0 so it can be queried. + // Reset metrics so they can be queried. metrics::FAILED_PAYLOAD_DERIVATION.reset(); metrics::DRIFT_WALKBACK.set(0); + metrics::DRIFT_WALKBACK_TIMESTAMP.set(0); + metrics::DERIVED_ATTRIBUTES_COUNT.reset(); // Continuously step on the pipeline and validate payloads. let mut advance_cursor_flag = false; @@ -100,44 +103,65 @@ async fn sync(cli: cli::Cli) -> Result<()> { Ok(latest) => { let prev = metrics::REFERENCE_L2_HEAD.get(); metrics::REFERENCE_L2_HEAD.set(latest as i64); - - // Check if we have drift - walk back in case of a re-org. - let drift = latest as i64 - cursor.block_info.number as i64; - if drift > 500 && cursor.block_info.number as i64 > metrics::DRIFT_WALKBACK.get() { - metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64); - warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift); - cursor = - l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await?; - advance_cursor_flag = false; - if let Err(e) = pipeline.reset(cursor.block_info).await { - error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); + let timestamp = match std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|s| s.as_secs()) + { + Ok(time) => time, + Err(e) => { + error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e); + continue; } - let timestamp = match std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|s| s.as_secs()) - { - Ok(time) => time, - Err(e) => { - error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e); - continue; - } - }; - metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64); - } + }; // Update the timestamp if latest as i64 > prev { - let time = match std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|s| s.as_secs()) + metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64); + } + + // Don't check drift if we're within 10 blocks of origin. + if cursor.block_info.number - genesis_l2_block_number <= 10 { + warn!(target: LOG_TARGET, "Can't walk back further. Cursor: {}, Genesis: {}", cursor.block_info.number, genesis_l2_block_number); + } else { + // Check if we have drift - walk back in case of a re-org. + // Wait for at least 500 drift and 5 minutes since the last walkback. + let drift = latest as i64 - cursor.block_info.number as i64; + + // If walkback isn't enabled, jump to 10 blocks less than the reference l2 + // head. + if drift > cli.drift_threshold as i64 && !cli.enable_reorg_walkback { + cursor = if let Ok(c) = + l2_provider.l2_block_info_by_number(latest - 10).await + { + c + } else { + error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", latest - 10); + continue; + }; + advance_cursor_flag = false; + if let Err(e) = pipeline.reset(cursor.block_info).await { + error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); + } + } else if drift > cli.drift_threshold as i64 && + timestamp as i64 > metrics::DRIFT_WALKBACK_TIMESTAMP.get() + 300 { - Ok(time) => time, - Err(e) => { - error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e); + metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64); + warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift); + cursor = if let Ok(c) = + l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await + { + c + } else { + error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", cursor.block_info.number - 10); continue; + }; + advance_cursor_flag = false; + if let Err(e) = pipeline.reset(cursor.block_info).await { + error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); } - }; - metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(time as i64); + + metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64); + } } } Err(e) => {