Skip to content

Commit

Permalink
fix: Drift Walkback (#382)
Browse files Browse the repository at this point in the history
* fix: make walkback time-based

* fix: drift work

* disable walkback by default

* parameterize drift threshold
  • Loading branch information
refcell authored Jul 17, 2024
1 parent 62193d5 commit 64177b3
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 33 deletions.
11 changes: 11 additions & 0 deletions examples/trusted-sync/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ pub struct Cli {
/// Start blocks from tip.
#[clap(long, help = "Number of blocks prior to tip to start from")]
pub start_blocks_from_tip: Option<u64>,
/// Enable walking back if re-orgs occur and the pipeline gets stuck.
#[clap(long, help = "Enable walking back if re-orgs occur and the pipeline gets stuck")]
pub enable_reorg_walkback: bool,
/// Parameterized threshold amount of blocks to drift from before fast-forwarding or walking
/// back.
#[clap(
long,
default_value_t = 500,
help = "Parameterized threshold amount of blocks to drift from before fast-forwarding or walking back"
)]
pub drift_threshold: u64,
}

impl Cli {
Expand Down
90 changes: 57 additions & 33 deletions examples/trusted-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn sync(cli: cli::Cli) -> Result<()> {
info!(target: LOG_TARGET, "Starting {} blocks from tip at L2 block number: {}", blocks, start);
}
metrics::START_L2_BLOCK.inc_by(start);
println!("Starting from L2 block number: {}", metrics::START_L2_BLOCK.get());
info!(target: LOG_TARGET, "Starting from L2 block number: {}", metrics::START_L2_BLOCK.get());

let mut l2_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), cfg.clone());
let attributes =
Expand All @@ -85,12 +85,15 @@ async fn sync(cli: cli::Cli) -> Result<()> {
.await
.expect("Failed to fetch genesis L1 block info for pipeline tip");
let validator = validation::OnlineValidator::new_http(l2_rpc_url.clone(), &cfg);
let genesis_l2_block_number = cfg.genesis.l2.number;
let mut pipeline =
new_online_pipeline(cfg, l1_provider, dap, l2_provider.clone(), attributes, tip);

// Reset the failed payload derivation metric to 0 so it can be queried.
// Reset metrics so they can be queried.
metrics::FAILED_PAYLOAD_DERIVATION.reset();
metrics::DRIFT_WALKBACK.set(0);
metrics::DRIFT_WALKBACK_TIMESTAMP.set(0);
metrics::DERIVED_ATTRIBUTES_COUNT.reset();

// Continuously step on the pipeline and validate payloads.
let mut advance_cursor_flag = false;
Expand All @@ -100,44 +103,65 @@ async fn sync(cli: cli::Cli) -> Result<()> {
Ok(latest) => {
let prev = metrics::REFERENCE_L2_HEAD.get();
metrics::REFERENCE_L2_HEAD.set(latest as i64);

// Check if we have drift - walk back in case of a re-org.
let drift = latest as i64 - cursor.block_info.number as i64;
if drift > 500 && cursor.block_info.number as i64 > metrics::DRIFT_WALKBACK.get() {
metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64);
warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift);
cursor =
l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await?;
advance_cursor_flag = false;
if let Err(e) = pipeline.reset(cursor.block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
let timestamp = match std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|s| s.as_secs())
{
Ok(time) => time,
Err(e) => {
error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e);
continue;
}
let timestamp = match std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|s| s.as_secs())
{
Ok(time) => time,
Err(e) => {
error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e);
continue;
}
};
metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64);
}
};

// Update the timestamp
if latest as i64 > prev {
let time = match std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|s| s.as_secs())
metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(timestamp as i64);
}

// Don't check drift if we're within 10 blocks of origin.
if cursor.block_info.number - genesis_l2_block_number <= 10 {
warn!(target: LOG_TARGET, "Can't walk back further. Cursor: {}, Genesis: {}", cursor.block_info.number, genesis_l2_block_number);
} else {
// Check if we have drift - walk back in case of a re-org.
// Wait for at least 500 drift and 5 minutes since the last walkback.
let drift = latest as i64 - cursor.block_info.number as i64;

// If walkback isn't enabled, jump to 10 blocks less than the reference l2
// head.
if drift > cli.drift_threshold as i64 && !cli.enable_reorg_walkback {
cursor = if let Ok(c) =
l2_provider.l2_block_info_by_number(latest - 10).await
{
c
} else {
error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", latest - 10);
continue;
};
advance_cursor_flag = false;
if let Err(e) = pipeline.reset(cursor.block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
}
} else if drift > cli.drift_threshold as i64 &&
timestamp as i64 > metrics::DRIFT_WALKBACK_TIMESTAMP.get() + 300
{
Ok(time) => time,
Err(e) => {
error!(target: LOG_TARGET, "Failed to get latest timestamp in seconds: {:?}", e);
metrics::DRIFT_WALKBACK.set(cursor.block_info.number as i64);
warn!(target: LOG_TARGET, "Detected drift of over {} blocks, walking back", drift);
cursor = if let Ok(c) =
l2_provider.l2_block_info_by_number(cursor.block_info.number - 10).await
{
c
} else {
error!(target: LOG_TARGET, "Failed to get walkback block info by number: {}", cursor.block_info.number - 10);
continue;
};
advance_cursor_flag = false;
if let Err(e) = pipeline.reset(cursor.block_info).await {
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
}
};
metrics::LATEST_REF_SAFE_HEAD_UPDATE.set(time as i64);

metrics::DRIFT_WALKBACK_TIMESTAMP.set(timestamp as i64);
}
}
}
Err(e) => {
Expand Down

0 comments on commit 64177b3

Please sign in to comment.