From 4ef271b4dbf1ef673e228b2070ce95930e36b280 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Thu, 1 Aug 2024 21:49:11 +0200 Subject: [PATCH] feat(derive+trusted-sync): online blob provider with fallback (#410) * feat(derive): online blob provider with fallback * chore: added test * fix: compute slot * chore: doclint * feat: use fallback blob provider in trusted sync * feat: online blob provider builder * chore: import string * chore: update trusted sync binary * chore: update blob provider builder method names, make archiver optional * fix: unbork doclint --- crates/derive/src/online/blob_provider.rs | 375 +++++++++++++++++++++- crates/derive/src/online/mod.rs | 5 +- crates/derive/src/online/pipeline.rs | 6 +- examples/trusted-sync/src/cli.rs | 10 + examples/trusted-sync/src/main.rs | 7 +- 5 files changed, 394 insertions(+), 9 deletions(-) diff --git a/crates/derive/src/online/blob_provider.rs b/crates/derive/src/online/blob_provider.rs index eb725edc..96eb502e 100644 --- a/crates/derive/src/online/blob_provider.rs +++ b/crates/derive/src/online/blob_provider.rs @@ -1,14 +1,15 @@ //! Contains an online implementation of the [BlobProvider] trait. use crate::{ - online::BeaconClient, + online::{BeaconClient, OnlineBeaconClient}, traits::BlobProvider, types::{APIBlobSidecar, Blob, BlobProviderError, BlobSidecar, BlockInfo, IndexedBlobHash}, }; -use alloc::{boxed::Box, vec::Vec}; +use alloc::{boxed::Box, string::String, vec::Vec}; use anyhow::{anyhow, ensure}; use async_trait::async_trait; use core::marker::PhantomData; +use tracing::warn; /// Specifies the derivation of a slot from a timestamp. pub trait SlotDerivation { @@ -176,6 +177,280 @@ where } } +/// The minimal interface required to fetch sidecars from a remote blob store. +#[async_trait] +pub trait BlobSidecarProvider { + /// Fetches blob sidecars that were confirmed in the specified L1 block with the given indexed + /// hashes. Order of the returned sidecars is guaranteed to be that of the hashes. Blob data is + /// not checked for validity. + /// + /// Consensus specs: + async fn beacon_blob_side_cars( + &self, + slot: u64, + hashes: &[IndexedBlobHash], + ) -> anyhow::Result>; +} + +/// Blanket implementation of the [BlobSidecarProvider] trait for all types that +/// implemend [BeaconClient], which has a superset of the required functionality. +#[async_trait] +impl BlobSidecarProvider for B { + async fn beacon_blob_side_cars( + &self, + slot: u64, + hashes: &[IndexedBlobHash], + ) -> anyhow::Result> { + self.beacon_blob_side_cars(slot, hashes).await + } +} + +/// An online blob provider that optionally falls back to a secondary provider if the +/// primary fails to fetch blob sidecars. +/// +/// This is useful for scenarios where blobs have been evicted from the primary provider's +/// blob store and need to be fetched from a remote archive API. The default eviction +/// policy on Ethereum is to keep blobs for 18 days. +/// +/// Blob storage APIs are expected to implement the [BlobSidecarProvider] trait. +/// One example can be found at +#[derive(Debug, Clone)] +pub struct OnlineBlobProviderWithFallback< + B: BeaconClient, + F: BlobSidecarProvider, + S: SlotDerivation, +> { + primary: OnlineBlobProvider, + fallback: Option, + _slot_derivation: PhantomData, +} + +impl + OnlineBlobProviderWithFallback +{ + /// Creates a new instance of the [OnlineBlobProviderWithFallback] with the + /// specified primary and fallback providers. + pub fn new(primary: OnlineBlobProvider, fallback: Option) -> Self { + Self { primary, fallback, _slot_derivation: PhantomData } + } + + /// Attempts to fetch blob sidecars from the fallback provider, if configured. + /// Calling this method without a fallback provider will return an error. + async fn fallback_fetch_filtered_sidecars( + &self, + block_ref: &BlockInfo, + blob_hashes: &[IndexedBlobHash], + ) -> Result, BlobProviderError> { + let Some(fallback) = self.fallback.as_ref() else { + return Err(BlobProviderError::Custom(anyhow::anyhow!( + "cannot fetch blobs: the primary blob provider failed, and no fallback is configured" + ))); + }; + + if blob_hashes.is_empty() { + return Ok(Vec::new()); + } + + // Extract the genesis timestamp and slot interval from the primary provider. + let slot = S::slot( + self.primary.genesis_time.expect("Genesis Config Loaded"), + self.primary.slot_interval.expect("Config Spec Loaded"), + block_ref.timestamp, + ) + .map_err(BlobProviderError::Slot)?; + + // Fetch blob sidecars for the given block reference and blob hashes. + let sidecars = fallback.beacon_blob_side_cars(slot, blob_hashes).await?; + + // Filter blob sidecars that match the indicies in the specified list. + let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); + let filtered = sidecars + .into_iter() + .filter(|s| blob_hash_indicies.contains(&(s.inner.index as usize))) + .collect::>(); + + // Validate the correct number of blob sidecars were retrieved. + if blob_hashes.len() != filtered.len() { + return Err(BlobProviderError::SidecarLengthMismatch(blob_hashes.len(), filtered.len())); + } + + Ok(filtered.into_iter().map(|s| s.inner).collect::>()) + } +} + +#[async_trait] +impl BlobProvider for OnlineBlobProviderWithFallback +where + B: BeaconClient + Send + Sync, + F: BlobSidecarProvider + Send + Sync, + S: SlotDerivation + Send + Sync, +{ + /// Fetches blob sidecars that were confirmed in the specified L1 block with the given indexed + /// hashes. The blobs are validated for their index and hashes using the specified + /// [IndexedBlobHash]. + async fn get_blobs( + &mut self, + block_ref: &BlockInfo, + blob_hashes: &[IndexedBlobHash], + ) -> Result, BlobProviderError> { + match self.primary.get_blobs(block_ref, blob_hashes).await { + Ok(blobs) => Ok(blobs), + Err(primary_err) => { + crate::inc!(PROVIDER_ERRORS, &["blob_provider", "get_blobs", "primary"]); + warn!(target: "blob_provider", "Primary provider failed: {:?}", primary_err); + + // Fetch the blob sidecars for the given block reference and blob hashes. + let sidecars = + match self.fallback_fetch_filtered_sidecars(block_ref, blob_hashes).await { + Ok(sidecars) => sidecars, + Err(e) => { + warn!(target: "blob_provider", "Fallback provider failed: {:?}", e); + crate::inc!( + PROVIDER_ERRORS, + &["blob_provider", "get_blobs", "fallback_fetch_filtered_sidecars"] + ); + return Err(e); + } + }; + + // Validate the blob sidecars straight away with the `IndexedBlobHash`es. + let blobs = match sidecars + .into_iter() + .enumerate() + .map(|(i, sidecar)| { + let hash = blob_hashes + .get(i) + .ok_or(anyhow!("fallback: failed to get blob hash"))?; + match sidecar.verify_blob(hash) { + Ok(_) => Ok(sidecar.blob), + Err(e) => Err(e), + } + }) + .collect::>>() + { + Ok(blobs) => blobs, + Err(e) => { + crate::inc!( + PROVIDER_ERRORS, + &["blob_provider", "get_blobs", "fallback_verify_blob"] + ); + return Err(BlobProviderError::Custom(e)); + } + }; + + Ok(blobs) + } + } + } +} + +/// A builder for a [OnlineBlobProviderWithFallback] instance. +/// +/// This builder allows for the construction of a blob provider that +/// uses a primary beacon node and can fallback to a secondary [BlobSidecarProvider] +/// if the primary fails to fetch blob sidecars. +/// +/// The fallback provider is optional and can be set using the [Self::with_fallback] method. +/// +/// Two convenience methods are available for initializing the providers from beacon client URLs: +/// - [Self::with_primary] for the primary beacon client. +/// - [Self::with_fallback] for the fallback beacon client. +#[derive(Debug, Clone)] +pub struct OnlineBlobProviderBuilder { + beacon_client: Option, + fallback: Option, + genesis_time: Option, + slot_interval: Option, + _slot_derivation: PhantomData, +} + +impl Default + for OnlineBlobProviderBuilder +{ + fn default() -> Self { + Self { + beacon_client: None, + fallback: None, + genesis_time: None, + slot_interval: None, + _slot_derivation: PhantomData, + } + } +} + +impl + OnlineBlobProviderBuilder +{ + /// Creates a new [OnlineBlobProviderBuilder]. + pub fn new() -> Self { + Self::default() + } + + /// Adds a primary beacon client to the builder. This is required. + pub fn with_beacon_client(mut self, beacon_client: B) -> Self { + self.beacon_client = Some(beacon_client); + self + } + + /// Adds a genesis time to the builder. This is optional. + pub fn with_genesis_time(mut self, genesis_time: u64) -> Self { + self.genesis_time = Some(genesis_time); + self + } + + /// Adds a slot interval to the builder. This is optional. + pub fn with_slot_interval(mut self, slot_interval: u64) -> Self { + self.slot_interval = Some(slot_interval); + self + } + + /// Adds a fallback blob provider to the builder. This is optional. + pub fn with_fallback_provider(mut self, fallback: F) -> Self { + self.fallback = Some(fallback); + self + } + + /// Builds the [OnlineBlobProviderWithFallback] instance. + pub fn build(self) -> OnlineBlobProviderWithFallback { + self.into() + } +} + +impl + OnlineBlobProviderBuilder +{ + /// Adds a primary [OnlineBeaconClient] to the builder using the specified HTTP URL. + pub fn with_primary(mut self, url: String) -> Self { + self.beacon_client = Some(OnlineBeaconClient::new_http(url)); + self + } +} + +impl + OnlineBlobProviderBuilder +{ + /// Adds a fallback [OnlineBeaconClient] to the builder using the specified HTTP URL. + pub fn with_fallback(mut self, maybe_url: Option) -> Self { + self.fallback = maybe_url.map(OnlineBeaconClient::new_http); + self + } +} + +impl + From> for OnlineBlobProviderWithFallback +{ + fn from(builder: OnlineBlobProviderBuilder) -> Self { + Self::new( + OnlineBlobProvider::new( + builder.beacon_client.expect("Primary beacon client must be set"), + builder.genesis_time, + builder.slot_interval, + ), + builder.fallback, + ) + } +} + #[cfg(test)] mod tests { use super::*; @@ -432,4 +707,100 @@ mod tests { BlobProviderError::Custom(anyhow::anyhow!("blob at index 0 failed verification")) ); } + + #[tokio::test] + async fn test_get_blob_fallback() { + let json_bytes = include_bytes!("testdata/eth_v1_beacon_sidecars_goerli.json"); + let sidecars: APIGetBlobSidecarsResponse = serde_json::from_slice(json_bytes).unwrap(); + + // Provide no sidecars to the primary provider to trigger a fallback fetch + let beacon_client = MockBeaconClient { + beacon_genesis: Some(APIGenesisResponse::new(10)), + config_spec: Some(APIConfigResponse::new(12)), + blob_sidecars: None, + ..Default::default() + }; + let fallback_client = + MockBeaconClient { blob_sidecars: Some(sidecars), ..Default::default() }; + let mut blob_provider: OnlineBlobProviderWithFallback<_, _, SimpleSlotDerivation> = + OnlineBlobProviderWithFallback::new( + OnlineBlobProvider::new(beacon_client, None, None), + Some(fallback_client), + ); + let block_ref = BlockInfo { timestamp: 15, ..Default::default() }; + let blob_hashes = vec![ + IndexedBlobHash { + index: 0, + hash: b256!("011075cbb20f3235b3179a5dff22689c410cd091692180f4b6a12be77ea0f586"), + }, + IndexedBlobHash { + index: 1, + hash: b256!("010a9e10aab79bab62e10a5b83c164a91451b6ef56d31ac95a9514ffe6d6b4e6"), + }, + IndexedBlobHash { + index: 2, + hash: b256!("016122c8e41c69917b688240707d107aa6d2a480343e4e323e564241769a6b4a"), + }, + IndexedBlobHash { + index: 3, + hash: b256!("01df1f9ae707f5847513c9c430b683182079edf2b1f94ee12e4daae7f3c8c309"), + }, + IndexedBlobHash { + index: 4, + hash: b256!("01e5ee2f6cbbafb3c03f05f340e795fe5b5a8edbcc9ac3fc7bd3d1940b99ef3c"), + }, + ]; + let blobs = blob_provider.get_blobs(&block_ref, &blob_hashes).await.unwrap(); + assert_eq!(blobs.len(), 5); + } + + #[tokio::test] + async fn test_get_blobs_fallback_partial_sidecar() { + let json_bytes = include_bytes!("testdata/eth_v1_beacon_sidecars_goerli.json"); + let all_sidecars: APIGetBlobSidecarsResponse = serde_json::from_slice(json_bytes).unwrap(); + + let online_sidecars = APIGetBlobSidecarsResponse { + // Remove some sidecars from the online provider to trigger a fallback fetch + data: all_sidecars.data.clone().into_iter().take(2).collect::>(), + }; + + let beacon_client = MockBeaconClient { + beacon_genesis: Some(APIGenesisResponse::new(10)), + config_spec: Some(APIConfigResponse::new(12)), + blob_sidecars: Some(online_sidecars), + ..Default::default() + }; + let fallback_client = + MockBeaconClient { blob_sidecars: Some(all_sidecars), ..Default::default() }; + let mut blob_provider: OnlineBlobProviderWithFallback<_, _, SimpleSlotDerivation> = + OnlineBlobProviderWithFallback::new( + OnlineBlobProvider::new(beacon_client, None, None), + Some(fallback_client), + ); + let block_ref = BlockInfo { timestamp: 15, ..Default::default() }; + let blob_hashes = vec![ + IndexedBlobHash { + index: 0, + hash: b256!("011075cbb20f3235b3179a5dff22689c410cd091692180f4b6a12be77ea0f586"), + }, + IndexedBlobHash { + index: 1, + hash: b256!("010a9e10aab79bab62e10a5b83c164a91451b6ef56d31ac95a9514ffe6d6b4e6"), + }, + IndexedBlobHash { + index: 2, + hash: b256!("016122c8e41c69917b688240707d107aa6d2a480343e4e323e564241769a6b4a"), + }, + IndexedBlobHash { + index: 3, + hash: b256!("01df1f9ae707f5847513c9c430b683182079edf2b1f94ee12e4daae7f3c8c309"), + }, + IndexedBlobHash { + index: 4, + hash: b256!("01e5ee2f6cbbafb3c03f05f340e795fe5b5a8edbcc9ac3fc7bd3d1940b99ef3c"), + }, + ]; + let blobs = blob_provider.get_blobs(&block_ref, &blob_hashes).await.unwrap(); + assert_eq!(blobs.len(), 5); + } } diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index a550b4f1..37c53e3b 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -22,7 +22,10 @@ mod alloy_providers; pub use alloy_providers::{AlloyChainProvider, AlloyL2ChainProvider}; mod blob_provider; -pub use blob_provider::{OnlineBlobProvider, SimpleSlotDerivation}; +pub use blob_provider::{ + BlobSidecarProvider, OnlineBlobProvider, OnlineBlobProviderBuilder, + OnlineBlobProviderWithFallback, SimpleSlotDerivation, +}; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/derive/src/online/pipeline.rs b/crates/derive/src/online/pipeline.rs index caa4a7a7..05aa11bb 100644 --- a/crates/derive/src/online/pipeline.rs +++ b/crates/derive/src/online/pipeline.rs @@ -2,8 +2,8 @@ use super::{ AlloyChainProvider, AlloyL2ChainProvider, BlockInfo, DerivationPipeline, EthereumDataSource, - OnlineBeaconClient, OnlineBlobProvider, PipelineBuilder, RollupConfig, SimpleSlotDerivation, - StatefulAttributesBuilder, + OnlineBeaconClient, OnlineBlobProviderWithFallback, PipelineBuilder, RollupConfig, + SimpleSlotDerivation, StatefulAttributesBuilder, }; use alloc::sync::Arc; // Pipeline internal stages aren't re-exported at the module-level. @@ -18,7 +18,7 @@ pub type OnlinePipeline = /// An `online` Ethereum data source. pub type OnlineDataProvider = EthereumDataSource< AlloyChainProvider, - OnlineBlobProvider, + OnlineBlobProviderWithFallback, >; /// An `online` payload attributes builder for the `AttributesQueue` stage of the derivation diff --git a/examples/trusted-sync/src/cli.rs b/examples/trusted-sync/src/cli.rs index 94fb0904..7131f52a 100644 --- a/examples/trusted-sync/src/cli.rs +++ b/examples/trusted-sync/src/cli.rs @@ -7,6 +7,7 @@ use reqwest::Url; const L1_RPC_URL: &str = "L1_RPC_URL"; const L2_RPC_URL: &str = "L2_RPC_URL"; const BEACON_URL: &str = "BEACON_URL"; +const BLOB_ARCHIVER_URL: &str = "BLOB_ARCHIVER_URL"; const METRICS_URL: &str = "METRICS_URL"; const DEFAULT_METRICS_SERVER_ADDR: &str = "0.0.0.0"; const DEFAULT_METRICS_SERVER_PORT: u16 = 9000; @@ -28,6 +29,9 @@ pub struct Cli { /// The Beacon URL #[clap(long, short)] pub beacon_url: Option, + /// The Blob Archiver URL + #[clap(long, short = 'B')] + pub blob_archiver_url: Option, /// The l2 block to start from. #[clap(long, short, help = "Starting l2 block, defaults to chain genesis if none specified")] pub start_l2_block: Option, @@ -112,4 +116,10 @@ impl Cli { std::env::var(BEACON_URL).map_err(|e| anyhow!(e))? }) } + + /// Returns the blob archiver url from CLI or environment variable. + /// If neither is set, returns None. + pub fn blob_archiver_url(&self) -> Option { + self.blob_archiver_url.clone().or_else(|| std::env::var(BLOB_ARCHIVER_URL).ok()) + } } diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index 096d75a2..5f936e7c 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -71,9 +71,10 @@ async fn sync(cli: cli::Cli) -> Result<()> { let mut l2_provider = AlloyL2ChainProvider::new_http(l2_rpc_url.clone(), cfg.clone()); let attributes = StatefulAttributesBuilder::new(cfg.clone(), l2_provider.clone(), l1_provider.clone()); - let beacon_client = OnlineBeaconClient::new_http(beacon_url); - let blob_provider = - OnlineBlobProvider::<_, SimpleSlotDerivation>::new(beacon_client, None, None); + let blob_provider = OnlineBlobProviderBuilder::new() + .with_primary(beacon_url) + .with_fallback(cli.blob_archiver_url()) + .build(); let dap = EthereumDataSource::new(l1_provider.clone(), blob_provider, &cfg); let mut cursor = l2_provider .l2_block_info_by_number(start)