diff --git a/Cargo.lock b/Cargo.lock index 1775aacea4..14ae57afeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2983,6 +2983,7 @@ dependencies = [ "sqlx", "tendermint", "tendermint-rpc", + "thiserror", "tikv-jemallocator", "time", "tokio", diff --git a/hubble/Cargo.toml b/hubble/Cargo.toml index 38143ee656..037a7299be 100644 --- a/hubble/Cargo.toml +++ b/hubble/Cargo.toml @@ -31,6 +31,7 @@ serde_json = { workspace = true } sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json"] } tendermint = { workspace = true, features = ["std"] } tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] } +thiserror = { workspace = true } time = { version = "0.3.30", features = ["serde"] } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/hubble/src/eth.rs b/hubble/src/eth.rs index 8fa93631f1..6ab14bba5f 100644 --- a/hubble/src/eth.rs +++ b/hubble/src/eth.rs @@ -244,21 +244,50 @@ pub struct LogData { pub header: ethers::types::Block, } +#[derive(Debug, thiserror::Error)] +enum FromProviderError { + #[error("block not found")] + BlockNotFound, + #[error("data belonging to block not found")] + DataNotFound(ethers::providers::ProviderError), + #[error("something else went wrong")] + Other(Report), +} + +impl FromProviderError { + fn retryable(&self) -> bool { + matches!(self, FromProviderError::BlockNotFound) + } +} + +impl From for FromProviderError { + fn from(error: ethers::providers::ProviderError) -> Self { + Self::Other(Report::from(error)) + } +} + +impl From for FromProviderError { + fn from(error: Report) -> Self { + Self::Other(error) + } +} + impl BlockInsert { async fn from_provider_retried( chain_id: ChainId, height: u64, provider: &Provider, ) -> Result<(usize, Self), Report> { - let count = 0; + let mut count = 0; let max_retries = 100; loop { match Self::from_provider(chain_id, height, provider).await { Ok(block) => return Ok((count, block)), Err(err) => { - if count > max_retries { - return Err(err); + if !err.retryable() && count > max_retries { + return Err(err.into()); } + count += 1; tokio::time::sleep(Duration::from_secs(1)).await; continue; } @@ -270,63 +299,62 @@ impl BlockInsert { chain_id: ChainId, height: u64, provider: &Provider, - ) -> Result { - let block = provider.get_block(height).await?.unwrap(); - let mut receipts = provider.get_block_receipts(height).await.unwrap(); - let ts = if block.number.unwrap().is_zero() { - match chain_id.canonical { - "1" => 1438269973, - "11155111" => 1691344421, - id => todo!( - "chain-id {:?} unsupported. only ethereum and sepolia genesis are supported", - id - ), + ) -> Result { + let block = provider + .get_block(height) + .await? + .ok_or(FromProviderError::BlockNotFound)?; + let mut receipts = provider + .get_block_receipts(height) + .await + .map_err(|err| FromProviderError::DataNotFound(err))?; + + let result: Result = try { + let ts = block.time().unwrap_or_default().timestamp(); + let time = OffsetDateTime::from_unix_timestamp(ts)?; + let block_hash = block.hash.unwrap().to_lower_hex(); + let height: i32 = block.number.unwrap().as_u32().try_into()?; + receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index)); + + let transactions = receipts + .into_iter() + .map(|receipt| { + let transaction_hash = receipt.transaction_hash.to_lower_hex(); + let transaction_index = receipt.transaction_index.as_u32() as i32; + + let events = receipt + .clone() + .logs + .into_iter() + .enumerate() + .map(|(transaction_log_index, log)| { + let data = serde_json::to_value(&log).unwrap(); + EventInsert { + data, + log_index: log.log_index.unwrap().as_usize(), + transaction_log_index: transaction_log_index as i32, + } + }) + .collect(); + TransactionInsert { + hash: transaction_hash, + data: receipt, + index: transaction_index, + events, + } + }) + .collect(); + + BlockInsert { + chain_id, + hash: block_hash, + header: block, + height, + time, + transactions, } - } else { - block.time()?.timestamp() }; - let time = OffsetDateTime::from_unix_timestamp(ts)?; - let block_hash = block.hash.unwrap().to_lower_hex(); - let height: i32 = block.number.unwrap().as_u32().try_into()?; - receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index)); - - let transactions = receipts - .into_iter() - .map(|receipt| { - let transaction_hash = receipt.transaction_hash.to_lower_hex(); - let transaction_index = receipt.transaction_index.as_u32() as i32; - - let events = receipt - .clone() - .logs - .into_iter() - .enumerate() - .map(|(transaction_log_index, log)| { - let data = serde_json::to_value(&log).unwrap(); - EventInsert { - data, - log_index: log.log_index.unwrap().as_usize(), - transaction_log_index: transaction_log_index as i32, - } - }) - .collect(); - TransactionInsert { - hash: transaction_hash, - data: receipt, - index: transaction_index, - events, - } - }) - .collect(); - - Ok(BlockInsert { - chain_id, - hash: block_hash, - header: block, - height, - time, - transactions, - }) + result.map_err(Into::into) } /// Handles inserting the block data and transactions as a log. diff --git a/hubble/src/main.rs b/hubble/src/main.rs index e1116efe68..7c7166b667 100644 --- a/hubble/src/main.rs +++ b/hubble/src/main.rs @@ -1,4 +1,5 @@ #![feature(more_qualified_paths)] +#![feature(try_blocks)] #![allow(clippy::manual_async_fn, clippy::needless_lifetimes)] use axum::{routing::get, Router};