Skip to content

Commit

Permalink
feat(hubble): improve internal types (#1676)
Browse files Browse the repository at this point in the history
Just some nit fixes that didn't get in on #997.
  • Loading branch information
KaiserKarel authored Apr 9, 2024
2 parents 8a4ec00 + 1ed2f3b commit d6e9fd6
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 171 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ repellendus
repr
reqwest
restaking
retryable
rgba
ripemd
rlib
Expand Down
1 change: 1 addition & 0 deletions hubble/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_json = { workspace = true }
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json"] }
tendermint = { workspace = true, features = ["std"] }
tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] }
thiserror = { workspace = true }
time = { version = "0.3.30", features = ["serde"] }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
Expand Down
171 changes: 108 additions & 63 deletions hubble/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,20 @@ pub struct Config {
pub url: Url,
}

/// Unit struct describing parametrization of associated types for Evm based chains.
pub struct Evm;

impl postgres::ChainType for Evm {
type BlockHash = String;
type BlockHeight = i32;
type TransactionHash = String;
}

pub type PgLog<T> = postgres::Log<Evm, T>;

pub struct Indexer {
range: Range<u64>,
chain_id: postgres::ChainId,
chain_id: ChainId,
tasks: tokio::task::JoinSet<Result<(), Report>>,
pool: PgPool,
provider: Provider<Http>,
Expand Down Expand Up @@ -119,6 +130,12 @@ async fn index_blocks(
.try_chunks(200)
.map_err(|err| err.1)
.try_fold(pool, |pool, chunk| async {
info!(
chain_id.canonical,
"indexing blocks for chunk: {}..{}",
&chunk.first().unwrap(),
&chunk.last().unwrap()
);
let tx = pool.begin().await.map_err(Report::from)?;
let inserts = FuturesOrdered::from_iter(
chunk
Expand All @@ -134,7 +151,7 @@ async fn index_blocks(
return Err(err);
}
Ok(info) => {
info!(
debug!(
chain_id.canonical,
height = info.height,
hash = info.hash,
Expand Down Expand Up @@ -163,7 +180,7 @@ async fn index_blocks(
Ok(())
}

/// A worker routine which continuously re-indexes the last 20 blocks into `postgres::Logs`.
/// A worker routine which continuously re-indexes the last 20 blocks into `PgLogs`.
async fn reindex_blocks(
pool: PgPool,
chain_id: ChainId,
Expand All @@ -189,7 +206,7 @@ async fn reindex_blocks(
}));
inserts
.try_fold(tx, |mut tx, (_, block)| async move {
let log = postgres::Log {
let log = PgLog {
chain_id: block.chain_id,
block_hash: block.hash.clone(),
height: block.height,
Expand Down Expand Up @@ -244,21 +261,50 @@ pub struct LogData {
pub header: ethers::types::Block<H256>,
}

#[derive(Debug, thiserror::Error)]
enum FromProviderError {
#[error("block not found")]
BlockNotFound,
#[error("data belonging to block not found")]
DataNotFound(ethers::providers::ProviderError),
#[error("something else went wrong")]
Other(Report),
}

impl FromProviderError {
fn retryable(&self) -> bool {
matches!(self, FromProviderError::BlockNotFound)
}
}

impl From<ethers::providers::ProviderError> for FromProviderError {
fn from(error: ethers::providers::ProviderError) -> Self {
Self::Other(Report::from(error))
}
}

impl From<Report> for FromProviderError {
fn from(error: Report) -> Self {
Self::Other(error)
}
}

impl BlockInsert {
async fn from_provider_retried(
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> Result<(usize, Self), Report> {
let count = 0;
let mut count = 0;
let max_retries = 100;
loop {
match Self::from_provider(chain_id, height, provider).await {
Ok(block) => return Ok((count, block)),
Err(err) => {
if count > max_retries {
return Err(err);
if !err.retryable() || count > max_retries {
return Err(err.into());
}
count += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Expand All @@ -270,63 +316,62 @@ impl BlockInsert {
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> Result<Self, Report> {
let block = provider.get_block(height).await?.unwrap();
let mut receipts = provider.get_block_receipts(height).await.unwrap();
let ts = if block.number.unwrap().is_zero() {
match chain_id.canonical {
"1" => 1438269973,
"11155111" => 1691344421,
id => todo!(
"chain-id {:?} unsupported. only ethereum and sepolia genesis are supported",
id
),
) -> Result<Self, FromProviderError> {
let block = provider
.get_block(height)
.await?
.ok_or(FromProviderError::BlockNotFound)?;
let mut receipts = provider
.get_block_receipts(height)
.await
.map_err(FromProviderError::DataNotFound)?;

let result: Result<Self, Report> = try {
let ts = block.time().unwrap_or_default().timestamp();
let time = OffsetDateTime::from_unix_timestamp(ts)?;
let block_hash = block.hash.unwrap().to_lower_hex();
let height: i32 = block.number.unwrap().as_u32().try_into()?;
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

let transactions = receipts
.into_iter()
.map(|receipt| {
let transaction_hash = receipt.transaction_hash.to_lower_hex();
let transaction_index = receipt.transaction_index.as_u32() as i32;

let events = receipt
.clone()
.logs
.into_iter()
.enumerate()
.map(|(transaction_log_index, log)| {
let data = serde_json::to_value(&log).unwrap();
EventInsert {
data,
log_index: log.log_index.unwrap().as_usize(),
transaction_log_index: transaction_log_index as i32,
}
})
.collect();
TransactionInsert {
hash: transaction_hash,
data: receipt,
index: transaction_index,
events,
}
})
.collect();

BlockInsert {
chain_id,
hash: block_hash,
header: block,
height,
time,
transactions,
}
} else {
block.time()?.timestamp()
};
let time = OffsetDateTime::from_unix_timestamp(ts)?;
let block_hash = block.hash.unwrap().to_lower_hex();
let height: i32 = block.number.unwrap().as_u32().try_into()?;
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

let transactions = receipts
.into_iter()
.map(|receipt| {
let transaction_hash = receipt.transaction_hash.to_lower_hex();
let transaction_index = receipt.transaction_index.as_u32() as i32;

let events = receipt
.clone()
.logs
.into_iter()
.enumerate()
.map(|(transaction_log_index, log)| {
let data = serde_json::to_value(&log).unwrap();
EventInsert {
data,
log_index: log.log_index.unwrap().as_usize(),
transaction_log_index: transaction_log_index as i32,
}
})
.collect();
TransactionInsert {
hash: transaction_hash,
data: receipt,
index: transaction_index,
events,
}
})
.collect();

Ok(BlockInsert {
chain_id,
hash: block_hash,
header: block,
height,
time,
transactions,
})
result.map_err(Into::into)
}

/// Handles inserting the block data and transactions as a log.
Expand All @@ -338,7 +383,7 @@ impl BlockInsert {
.map(|tx| tx.events.len() as i32)
.sum();

let log = postgres::Log {
let log = PgLog {
chain_id: self.chain_id,
block_hash: self.hash.clone(),
height: self.height,
Expand Down
1 change: 1 addition & 0 deletions hubble/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![feature(more_qualified_paths)]
#![feature(try_blocks)]
#![allow(clippy::manual_async_fn, clippy::needless_lifetimes)]

use axum::{routing::get, Router};
Expand Down
Loading

0 comments on commit d6e9fd6

Please sign in to comment.