diff --git a/Cargo.lock b/Cargo.lock
index 6b70577f51..ea13cce87d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2980,6 +2980,7 @@ dependencies = [
  "sqlx",
  "tendermint",
  "tendermint-rpc",
+ "thiserror",
  "tikv-jemallocator",
  "time",
  "tokio",
diff --git a/dictionary.txt b/dictionary.txt
index eea6f5d274..5249addf4a 100644
--- a/dictionary.txt
+++ b/dictionary.txt
@@ -690,6 +690,7 @@ repellendus
 repr
 reqwest
 restaking
+retryable
 rgba
 ripemd
 rlib
diff --git a/hubble/Cargo.toml b/hubble/Cargo.toml
index 38143ee656..037a7299be 100644
--- a/hubble/Cargo.toml
+++ b/hubble/Cargo.toml
@@ -31,6 +31,7 @@ serde_json = { workspace = true }
 sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json"] }
 tendermint = { workspace = true, features = ["std"] }
 tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] }
+thiserror = { workspace = true }
 time = { version = "0.3.30", features = ["serde"] }
 tokio = { workspace = true, features = ["full"] }
 tracing = { workspace = true }
diff --git a/hubble/src/eth.rs b/hubble/src/eth.rs
index 8fa93631f1..a3c7e362c2 100644
--- a/hubble/src/eth.rs
+++ b/hubble/src/eth.rs
@@ -25,9 +25,20 @@ pub struct Config {
     pub url: Url,
 }
 
+/// Unit struct describing parametrization of associated types for Evm based chains.
+pub struct Evm;
+
+impl postgres::ChainType for Evm {
+    type BlockHash = String;
+    type BlockHeight = i32;
+    type TransactionHash = String;
+}
+
+pub type PgLog<T> = postgres::Log<Evm, T>;
+
 pub struct Indexer {
     range: Range,
-    chain_id: postgres::ChainId,
+    chain_id: ChainId,
     tasks: tokio::task::JoinSet<Result<(), Report>>,
     pool: PgPool,
     provider: Provider<Http>,
@@ -119,6 +130,12 @@
         .try_chunks(200)
         .map_err(|err| err.1)
         .try_fold(pool, |pool, chunk| async {
+            info!(
+                chain_id.canonical,
+                "indexing blocks for chunk: {}..{}",
+                &chunk.first().unwrap(),
+                &chunk.last().unwrap()
+            );
             let tx = pool.begin().await.map_err(Report::from)?;
             let inserts = FuturesOrdered::from_iter(
                 chunk
@@ -134,7 +151,7 @@
                         return Err(err);
                    }
                    Ok(info) => {
-                        info!(
+                        debug!(
                            chain_id.canonical,
                            height = info.height,
                            hash = info.hash,
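
Note: the first eth.rs hunk above introduces the `postgres::ChainType` parametrization that the rest of this patch builds on. Below is a minimal, self-contained sketch of how the pieces compose; it restates types added by the patch and is not itself part of the diff.

// Sketch only: a chain picks its concrete column types once via a unit struct,
// and a type alias then fixes the chain parameter for all call sites.
pub trait ChainType {
    type BlockHash;
    type BlockHeight;
    type TransactionHash;
}

pub struct Log<Chain: ChainType, T> {
    pub block_hash: Chain::BlockHash,
    pub height: Chain::BlockHeight,
    pub data: T,
}

pub struct Evm;

impl ChainType for Evm {
    type BlockHash = String;
    type BlockHeight = i32;
    type TransactionHash = String;
}

// Call sites only choose the payload type `T`; the chain is fixed here.
pub type PgLog<T> = Log<Evm, T>;

A new chain backend only needs its own unit struct and `ChainType` impl; the Tendermint indexer later in this patch does the same with `CosmosSDK`.
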
@@ -163,7 +180,7 @@
     Ok(())
 }
 
-/// A worker routine which continuously re-indexes the last 20 blocks into `postgres::Logs`.
+/// A worker routine which continuously re-indexes the last 20 blocks into `PgLogs`.
 async fn reindex_blocks(
     pool: PgPool,
     chain_id: ChainId,
@@ -189,7 +206,7 @@ async fn reindex_blocks(
         }));
         inserts
             .try_fold(tx, |mut tx, (_, block)| async move {
-                let log = postgres::Log {
+                let log = PgLog {
                     chain_id: block.chain_id,
                     block_hash: block.hash.clone(),
                     height: block.height,
@@ -244,21 +261,50 @@ pub struct LogData {
     pub header: ethers::types::Block<H256>,
 }
 
+#[derive(Debug, thiserror::Error)]
+enum FromProviderError {
+    #[error("block not found")]
+    BlockNotFound,
+    #[error("data belonging to block not found")]
+    DataNotFound(ethers::providers::ProviderError),
+    #[error("something else went wrong")]
+    Other(Report),
+}
+
+impl FromProviderError {
+    fn retryable(&self) -> bool {
+        matches!(self, FromProviderError::BlockNotFound)
+    }
+}
+
+impl From<ethers::providers::ProviderError> for FromProviderError {
+    fn from(error: ethers::providers::ProviderError) -> Self {
+        Self::Other(Report::from(error))
+    }
+}
+
+impl From<Report> for FromProviderError {
+    fn from(error: Report) -> Self {
+        Self::Other(error)
+    }
+}
+
 impl BlockInsert {
     async fn from_provider_retried(
         chain_id: ChainId,
         height: u64,
         provider: &Provider<Http>,
     ) -> Result<(usize, Self), Report> {
-        let count = 0;
+        let mut count = 0;
         let max_retries = 100;
         loop {
            match Self::from_provider(chain_id, height, provider).await {
                Ok(block) => return Ok((count, block)),
                Err(err) => {
-                    if count > max_retries {
-                        return Err(err);
+                    if !err.retryable() || count > max_retries {
+                        return Err(err.into());
                    }
+                    count += 1;
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
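
Note: `from_provider_retried` above now only sleeps and loops for errors classified as retryable (a block the node simply does not have yet), and bails out immediately on anything else. A condensed sketch of that pattern follows; `FetchError` and `fetch_with_retries` are illustrative names, not part of the patch.

use std::time::Duration;

#[derive(Debug)]
enum FetchError {
    // Retryable: the upstream node may just be lagging behind the requested height.
    NotYetAvailable,
    // Not retryable: retrying will not help.
    Fatal(String),
}

impl FetchError {
    fn retryable(&self) -> bool {
        matches!(self, FetchError::NotYetAvailable)
    }
}

async fn fetch_with_retries<F, Fut, T>(mut fetch: F) -> Result<(u32, T), FetchError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, FetchError>>,
{
    let mut count = 0;
    let max_retries = 100;
    loop {
        match fetch().await {
            // Also report how many attempts were needed, as the patch does.
            Ok(value) => return Ok((count, value)),
            Err(err) => {
                if !err.retryable() || count > max_retries {
                    return Err(err);
                }
                count += 1;
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
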
@@ -270,63 +316,62 @@ impl BlockInsert {
         chain_id: ChainId,
         height: u64,
         provider: &Provider<Http>,
-    ) -> Result<Self, Report> {
-        let block = provider.get_block(height).await?.unwrap();
-        let mut receipts = provider.get_block_receipts(height).await.unwrap();
-        let ts = if block.number.unwrap().is_zero() {
-            match chain_id.canonical {
-                "1" => 1438269973,
-                "11155111" => 1691344421,
-                id => todo!(
-                    "chain-id {:?} unsupported. only ethereum and sepolia genesis are supported",
-                    id
-                ),
+    ) -> Result<Self, FromProviderError> {
+        let block = provider
+            .get_block(height)
+            .await?
+            .ok_or(FromProviderError::BlockNotFound)?;
+        let mut receipts = provider
+            .get_block_receipts(height)
+            .await
+            .map_err(FromProviderError::DataNotFound)?;
+
+        let result: Result<Self, Report> = try {
+            let ts = block.time().unwrap_or_default().timestamp();
+            let time = OffsetDateTime::from_unix_timestamp(ts)?;
+            let block_hash = block.hash.unwrap().to_lower_hex();
+            let height: i32 = block.number.unwrap().as_u32().try_into()?;
+            receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
+
+            let transactions = receipts
+                .into_iter()
+                .map(|receipt| {
+                    let transaction_hash = receipt.transaction_hash.to_lower_hex();
+                    let transaction_index = receipt.transaction_index.as_u32() as i32;
+
+                    let events = receipt
+                        .clone()
+                        .logs
+                        .into_iter()
+                        .enumerate()
+                        .map(|(transaction_log_index, log)| {
+                            let data = serde_json::to_value(&log).unwrap();
+                            EventInsert {
+                                data,
+                                log_index: log.log_index.unwrap().as_usize(),
+                                transaction_log_index: transaction_log_index as i32,
+                            }
+                        })
+                        .collect();
+                    TransactionInsert {
+                        hash: transaction_hash,
+                        data: receipt,
+                        index: transaction_index,
+                        events,
+                    }
+                })
+                .collect();
+
+            BlockInsert {
+                chain_id,
+                hash: block_hash,
+                header: block,
+                height,
+                time,
+                transactions,
             }
-        } else {
-            block.time()?.timestamp()
         };
-        let time = OffsetDateTime::from_unix_timestamp(ts)?;
-        let block_hash = block.hash.unwrap().to_lower_hex();
-        let height: i32 = block.number.unwrap().as_u32().try_into()?;
-        receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
-
-        let transactions = receipts
-            .into_iter()
-            .map(|receipt| {
-                let transaction_hash = receipt.transaction_hash.to_lower_hex();
-                let transaction_index = receipt.transaction_index.as_u32() as i32;
-
-                let events = receipt
-                    .clone()
-                    .logs
-                    .into_iter()
-                    .enumerate()
-                    .map(|(transaction_log_index, log)| {
-                        let data = serde_json::to_value(&log).unwrap();
-                        EventInsert {
-                            data,
-                            log_index: log.log_index.unwrap().as_usize(),
-                            transaction_log_index: transaction_log_index as i32,
-                        }
-                    })
-                    .collect();
-                TransactionInsert {
-                    hash: transaction_hash,
-                    data: receipt,
-                    index: transaction_index,
-                    events,
-                }
-            })
-            .collect();
-
-        Ok(BlockInsert {
-            chain_id,
-            hash: block_hash,
-            header: block,
-            height,
-            time,
-            transactions,
-        })
+        result.map_err(Into::into)
     }
 
     /// Handles inserting the block data and transactions as a log.
@@ -338,7 +383,7 @@
             .map(|tx| tx.events.len() as i32)
             .sum();
 
-        let log = postgres::Log {
+        let log = PgLog {
             chain_id: self.chain_id,
             block_hash: self.hash.clone(),
             height: self.height,
diff --git a/hubble/src/main.rs b/hubble/src/main.rs
index e1116efe68..7c7166b667 100644
--- a/hubble/src/main.rs
+++ b/hubble/src/main.rs
@@ -1,4 +1,5 @@
 #![feature(more_qualified_paths)]
+#![feature(try_blocks)]
 #![allow(clippy::manual_async_fn, clippy::needless_lifetimes)]
 
 use axum::{routing::get, Router};
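
Note: the nightly `try_blocks` feature enabled above is what lets `from_provider` group several fallible conversions into one `Result` and map the error type once at the end, instead of extracting a helper function. A minimal illustration (nightly only; `parse_pair` is an illustrative name):

#![feature(try_blocks)]

fn parse_pair(a: &str, b: &str) -> Result<(i64, i64), String> {
    // Every `?` inside the block targets `result`, not the enclosing function.
    let result: Result<(i64, i64), std::num::ParseIntError> = try {
        (a.parse()?, b.parse()?)
    };
    // The error is converted exactly once, mirroring `result.map_err(Into::into)` above.
    result.map_err(|e| format!("invalid number: {e}"))
}
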
diff --git a/hubble/src/postgres.rs b/hubble/src/postgres.rs
index a35b9fe60e..ded884b89c 100644
--- a/hubble/src/postgres.rs
+++ b/hubble/src/postgres.rs
@@ -1,3 +1,5 @@
+use core::fmt::Debug;
+
 use futures::{Stream, StreamExt, TryStreamExt};
 use serde::Serialize;
 use sqlx::{Acquire, Postgres, QueryBuilder};
@@ -7,64 +9,92 @@ use valuable::Valuable;
 
 pub const BIND_LIMIT: usize = 65535;
 
-pub struct Block {
+/// A trait to describe the different parameters of a chain, used to instantiate types for insertion.
+pub trait ChainType {
+    type BlockHeight;
+    type BlockHash;
+    type TransactionHash;
+}
+
+/// DTO corresponding to the v0.blocks table.
+pub struct Block<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub hash: String,
-    pub height: i32,
+    pub hash: Chain::BlockHash,
+    pub height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
 }
 
-pub struct Transaction {
+/// DTO corresponding to the v0.transactions table.
+pub struct Transaction<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub block_hash: String,
-    pub block_height: i32,
+    pub block_hash: Chain::BlockHash,
+    pub block_height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
-    pub hash: String,
+    pub hash: Chain::TransactionHash,
     pub index: i32,
 }
 
-pub struct Event {
+/// DTO corresponding to the v0.events table.
+pub struct Event<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub block_hash: String,
-    pub block_height: i32,
+    pub block_hash: Chain::BlockHash,
+    pub block_height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
-    pub transaction_hash: Option<String>,
+    pub transaction_hash: Option<Chain::TransactionHash>,
     pub transaction_index: Option<i32>,
     pub block_index: i32,
 }
 
+/// DTO corresponding to the v0.logs table. Note that `logs` are considered opaque, unprocessed
+/// chunks of data depending on the chain type. For example, for Ethereum, a log is a header + transaction receipts.
+pub struct Log<Chain: ChainType, T> {
+    pub chain_id: ChainId,
+    pub block_hash: Chain::BlockHash,
+    pub height: Chain::BlockHeight,
+    pub time: OffsetDateTime,
+    pub data: T,
+}
+
+/// ChainIds track both the database ID of a chain, as well as some canonical representation for
+/// debug logging.
+///
+/// # Implementation Detail
+/// ChainIds contain leaked values, hence care should be taken when creating them.
+///
+/// We do not track too many chains in hubble, hence leaking the canonical
+/// chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
+pub type ChainId = ChainIdInner<'static>;
+
 /// The internal representation of a chain-id, assigned by the database, combined
 /// with the canonical chain-id (from the genesis).
-#[derive(Copy, Clone, Debug, Valuable)]
-pub struct ChainId {
+#[derive(Clone, Debug, Valuable)]
+pub struct ChainIdInner<'a> {
     pub db: i32,
-    // We do not track too many chains in hubble, hence leaking the canonical
-    // chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
-    pub canonical: &'static str,
+    pub canonical: &'a str,
 }
 
-impl ChainId {
-    pub fn new_leaked(db: i32, canonical: String) -> Self {
-        let canonical = canonical.leak();
+/// Inside of Hubble, we leak the ChainId.canonical to make ChainIds easily copyable.
+impl Copy for ChainIdInner<'static> {}
+
+impl<'a> ChainIdInner<'a> {
+    pub fn new(db: i32, canonical: &'a str) -> Self {
         Self { db, canonical }
     }
 }
 
-pub struct Log<T> {
-    pub chain_id: ChainId,
-    pub block_hash: String,
-    pub height: i32,
-    pub time: OffsetDateTime,
-    pub data: T,
-}
-
-pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
+pub async fn insert_batch_logs<C: ChainType, T: Serialize, B: Stream<Item = Log<C, T>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     logs: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Send + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Send + Debug + Clone,
+{
     logs.chunks(BIND_LIMIT / 5)
         .map(Ok::<_, sqlx::Error>)
         .try_fold(tx.as_mut(), |tx, chunk| async {
@@ -77,8 +107,8 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
             logs_query_builder.push_values(chunk.into_iter(), |mut b, log| {
                 debug!(
                     chain_id = log.chain_id.canonical,
-                    height = log.height,
-                    block_hash = log.block_hash,
+                    height = ?log.height,
+                    block_hash = ?log.block_hash,
                     "batch inserting log"
                 );
                 b.push_bind(log.chain_id.db)
@@ -99,10 +129,14 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
     Ok(())
 }
 
-pub async fn upsert_log<T: Serialize>(
+pub async fn upsert_log<C: ChainType, T: Serialize>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
-    log: Log<T>,
-) -> sqlx::Result<()> {
+    log: Log<C, T>,
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight: Into<i32>,
+    <C as ChainType>::BlockHash: AsRef<str>,
+{
     sqlx::query!(
         "
        INSERT INTO v0.logs (chain_id, block_hash, data, height, time)
@@ -112,9 +146,9 @@ pub async fn upsert_log<T: Serialize>(
        SET chain_id = $1, block_hash = $2, data = $3, height = $4, time = $5
        ",
         log.chain_id.db,
-        log.block_hash,
+        log.block_hash.as_ref(),
         serde_json::to_value(&log.data).unwrap(),
-        log.height,
+        log.height.into(),
         log.time
     )
     .execute(tx.as_mut())
@@ -122,10 +156,16 @@ pub async fn upsert_log<T: Serialize>(
     Ok(())
 }
 
-pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
+pub async fn insert_batch_blocks<C: ChainType, B: Stream<Item = Block<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     blocks: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     blocks
         .chunks(BIND_LIMIT / 5)
         .map(Ok::<_, sqlx::Error>)
@@ -139,8 +179,8 @@
             blocks_query_builder.push_values(chunk.into_iter(), |mut b, block| {
                 debug!(
                     chain_id = block.chain_id.canonical,
-                    height = block.height,
-                    block_hash = block.hash,
+                    height = ?block.height,
+                    block_hash = ?block.hash,
                     "batch inserting block"
                 );
                 b.push_bind(block.chain_id.db)
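
Note: the `where` clauses added in the surrounding hunks are what allow the associated `ChainType` types to be pushed into a `QueryBuilder`. A rough sketch of the shape of that requirement follows; the `insert_heights` helper, trait body, and table/column names are illustrative and the bounds are trimmed to the essentials.

use sqlx::{PgPool, Postgres, QueryBuilder};

pub trait ChainType {
    type BlockHeight;
}

async fn insert_heights<C: ChainType>(
    pool: &PgPool,
    rows: Vec<(i32, C::BlockHeight)>,
) -> sqlx::Result<()>
where
    // Encode + Type are what `push_bind` needs for a Postgres parameter; Send matters
    // once the returned future is spawned on a multi-threaded runtime; Debug in the
    // real code is only there for the tracing calls.
    C::BlockHeight: for<'q> sqlx::Encode<'q, Postgres> + sqlx::Type<Postgres> + Send + std::fmt::Debug,
{
    let mut qb: QueryBuilder<Postgres> =
        QueryBuilder::new("INSERT INTO v0.blocks (chain_id, height) ");
    // push_values expands to VALUES ($1, $2), ($3, $4), ... for the whole chunk.
    qb.push_values(rows, |mut b, (chain_id, height)| {
        b.push_bind(chain_id).push_bind(height);
    });
    qb.build().execute(pool).await?;
    Ok(())
}
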
@@ -161,10 +201,18 @@
     Ok(())
 }
 
-pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
+pub async fn insert_batch_transactions<C: ChainType, B: Stream<Item = Transaction<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     transactions: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+    <C as ChainType>::TransactionHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     // We insert all transactions in batched statements without their logs first.
     transactions
         .chunks(BIND_LIMIT / 6)
         .map(Ok::<_, sqlx::Error>)
         .try_fold(tx.as_mut(), |tx, chunk| async {
@@ -179,9 +227,9 @@
             tx_query_builder.push_values(chunk.into_iter(), |mut b, transaction| {
                 debug!(
                     chain_id = transaction.chain_id.canonical,
-                    height = transaction.block_height,
-                    block_hash = transaction.block_hash,
-                    transaction_hash = &transaction.hash,
+                    height = ?transaction.block_height,
+                    block_hash = ?transaction.block_hash,
+                    transaction_hash = ?&transaction.hash,
                     transaction_index = transaction.index,
                     "batch inserting transaction"
                 );
@@ -205,10 +253,18 @@ pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
     Ok(())
 }
 
-pub async fn insert_batch_events<B: Stream<Item = Event>>(
+pub async fn insert_batch_events<C: ChainType, B: Stream<Item = Event<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     events: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+    <C as ChainType>::TransactionHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     events
         .chunks(BIND_LIMIT / 8)
         .map(Ok::<_, sqlx::Error>)
@@ -222,9 +278,9 @@
             event_query_builder.push_values(chunk.into_iter(), |mut b, event| {
                 debug!(
                     chain_id = event.chain_id.canonical,
-                    height = event.block_height,
-                    block_hash = event.block_hash,
-                    transaction_hash = &event.transaction_hash,
+                    height = ?event.block_height,
+                    block_hash = ?event.block_hash,
+                    transaction_hash = ?&event.transaction_hash,
                     transaction_index = event.transaction_index,
                     index = event.block_index,
                     "batch inserting event"
@@ -281,7 +337,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
         .fetch_optional(&mut *conn)
         .await?
     {
-        Fetched(ChainId::new_leaked(chain_id.id, canonical))
+        Fetched(ChainId::new(chain_id.id, canonical.leak()))
     } else {
         let id = sqlx::query!(
             "INSERT INTO \"v0\".chains (chain_id) VALUES ($1) RETURNING id",
@@ -290,7 +346,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
         .fetch_one(&mut *conn)
         .await?
         .id;
-        Created(ChainId::new_leaked(id, canonical))
+        Created(ChainId::new(id, canonical.leak()))
     };
     Ok(db_chain_id)
 }
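
Note: `fetch_or_insert_chain_id` above now leaks the canonical chain-id exactly once, at the point where the `ChainId` is created, and only the `'static` instantiation of `ChainIdInner` is `Copy`. A small sketch of that split, restating the patch's own types; the `example` function is illustrative.

#[derive(Clone, Debug)]
pub struct ChainIdInner<'a> {
    pub db: i32,
    pub canonical: &'a str,
}

// Only the leaked ('static) form is used throughout hubble.
pub type ChainId = ChainIdInner<'static>;

// Copy is deliberately implemented only for the 'static instantiation.
impl Copy for ChainIdInner<'static> {}

impl<'a> ChainIdInner<'a> {
    pub fn new(db: i32, canonical: &'a str) -> Self {
        Self { db, canonical }
    }
}

fn example(db_id: i32, canonical: String) -> ChainId {
    // String::leak trades a small, one-off leak for a handle that can be
    // passed around by value for the lifetime of the process.
    ChainId::new(db_id, canonical.leak())
}
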
diff --git a/hubble/src/tm.rs b/hubble/src/tm.rs
index 0397a3e126..5bac12a6e4 100644
--- a/hubble/src/tm.rs
+++ b/hubble/src/tm.rs
@@ -28,6 +28,19 @@ pub struct Config {
     pub until: Option<u64>,
 }
 
+/// Unit struct describing parametrization of associated types for CosmosSDK based chains.
+pub struct CosmosSDK;
+
+impl postgres::ChainType for CosmosSDK {
+    type BlockHash = String;
+    type BlockHeight = i32;
+    type TransactionHash = String;
+}
+
+pub type PgBlock = postgres::Block<CosmosSDK>;
+pub type PgTransaction = postgres::Transaction<CosmosSDK>;
+pub type PgEvent = postgres::Event<CosmosSDK>;
+
 impl Config {
     /// The batch size for the fast sync protocol. This corresponds to the maximum number of headers returned over a node's RPC.
     pub const BATCH_SIZE: u32 = 20;
@@ -149,7 +162,7 @@ async fn batch_sync(
     let submit_blocks = postgres::insert_batch_blocks(
         tx,
         stream::iter(headers.block_metas.clone().into_iter().map(|meta| {
-            postgres::Block {
+            PgBlock {
                 chain_id,
                 hash: meta.header.hash().to_string(),
                 height: meta.header.height.value() as i32,
@@ -199,7 +212,7 @@ async fn batch_sync(
             let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
             events.extend(tx.tx_result.events.into_iter().enumerate().map(
                 |(i, event)| {
-                    let event = postgres::Event {
+                    let event = PgEvent {
                         chain_id,
                         block_hash: block_hash.clone(),
                         block_height,
@@ -215,7 +228,7 @@ async fn batch_sync(
                     event
                 },
             ));
-            postgres::Transaction {
+            PgTransaction {
                 chain_id,
                 block_hash: block_hash.clone(),
                 block_height,
@@ -226,12 +239,15 @@ async fn batch_sync(
             }
         })
         .collect::<Vec<_>>();
-        events.extend(finalize_block_events.into_iter().enumerate().map(|(i, e)| {
-            postgres::Event {
-                block_index: i as i32 + block_index,
-                ..e
-            }
-        }));
+        events.extend(
+            finalize_block_events
+                .into_iter()
+                .enumerate()
+                .map(|(i, e)| PgEvent {
+                    block_index: i as i32 + block_index,
+                    ..e
+                }),
+        );
         txs
     });
     postgres::insert_batch_transactions(tx, stream::iter(transactions)).await?;
@@ -270,7 +286,7 @@ async fn sync_next(
             }
         }
         Ok(block) => (
-            postgres::Block {
+            PgBlock {
                 chain_id,
                 hash: header.hash().to_string(),
                 height: block_height.value().try_into().unwrap(),
@@ -300,7 +316,7 @@ async fn sync_next(
                 .iter()
                 .enumerate()
                 .for_each(|(i, event)| {
-                    events.push(postgres::Event {
+                    events.push(PgEvent {
                         chain_id,
                         block_hash: header.hash().to_string(),
                         block_height: block_height.value().try_into().unwrap(),
@@ -312,7 +328,7 @@ async fn sync_next(
                     });
                     block_index += 1;
                 });
-            postgres::Transaction {
+            PgTransaction {
                 chain_id,
                 block_hash: header.hash().to_string(),
                 block_height: block_height.value().try_into().unwrap(),
@@ -330,7 +346,7 @@ async fn sync_next(
         .into_iter()
         .chain(finalize_events)
         .enumerate()
-        .map(|(i, e)| postgres::Event {
+        .map(|(i, e)| PgEvent {
             block_index: i as i32,
             ..e
         });
@@ -392,27 +408,17 @@ async fn fetch_transactions_for_block(
 
 pub trait BlockExt {
     /// Returns the non-tx related events from a block formatted for insertion.
-    fn events(
-        self,
-        chain_id: ChainId,
-        block_hash: String,
-        time: OffsetDateTime,
-    ) -> Vec<postgres::Event>;
+    fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent>;
 }
 
 impl BlockExt for BlockResponse {
-    fn events(
-        self,
-        chain_id: ChainId,
-        block_hash: String,
-        time: OffsetDateTime,
-    ) -> Vec<postgres::Event> {
+    fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent> {
         let block_height: i32 = self.height.value().try_into().unwrap();
         let begin_block_events = self
             .begin_block_events
             .unwrap_or_default()
             .into_iter()
-            .map(|e| postgres::Event {
+            .map(|e| PgEvent {
                 chain_id,
                 block_hash: block_hash.clone(),
                 block_height,
@@ -422,7 +428,7 @@ impl BlockExt for BlockResponse {
                 transaction_index: None,
                 block_index: 0,
             });
-        let end_block_events = self.end_block_events.into_iter().map(|e| postgres::Event {
+        let end_block_events = self.end_block_events.into_iter().map(|e| PgEvent {
             chain_id,
             block_hash: block_hash.clone(),
             block_height,
@@ -432,20 +438,17 @@ impl BlockExt for BlockResponse {
             transaction_index: None,
             block_index: 0,
         });
-        let finalize_block_events =
-            self.finalize_block_events
-                .into_iter()
-                .map(|e| postgres::Event {
-                    chain_id,
-                    block_hash: block_hash.clone(),
-                    block_height,
-                    time,
-                    data: serde_json::to_value(e).unwrap().replace_escape_chars(),
-                    transaction_hash: None,
-                    transaction_index: None,
-                    block_index: 0,
-                });
-        let validator_updates = self.validator_updates.into_iter().map(|e| postgres::Event {
+        let finalize_block_events = self.finalize_block_events.into_iter().map(|e| PgEvent {
+            chain_id,
+            block_hash: block_hash.clone(),
+            block_height,
+            time,
+            data: serde_json::to_value(e).unwrap().replace_escape_chars(),
+            transaction_hash: None,
+            transaction_index: None,
+            block_index: 0,
+        });
+        let validator_updates = self.validator_updates.into_iter().map(|e| PgEvent {
             chain_id,
             block_hash: block_hash.clone(),
             block_height,
@@ -457,21 +460,18 @@ impl BlockExt for BlockResponse {
             transaction_index: None,
             block_index: 0,
         });
-        let consensus_param_updates =
-            self.consensus_param_updates
-                .into_iter()
-                .map(|e| postgres::Event {
-                    chain_id,
-                    block_hash: block_hash.clone(),
-                    block_height,
-                    time,
-                    data: serde_json::to_value(WithType::consensus_param_update(e))
-                        .unwrap()
-                        .replace_escape_chars(),
-                    transaction_hash: None,
-                    transaction_index: None,
-                    block_index: 0,
-                });
+        let consensus_param_updates = self.consensus_param_updates.into_iter().map(|e| PgEvent {
+            chain_id,
+            block_hash: block_hash.clone(),
+            block_height,
+            time,
+            data: serde_json::to_value(WithType::consensus_param_update(e))
+                .unwrap()
+                .replace_escape_chars(),
+            transaction_hash: None,
+            transaction_index: None,
+            block_index: 0,
+        });
 
         begin_block_events
             .chain(end_block_events)