Skip to content

Commit

Permalink
fix(hubble): handle retries in from_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiserKarel committed Apr 3, 2024
1 parent 891c619 commit 4869580
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hubble/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_json = { workspace = true }
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json"] }
tendermint = { workspace = true, features = ["std"] }
tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] }
thiserror = { workspace = true }
time = { version = "0.3.30", features = ["serde"] }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
Expand Down
144 changes: 86 additions & 58 deletions hubble/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,50 @@ pub struct LogData {
pub header: ethers::types::Block<H256>,
}

#[derive(Debug, thiserror::Error)]
enum FromProviderError {
#[error("block not found")]
BlockNotFound,
#[error("data belonging to block not found")]
DataNotFound(ethers::providers::ProviderError),
#[error("something else went wrong")]
Other(Report),
}

impl FromProviderError {
fn retryable(&self) -> bool {
matches!(self, FromProviderError::BlockNotFound)
}
}

impl From<ethers::providers::ProviderError> for FromProviderError {
fn from(error: ethers::providers::ProviderError) -> Self {
Self::Other(Report::from(error))
}
}

impl From<Report> for FromProviderError {
fn from(error: Report) -> Self {
Self::Other(error)
}
}

impl BlockInsert {
async fn from_provider_retried(
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> Result<(usize, Self), Report> {
let count = 0;
let mut count = 0;
let max_retries = 100;
loop {
match Self::from_provider(chain_id, height, provider).await {
Ok(block) => return Ok((count, block)),
Err(err) => {
if count > max_retries {
return Err(err);
if !err.retryable() && count > max_retries {
return Err(err.into());
}
count += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Expand All @@ -270,63 +299,62 @@ impl BlockInsert {
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> Result<Self, Report> {
let block = provider.get_block(height).await?.unwrap();
let mut receipts = provider.get_block_receipts(height).await.unwrap();
let ts = if block.number.unwrap().is_zero() {
match chain_id.canonical {
"1" => 1438269973,
"11155111" => 1691344421,
id => todo!(
"chain-id {:?} unsupported. only ethereum and sepolia genesis are supported",
id
),
) -> Result<Self, FromProviderError> {
let block = provider
.get_block(height)
.await?
.ok_or(FromProviderError::BlockNotFound)?;
let mut receipts = provider
.get_block_receipts(height)
.await
.map_err(|err| FromProviderError::DataNotFound(err))?;

let result: Result<Self, Report> = try {
let ts = block.time().unwrap_or_default().timestamp();
let time = OffsetDateTime::from_unix_timestamp(ts)?;
let block_hash = block.hash.unwrap().to_lower_hex();
let height: i32 = block.number.unwrap().as_u32().try_into()?;
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

let transactions = receipts
.into_iter()
.map(|receipt| {
let transaction_hash = receipt.transaction_hash.to_lower_hex();
let transaction_index = receipt.transaction_index.as_u32() as i32;

let events = receipt
.clone()
.logs
.into_iter()
.enumerate()
.map(|(transaction_log_index, log)| {
let data = serde_json::to_value(&log).unwrap();
EventInsert {
data,
log_index: log.log_index.unwrap().as_usize(),
transaction_log_index: transaction_log_index as i32,
}
})
.collect();
TransactionInsert {
hash: transaction_hash,
data: receipt,
index: transaction_index,
events,
}
})
.collect();

BlockInsert {
chain_id,
hash: block_hash,
header: block,
height,
time,
transactions,
}
} else {
block.time()?.timestamp()
};
let time = OffsetDateTime::from_unix_timestamp(ts)?;
let block_hash = block.hash.unwrap().to_lower_hex();
let height: i32 = block.number.unwrap().as_u32().try_into()?;
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

let transactions = receipts
.into_iter()
.map(|receipt| {
let transaction_hash = receipt.transaction_hash.to_lower_hex();
let transaction_index = receipt.transaction_index.as_u32() as i32;

let events = receipt
.clone()
.logs
.into_iter()
.enumerate()
.map(|(transaction_log_index, log)| {
let data = serde_json::to_value(&log).unwrap();
EventInsert {
data,
log_index: log.log_index.unwrap().as_usize(),
transaction_log_index: transaction_log_index as i32,
}
})
.collect();
TransactionInsert {
hash: transaction_hash,
data: receipt,
index: transaction_index,
events,
}
})
.collect();

Ok(BlockInsert {
chain_id,
hash: block_hash,
header: block,
height,
time,
transactions,
})
result.map_err(Into::into)
}

/// Handles inserting the block data and transactions as a log.
Expand Down
1 change: 1 addition & 0 deletions hubble/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![feature(more_qualified_paths)]
#![feature(try_blocks)]
#![allow(clippy::manual_async_fn, clippy::needless_lifetimes)]

use axum::{routing::get, Router};
Expand Down

0 comments on commit 4869580

Please sign in to comment.