diff --git a/hubble/src/eth.rs b/hubble/src/eth.rs
index 6ab14bba5f..d2eb5fffc4 100644
--- a/hubble/src/eth.rs
+++ b/hubble/src/eth.rs
@@ -25,9 +25,20 @@ pub struct Config {
     pub url: Url,
 }
 
+/// Unit struct describing the parametrization of associated types for EVM-based chains.
+pub struct Evm;
+
+impl postgres::ChainType for Evm {
+    type BlockHash = String;
+    type BlockHeight = i32;
+    type TransactionHash = String;
+}
+
+pub type PgLog<T> = postgres::Log<Evm, T>;
+
 pub struct Indexer {
     range: Range,
-    chain_id: postgres::ChainId,
+    chain_id: ChainId,
     tasks: tokio::task::JoinSet<Result<(), Report>>,
     pool: PgPool,
     provider: Provider<Http>,
@@ -163,7 +174,7 @@ async fn index_blocks(
     Ok(())
 }
 
-/// A worker routine which continuously re-indexes the last 20 blocks into `postgres::Logs`.
+/// A worker routine which continuously re-indexes the last 20 blocks into `PgLog`s.
 async fn reindex_blocks(
     pool: PgPool,
     chain_id: ChainId,
@@ -189,7 +200,7 @@ async fn reindex_blocks(
         }));
     inserts
         .try_fold(tx, |mut tx, (_, block)| async move {
-            let log = postgres::Log {
+            let log = PgLog {
                 chain_id: block.chain_id,
                 block_hash: block.hash.clone(),
                 height: block.height,
@@ -307,7 +318,7 @@ impl BlockInsert {
         let mut receipts = provider
             .get_block_receipts(height)
             .await
-            .map_err(|err| FromProviderError::DataNotFound(err))?;
+            .map_err(FromProviderError::DataNotFound)?;
 
         let result: Result<Self, Report> = try {
             let ts = block.time().unwrap_or_default().timestamp();
@@ -366,7 +377,7 @@ impl BlockInsert {
             .map(|tx| tx.events.len() as i32)
             .sum();
 
-        let log = postgres::Log {
+        let log = PgLog {
             chain_id: self.chain_id,
             block_hash: self.hash.clone(),
             height: self.height,
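To make the new `Evm` parametrization concrete: `PgLog<T>` pins `postgres::Log`'s chain-specific fields to `String` hashes and `i32` heights while leaving the opaque payload `T` free. A minimal sketch under those assumptions; the helper name and all field values are illustrative, not part of the diff:

fn example_log(chain_id: postgres::ChainId) -> PgLog<serde_json::Value> {
    PgLog {
        chain_id,
        block_hash: "0xdeadbeef".to_string(), // Evm::BlockHash = String
        height: 42,                           // Evm::BlockHeight = i32
        time: time::OffsetDateTime::now_utc(),
        data: serde_json::json!({ "receipts": [] }), // opaque payload T
    }
}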
diff --git a/hubble/src/postgres.rs b/hubble/src/postgres.rs
index a35b9fe60e..3b4fdb5f12 100644
--- a/hubble/src/postgres.rs
+++ b/hubble/src/postgres.rs
@@ -1,3 +1,5 @@
+use core::fmt::Debug;
+
 use futures::{Stream, StreamExt, TryStreamExt};
 use serde::Serialize;
 use sqlx::{Acquire, Postgres, QueryBuilder};
@@ -7,64 +9,92 @@ use valuable::Valuable;
 
 pub const BIND_LIMIT: usize = 65535;
 
-pub struct Block {
+/// A trait describing the chain-specific parameters used to instantiate the DTO types for insertion.
+pub trait ChainType {
+    type BlockHeight;
+    type BlockHash;
+    type TransactionHash;
+}
+
+/// DTO corresponding to the v0.blocks table.
+pub struct Block<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub hash: String,
-    pub height: i32,
+    pub hash: Chain::BlockHash,
+    pub height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
 }
 
-pub struct Transaction {
+/// DTO corresponding to the v0.transactions table.
+pub struct Transaction<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub block_hash: String,
-    pub block_height: i32,
+    pub block_hash: Chain::BlockHash,
+    pub block_height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
-    pub hash: String,
+    pub hash: Chain::TransactionHash,
     pub index: i32,
 }
 
-pub struct Event {
+/// DTO corresponding to the v0.events table.
+pub struct Event<Chain: ChainType> {
     pub chain_id: ChainId,
-    pub block_hash: String,
-    pub block_height: i32,
+    pub block_hash: Chain::BlockHash,
+    pub block_height: Chain::BlockHeight,
     pub time: OffsetDateTime,
     pub data: serde_json::Value,
-    pub transaction_hash: Option<String>,
+    pub transaction_hash: Option<Chain::TransactionHash>,
     pub transaction_index: Option<i32>,
     pub block_index: i32,
 }
 
+/// DTO corresponding to the v0.logs table. Note that `logs` are considered opaque, unprocessed
+/// chunks of data whose shape depends on the chain type; for Ethereum, a log is a header plus transaction receipts.
+pub struct Log<Chain: ChainType, T> {
+    pub chain_id: ChainId,
+    pub block_hash: Chain::BlockHash,
+    pub height: Chain::BlockHeight,
+    pub time: OffsetDateTime,
+    pub data: T,
+}
+
+/// ChainIds track both the database ID of a chain and some canonical representation
+/// used for debug logging.
+///
+/// # Implementation Detail
+/// ChainIds contain leaked values, hence care should be taken when creating them.
+///
+/// We do not track too many chains in hubble, hence leaking the canonical
+/// chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
+pub type ChainId = ChainIdInner<'static>;
+
 /// The internal representation of a chain-id, assigned by the database, combined
 /// with the canonical chain-id (from the genesis).
-#[derive(Copy, Clone, Debug, Valuable)]
-pub struct ChainId {
+#[derive(Clone, Debug, Valuable)]
+pub struct ChainIdInner<'a> {
     pub db: i32,
-    // We do not track too many chains in hubble, hence leaking the canonical
-    // chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
-    pub canonical: &'static str,
+    pub canonical: &'a str,
 }
 
-impl ChainId {
-    pub fn new_leaked(db: i32, canonical: String) -> Self {
-        let canonical = canonical.leak();
+/// Inside Hubble, we leak `ChainId.canonical` so that `ChainId`s are easily copyable.
+impl Copy for ChainIdInner<'static> {}
+
+impl<'a> ChainIdInner<'a> {
+    pub fn new(db: i32, canonical: &'a str) -> Self {
         Self { db, canonical }
     }
 }
 
-pub struct Log<T> {
-    pub chain_id: ChainId,
-    pub block_hash: String,
-    pub height: i32,
-    pub time: OffsetDateTime,
-    pub data: T,
-}
-
-pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
+pub async fn insert_batch_logs<C: ChainType, T: Serialize, B: Stream<Item = Log<C, T>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     logs: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     logs.chunks(BIND_LIMIT / 5)
        .map(Ok::<_, sqlx::Error>)
        .try_fold(tx.as_mut(), |tx, chunk| async {
@@ -77,8 +107,8 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
             logs_query_builder.push_values(chunk.into_iter(), |mut b, log| {
                 debug!(
                     chain_id = log.chain_id.canonical,
-                    height = log.height,
-                    block_hash = log.block_hash,
+                    height = ?log.height,
+                    block_hash = ?log.block_hash,
                     "batch inserting log"
                 );
                 b.push_bind(log.chain_id.db)
@@ -99,10 +129,14 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
     Ok(())
 }
 
-pub async fn upsert_log<T: Serialize>(
+pub async fn upsert_log<C: ChainType, T: Serialize>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
-    log: Log<T>,
-) -> sqlx::Result<()> {
+    log: Log<C, T>,
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight: Into<i32>,
+    <C as ChainType>::BlockHash: AsRef<str>,
+{
     sqlx::query!(
         "
         INSERT INTO v0.logs (chain_id, block_hash, data, height, time)
@@ -112,9 +146,9 @@ pub async fn upsert_log<T: Serialize>(
         SET chain_id = $1, block_hash = $2, data = $3, height = $4, time = $5
         ",
         log.chain_id.db,
-        log.block_hash,
+        log.block_hash.as_ref(),
         serde_json::to_value(&log.data).unwrap(),
-        log.height,
+        log.height.into(),
         log.time
     )
     .execute(tx.as_mut())
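A note on the `ChainIdInner<'a>`/`ChainId` split above: only the `'static` instantiation is `Copy`, so borrowed chain-ids stay move-only while leaked ones can be passed around by value. A small sketch of those semantics (the function name and canonical id are illustrative, not from the diff):

fn chain_id_copy_semantics() {
    let canonical = String::from("union-testnet-1"); // illustrative canonical id
    {
        // A borrowed ChainIdInner<'a> is Clone (derived for any lifetime) but not Copy.
        let borrowed = postgres::ChainIdInner::new(1, canonical.as_str());
        let _clone = borrowed.clone();
    }
    // Leaking yields ChainIdInner<'static>, i.e. ChainId, which is Copy.
    let leaked: postgres::ChainId = postgres::ChainIdInner::new(1, canonical.leak());
    let (a, b) = (leaked, leaked); // both copies remain usable
    let _ = (a.db, b.canonical);
}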
@@ -122,10 +156,16 @@ pub async fn upsert_log<T: Serialize>(
     Ok(())
 }
 
-pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
+pub async fn insert_batch_blocks<C: ChainType, B: Stream<Item = Block<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     blocks: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     blocks
         .chunks(BIND_LIMIT / 5)
         .map(Ok::<_, sqlx::Error>)
         .try_fold(tx.as_mut(), |tx, chunk| async {
@@ -139,8 +179,8 @@ pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
             blocks_query_builder.push_values(chunk.into_iter(), |mut b, block| {
                 debug!(
                     chain_id = block.chain_id.canonical,
-                    height = block.height,
-                    block_hash = block.hash,
+                    height = ?block.height,
+                    block_hash = ?block.hash,
                     "batch inserting block"
                 );
                 b.push_bind(block.chain_id.db)
@@ -161,10 +201,18 @@ pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
     Ok(())
 }
 
-pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
+pub async fn insert_batch_transactions<C: ChainType, B: Stream<Item = Transaction<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     transactions: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+    <C as ChainType>::TransactionHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     // We insert all transactions in batched statements without their logs first.
     transactions
         .chunks(BIND_LIMIT / 6)
@@ -179,9 +227,9 @@ pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
             tx_query_builder.push_values(chunk.into_iter(), |mut b, transaction| {
                 debug!(
                     chain_id = transaction.chain_id.canonical,
-                    height = transaction.block_height,
-                    block_hash = transaction.block_hash,
-                    transaction_hash = &transaction.hash,
+                    height = ?transaction.block_height,
+                    block_hash = ?transaction.block_hash,
+                    transaction_hash = ?&transaction.hash,
                     transaction_index = transaction.index,
                     "batch inserting transaction"
                 );
@@ -205,10 +253,18 @@ pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
     Ok(())
 }
 
-pub async fn insert_batch_events<B: Stream<Item = Event>>(
+pub async fn insert_batch_events<C: ChainType, B: Stream<Item = Event<C>>>(
     tx: &mut sqlx::Transaction<'_, Postgres>,
     events: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+    <C as ChainType>::BlockHeight:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+    <C as ChainType>::BlockHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+    <C as ChainType>::TransactionHash:
+        for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
     events
         .chunks(BIND_LIMIT / 8)
         .map(Ok::<_, sqlx::Error>)
         .try_fold(tx.as_mut(), |tx, chunk| async {
@@ -222,9 +278,9 @@ pub async fn insert_batch_events<B: Stream<Item = Event>>(
             event_query_builder.push_values(chunk.into_iter(), |mut b, event| {
                 debug!(
                     chain_id = event.chain_id.canonical,
-                    height = event.block_height,
-                    block_hash = event.block_hash,
-                    transaction_hash = &event.transaction_hash,
+                    height = ?event.block_height,
+                    block_hash = ?event.block_hash,
+                    transaction_hash = ?&event.transaction_hash,
                     transaction_index = event.transaction_index,
                     index = event.block_index,
                     "batch inserting event"
@@ -281,7 +337,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
     .fetch_optional(&mut *conn)
     .await?
     {
-        Fetched(ChainId::new_leaked(chain_id.id, canonical))
+        Fetched(ChainId::new(chain_id.id, canonical.leak()))
     } else {
         let id = sqlx::query!(
             "INSERT INTO \"v0\".chains (chain_id) VALUES ($1) RETURNING id",
@@ -290,7 +346,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
         .fetch_one(&mut *conn)
         .await?
         .id;
-        Created(ChainId::new_leaked(id, canonical))
+        Created(ChainId::new(id, canonical.leak()))
     };
     Ok(db_chain_id)
 }
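The heavy where-clauses exist because heights and hashes are now associated types: each bound field must itself be encodable as a Postgres parameter (`for<'q> sqlx::Encode<'q, Postgres> + sqlx::Type<Postgres>`), and the `Debug` bounds back the switch to `?`-capture in the `debug!` calls. A sketch of a call that satisfies these bounds via the `Evm` parametrization from eth.rs (the helper name and field values are illustrative, not from the diff):

async fn insert_one_evm_block(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    chain_id: postgres::ChainId,
) -> sqlx::Result<()> {
    // String and i32 implement Encode<'q, Postgres>, Type<Postgres>, Debug, and Clone,
    // so Evm satisfies every bound on insert_batch_blocks.
    let blocks = vec![postgres::Block::<Evm> {
        chain_id,
        hash: "0xdeadbeef".to_string(),
        height: 42,
        time: time::OffsetDateTime::now_utc(),
        data: serde_json::json!({}),
    }];
    postgres::insert_batch_blocks(tx, futures::stream::iter(blocks)).await
}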
diff --git a/hubble/src/tm.rs b/hubble/src/tm.rs
index 0397a3e126..5bac12a6e4 100644
--- a/hubble/src/tm.rs
+++ b/hubble/src/tm.rs
@@ -28,6 +28,19 @@ pub struct Config {
     pub until: Option<u64>,
 }
 
+/// Unit struct describing the parametrization of associated types for CosmosSDK-based chains.
+pub struct CosmosSDK;
+
+impl postgres::ChainType for CosmosSDK {
+    type BlockHash = String;
+    type BlockHeight = i32;
+    type TransactionHash = String;
+}
+
+pub type PgBlock = postgres::Block<CosmosSDK>;
+pub type PgTransaction = postgres::Transaction<CosmosSDK>;
+pub type PgEvent = postgres::Event<CosmosSDK>;
+
 impl Config {
     /// The batch size for the fast sync protocol. This corresponds to the maximum number of headers returned over a node's RPC.
     pub const BATCH_SIZE: u32 = 20;
@@ -149,7 +162,7 @@ async fn batch_sync(
     let submit_blocks = postgres::insert_batch_blocks(
         tx,
         stream::iter(headers.block_metas.clone().into_iter().map(|meta| {
-            postgres::Block {
+            PgBlock {
                 chain_id,
                 hash: meta.header.hash().to_string(),
                 height: meta.header.height.value() as i32,
@@ -199,7 +212,7 @@ async fn batch_sync(
             let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
             events.extend(tx.tx_result.events.into_iter().enumerate().map(
                 |(i, event)| {
-                    let event = postgres::Event {
+                    let event = PgEvent {
                         chain_id,
                         block_hash: block_hash.clone(),
                         block_height,
@@ -215,7 +228,7 @@ async fn batch_sync(
                     event
                 },
             ));
-            postgres::Transaction {
+            PgTransaction {
                 chain_id,
                 block_hash: block_hash.clone(),
                 block_height,
@@ -226,12 +239,15 @@ async fn batch_sync(
             }
         })
         .collect::<Vec<_>>();
-    events.extend(finalize_block_events.into_iter().enumerate().map(|(i, e)| {
-        postgres::Event {
-            block_index: i as i32 + block_index,
-            ..e
-        }
-    }));
+    events.extend(
+        finalize_block_events
+            .into_iter()
+            .enumerate()
+            .map(|(i, e)| PgEvent {
+                block_index: i as i32 + block_index,
+                ..e
+            }),
+    );
     txs
 });
 postgres::insert_batch_transactions(tx, stream::iter(transactions)).await?;
@@ -270,7 +286,7 @@ async fn sync_next(
             }
         }
         Ok(block) => (
-            postgres::Block {
+            PgBlock {
                 chain_id,
                 hash: header.hash().to_string(),
                 height: block_height.value().try_into().unwrap(),
@@ -300,7 +316,7 @@ async fn sync_next(
                 .iter()
                 .enumerate()
                 .for_each(|(i, event)| {
-                    events.push(postgres::Event {
+                    events.push(PgEvent {
                         chain_id,
                         block_hash: header.hash().to_string(),
                         block_height: block_height.value().try_into().unwrap(),
@@ -312,7 +328,7 @@ async fn sync_next(
                     });
                     block_index += 1;
                 });
-            postgres::Transaction {
+            PgTransaction {
                 chain_id,
                 block_hash: header.hash().to_string(),
                 block_height: block_height.value().try_into().unwrap(),
@@ -330,7 +346,7 @@ async fn sync_next(
         .into_iter()
         .chain(finalize_events)
         .enumerate()
-        .map(|(i, e)| postgres::Event {
+        .map(|(i, e)| PgEvent {
             block_index: i as i32,
             ..e
         });
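One payoff of the `PgBlock`/`PgTransaction`/`PgEvent` aliases shows in the hunks above: call sites build plain struct literals and the insert helpers infer `C = CosmosSDK` from the stream's item type, no turbofish required. A minimal sketch (helper name and field values are illustrative, not from the diff):

async fn insert_one_cosmos_event(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    chain_id: postgres::ChainId,
) -> sqlx::Result<()> {
    let events = vec![PgEvent {
        chain_id,
        block_hash: "ABCD1234".to_string(),
        block_height: 7,
        time: time::OffsetDateTime::now_utc(),
        data: serde_json::json!({}),
        transaction_hash: None,
        transaction_index: None,
        block_index: 0,
    }];
    // Resolves to insert_batch_events::<CosmosSDK, _> purely through type inference.
    postgres::insert_batch_events(tx, futures::stream::iter(events)).await
}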
@@ -392,27 +408,17 @@ async fn fetch_transactions_for_block(
 
 pub trait BlockExt {
     /// Returns the non-tx related events from a block formatted for insertion.
-    fn events(
-        self,
-        chain_id: ChainId,
-        block_hash: String,
-        time: OffsetDateTime,
-    ) -> Vec<postgres::Event>;
+    fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent>;
 }
 
 impl BlockExt for BlockResponse {
-    fn events(
-        self,
-        chain_id: ChainId,
-        block_hash: String,
-        time: OffsetDateTime,
-    ) -> Vec<postgres::Event> {
+    fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent> {
         let block_height: i32 = self.height.value().try_into().unwrap();
         let begin_block_events = self
             .begin_block_events
             .unwrap_or_default()
             .into_iter()
-            .map(|e| postgres::Event {
+            .map(|e| PgEvent {
                 chain_id,
                 block_hash: block_hash.clone(),
                 block_height,
@@ -422,7 +428,7 @@ impl BlockExt for BlockResponse {
                 transaction_index: None,
                 block_index: 0,
             });
-        let end_block_events = self.end_block_events.into_iter().map(|e| postgres::Event {
+        let end_block_events = self.end_block_events.into_iter().map(|e| PgEvent {
             chain_id,
             block_hash: block_hash.clone(),
             block_height,
@@ -432,20 +438,17 @@ impl BlockExt for BlockResponse {
             transaction_index: None,
             block_index: 0,
         });
-        let finalize_block_events =
-            self.finalize_block_events
-                .into_iter()
-                .map(|e| postgres::Event {
-                    chain_id,
-                    block_hash: block_hash.clone(),
-                    block_height,
-                    time,
-                    data: serde_json::to_value(e).unwrap().replace_escape_chars(),
-                    transaction_hash: None,
-                    transaction_index: None,
-                    block_index: 0,
-                });
-        let validator_updates = self.validator_updates.into_iter().map(|e| postgres::Event {
+        let finalize_block_events = self.finalize_block_events.into_iter().map(|e| PgEvent {
+            chain_id,
+            block_hash: block_hash.clone(),
+            block_height,
+            time,
+            data: serde_json::to_value(e).unwrap().replace_escape_chars(),
+            transaction_hash: None,
+            transaction_index: None,
+            block_index: 0,
+        });
+        let validator_updates = self.validator_updates.into_iter().map(|e| PgEvent {
             chain_id,
             block_hash: block_hash.clone(),
             block_height,
@@ -457,21 +460,18 @@ impl BlockExt for BlockResponse {
             transaction_index: None,
             block_index: 0,
         });
-        let consensus_param_updates =
-            self.consensus_param_updates
-                .into_iter()
-                .map(|e| postgres::Event {
-                    chain_id,
-                    block_hash: block_hash.clone(),
-                    block_height,
-                    time,
-                    data: serde_json::to_value(WithType::consensus_param_update(e))
-                        .unwrap()
-                        .replace_escape_chars(),
-                    transaction_hash: None,
-                    transaction_index: None,
-                    block_index: 0,
-                });
+        let consensus_param_updates = self.consensus_param_updates.into_iter().map(|e| PgEvent {
+            chain_id,
+            block_hash: block_hash.clone(),
+            block_height,
+            time,
+            data: serde_json::to_value(WithType::consensus_param_update(e))
+                .unwrap()
+                .replace_escape_chars(),
+            transaction_hash: None,
+            transaction_index: None,
+            block_index: 0,
+        });
 
         begin_block_events
             .chain(end_block_events)