feat(hubble): make ChainIds optionally 'static
KaiserKarel committed Apr 4, 2024
1 parent 4869580 commit 74ed1fe
Showing 3 changed files with 180 additions and 113 deletions.
21 changes: 16 additions & 5 deletions hubble/src/eth.rs
@@ -25,9 +25,20 @@ pub struct Config {
pub url: Url,
}

+/// Unit struct describing parametrization of associated types for Evm-based chains.
+pub struct Evm;
+
+impl postgres::ChainType for Evm {
+type BlockHash = String;
+type BlockHeight = i32;
+type TransactionHash = String;
+}
+
+pub type PgLog<T> = postgres::Log<Evm, T>;

pub struct Indexer {
range: Range<u64>,
-chain_id: postgres::ChainId,
+chain_id: ChainId,
tasks: tokio::task::JoinSet<Result<(), Report>>,
pool: PgPool,
provider: Provider<Http>,
@@ -163,7 +174,7 @@ async fn index_blocks(
Ok(())
}

-/// A worker routine which continuously re-indexes the last 20 blocks into `postgres::Logs`.
+/// A worker routine which continuously re-indexes the last 20 blocks into `PgLogs`.
async fn reindex_blocks(
pool: PgPool,
chain_id: ChainId,
@@ -189,7 +200,7 @@ async fn reindex_blocks(
}));
inserts
.try_fold(tx, |mut tx, (_, block)| async move {
-let log = postgres::Log {
+let log = PgLog {
chain_id: block.chain_id,
block_hash: block.hash.clone(),
height: block.height,
@@ -307,7 +318,7 @@ impl BlockInsert {
let mut receipts = provider
.get_block_receipts(height)
.await
-.map_err(|err| FromProviderError::DataNotFound(err))?;
+.map_err(FromProviderError::DataNotFound)?;

let result: Result<Self, Report> = try {
let ts = block.time().unwrap_or_default().timestamp();
@@ -366,7 +377,7 @@ impl BlockInsert {
.map(|tx| tx.events.len() as i32)
.sum();

-let log = postgres::Log {
+let log = PgLog {
chain_id: self.chain_id,
block_hash: self.hash.clone(),
height: self.height,
160 changes: 108 additions & 52 deletions hubble/src/postgres.rs
@@ -1,3 +1,5 @@
+use core::fmt::Debug;
+
use futures::{Stream, StreamExt, TryStreamExt};
use serde::Serialize;
use sqlx::{Acquire, Postgres, QueryBuilder};
@@ -7,64 +9,92 @@ use valuable::Valuable;

pub const BIND_LIMIT: usize = 65535;
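
The BIND_LIMIT constant above is Postgres's hard cap of 65535 bind parameters per statement; the batch inserters below size their chunks as BIND_LIMIT divided by the number of columns each row binds (5 for logs and blocks, 6 for transactions, 8 for events). A minimal sketch of the arithmetic, not part of the commit:

    // Each v0.logs row binds 5 values: chain_id, block_hash, data, height, time.
    const BIND_LIMIT: usize = 65535;
    const LOG_COLUMNS: usize = 5;
    // So one INSERT may carry at most 13107 log rows without overflowing the limit.
    assert_eq!(BIND_LIMIT / LOG_COLUMNS, 13107);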

+/// A trait to describe the different parameters of a chain, used to instantiate types for insertion.
+pub trait ChainType {
+type BlockHeight;
+type BlockHash;
+type TransactionHash;
+}
+
-pub struct Block {
+/// DTO corresponding to the v0.blocks table.
+pub struct Block<Chain: ChainType> {
pub chain_id: ChainId,
-pub hash: String,
-pub height: i32,
+pub hash: Chain::BlockHash,
+pub height: Chain::BlockHeight,
pub time: OffsetDateTime,
pub data: serde_json::Value,
}
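
To illustrate what the new ChainType indirection buys, here is a hypothetical second implementation, mirroring the Evm unit struct added in hubble/src/eth.rs. The `Tm` type and its i64 height are assumptions made for this sketch, not part of the commit:

    /// Hypothetical unit struct for a Tendermint-style chain (illustration only).
    pub struct Tm;

    impl ChainType for Tm {
        type BlockHash = String;
        // Assuming 64-bit heights; i64 still satisfies the sqlx::Encode/Type
        // bounds that the batch inserters below require of BlockHeight.
        type BlockHeight = i64;
        type TransactionHash = String;
    }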

-pub struct Transaction {
+/// DTO corresponding to the v0.transactions table.
+pub struct Transaction<Chain: ChainType> {
pub chain_id: ChainId,
-pub block_hash: String,
-pub block_height: i32,
+pub block_hash: Chain::BlockHash,
+pub block_height: Chain::BlockHeight,
pub time: OffsetDateTime,
pub data: serde_json::Value,
-pub hash: String,
+pub hash: Chain::TransactionHash,
pub index: i32,
}

-pub struct Event {
+/// DTO corresponding to the v0.events table.
+pub struct Event<Chain: ChainType> {
pub chain_id: ChainId,
-pub block_hash: String,
-pub block_height: i32,
+pub block_hash: Chain::BlockHash,
+pub block_height: Chain::BlockHeight,
pub time: OffsetDateTime,
pub data: serde_json::Value,
-pub transaction_hash: Option<String>,
+pub transaction_hash: Option<Chain::TransactionHash>,
pub transaction_index: Option<i32>,
pub block_index: i32,
}

+/// DTO corresponding to the v0.logs table. Note that `logs` are considered opaque, unprocessed
+/// chunks of data depending on the chain type. For example, for Ethereum, a log is a header + transaction receipts.
+pub struct Log<Chain: ChainType, T> {
+pub chain_id: ChainId,
+pub block_hash: Chain::BlockHash,
+pub height: Chain::BlockHeight,
+pub time: OffsetDateTime,
+pub data: T,
+}

+/// ChainIds track both the database ID of a chain, as well as some canonical representation for
+/// debug logging.
+///
+/// # Implementation Detail
+/// ChainIds contain leaked values, hence care should be taken when creating them.
+///
+/// We do not track too many chains in hubble, hence leaking the canonical
+/// chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
+pub type ChainId = ChainIdInner<'static>;

/// The internal representation of a chain-id, assigned by the database, combined
/// with the canonical chain-id (from the genesis).
-#[derive(Copy, Clone, Debug, Valuable)]
-pub struct ChainId {
+#[derive(Clone, Debug, Valuable)]
+pub struct ChainIdInner<'a> {
pub db: i32,
-// We do not track too many chains in hubble, hence leaking the canonical
-// chain-id makes the code more efficient and easier to pass IDs around as `Copy`.
-pub canonical: &'static str,
+pub canonical: &'a str,
}

-impl ChainId {
-pub fn new_leaked(db: i32, canonical: String) -> Self {
-let canonical = canonical.leak();
+/// Inside of Hubble, we leak the ChainId.canonical to make ChainIds easily copyable.
+impl Copy for ChainIdInner<'static> {}
+
+impl<'a> ChainIdInner<'a> {
+pub fn new(db: i32, canonical: &'a str) -> Self {
Self { db, canonical }
}
}
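
A sketch of the two usages this refactor allows, with placeholder chain-id strings; only the 'static form is Copy, since the Copy impl above is restricted to ChainIdInner<'static>:

    fn demo_chain_ids() {
        // Borrowed: nothing is leaked; the id lives only as long as `canonical`.
        let canonical = String::from("some-chain-1"); // placeholder id
        let _borrowed = ChainIdInner::new(1, &canonical);

        // 'static: leak once (e.g. at startup); the ChainId alias applies.
        let leaked: ChainId = ChainIdInner::new(1, String::from("some-chain-1").leak());
        let _a = leaked;
        let _b = leaked; // fine: ChainIdInner<'static> is Copy
    }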

-pub struct Log<T> {
-pub chain_id: ChainId,
-pub block_hash: String,
-pub height: i32,
-pub time: OffsetDateTime,
-pub data: T,
-}

-pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
+pub async fn insert_batch_logs<C: ChainType, T: Serialize, B: Stream<Item = Log<C, T>>>(
tx: &mut sqlx::Transaction<'_, Postgres>,
logs: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+<C as ChainType>::BlockHeight:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Send + Debug,
+<C as ChainType>::BlockHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Send + Debug + Clone,
+{
logs.chunks(BIND_LIMIT / 5)
.map(Ok::<_, sqlx::Error>)
.try_fold(tx.as_mut(), |tx, chunk| async {
@@ -77,8 +107,8 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
logs_query_builder.push_values(chunk.into_iter(), |mut b, log| {
debug!(
chain_id = log.chain_id.canonical,
-height = log.height,
-block_hash = log.block_hash,
+height = ?log.height,
+block_hash = ?log.block_hash,
"batch inserting log"
);
b.push_bind(log.chain_id.db)
@@ -99,10 +129,14 @@ pub async fn insert_batch_logs<T: Serialize, B: Stream<Item = Log<T>>>(
Ok(())
}
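
As a usage sketch, the Evm parametrization from hubble/src/eth.rs satisfies these bounds (String and i32 are Encode + Type<Postgres> + Debug + Clone); the wrapper function, hash, and height values below are invented for illustration:

    async fn demo_insert(
        tx: &mut sqlx::Transaction<'_, Postgres>,
        chain_id: ChainId,
    ) -> sqlx::Result<()> {
        // `Evm` satisfies the where clause above, so the call monomorphizes cleanly.
        let logs = futures::stream::iter([Log::<crate::eth::Evm, serde_json::Value> {
            chain_id,
            block_hash: "0xdeadbeef".to_string(), // placeholder hash
            height: 42,
            time: OffsetDateTime::now_utc(),
            data: serde_json::json!({ "receipts": [] }),
        }]);
        insert_batch_logs(tx, logs).await
    }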

-pub async fn upsert_log<T: Serialize>(
+pub async fn upsert_log<C: ChainType, T: Serialize>(
tx: &mut sqlx::Transaction<'_, Postgres>,
-log: Log<T>,
-) -> sqlx::Result<()> {
+log: Log<C, T>,
+) -> sqlx::Result<()>
+where
+<C as ChainType>::BlockHeight: Into<i32>,
+<C as ChainType>::BlockHash: AsRef<str>,
+{
sqlx::query!(
"
INSERT INTO v0.logs (chain_id, block_hash, data, height, time)
@@ -112,20 +146,26 @@ pub async fn upsert_log<T: Serialize>(
SET chain_id = $1, block_hash = $2, data = $3, height = $4, time = $5
",
log.chain_id.db,
-log.block_hash,
+log.block_hash.as_ref(),
serde_json::to_value(&log.data).unwrap(),
-log.height,
+log.height.into(),
log.time
)
.execute(tx.as_mut())
.await?;
Ok(())
}

-pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
+pub async fn insert_batch_blocks<C: ChainType, B: Stream<Item = Block<C>>>(
tx: &mut sqlx::Transaction<'_, Postgres>,
blocks: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+<C as ChainType>::BlockHeight:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+<C as ChainType>::BlockHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
blocks
.chunks(BIND_LIMIT / 5)
.map(Ok::<_, sqlx::Error>)
@@ -139,8 +179,8 @@ pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
blocks_query_builder.push_values(chunk.into_iter(), |mut b, block| {
debug!(
chain_id = block.chain_id.canonical,
-height = block.height,
-block_hash = block.hash,
+height = ?block.height,
+block_hash = ?block.hash,
"batch inserting block"
);
b.push_bind(block.chain_id.db)
Expand All @@ -161,10 +201,18 @@ pub async fn insert_batch_blocks<B: Stream<Item = Block>>(
Ok(())
}

-pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
+pub async fn insert_batch_transactions<C: ChainType, B: Stream<Item = Transaction<C>>>(
tx: &mut sqlx::Transaction<'_, Postgres>,
transactions: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+<C as ChainType>::BlockHeight:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+<C as ChainType>::BlockHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+<C as ChainType>::TransactionHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
// We insert all transactions in batched statements without their logs first.
transactions
.chunks(BIND_LIMIT / 6)
@@ -179,9 +227,9 @@ pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
tx_query_builder.push_values(chunk.into_iter(), |mut b, transaction| {
debug!(
chain_id = transaction.chain_id.canonical,
-height = transaction.block_height,
-block_hash = transaction.block_hash,
-transaction_hash = &transaction.hash,
+height = ?transaction.block_height,
+block_hash = ?transaction.block_hash,
+transaction_hash = ?&transaction.hash,
transaction_index = transaction.index,
"batch inserting transaction"
);
Expand All @@ -205,10 +253,18 @@ pub async fn insert_batch_transactions<B: Stream<Item = Transaction>>(
Ok(())
}

-pub async fn insert_batch_events<B: Stream<Item = Event>>(
+pub async fn insert_batch_events<C: ChainType, B: Stream<Item = Event<C>>>(
tx: &mut sqlx::Transaction<'_, Postgres>,
events: B,
-) -> sqlx::Result<()> {
+) -> sqlx::Result<()>
+where
+<C as ChainType>::BlockHeight:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug,
+<C as ChainType>::BlockHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+<C as ChainType>::TransactionHash:
+for<'q> sqlx::Encode<'q, Postgres> + Send + sqlx::Type<Postgres> + Debug + Clone,
+{
events
.chunks(BIND_LIMIT / 8)
.map(Ok::<_, sqlx::Error>)
@@ -222,9 +278,9 @@ pub async fn insert_batch_events<B: Stream<Item = Event>>(
event_query_builder.push_values(chunk.into_iter(), |mut b, event| {
debug!(
chain_id = event.chain_id.canonical,
-height = event.block_height,
-block_hash = event.block_hash,
-transaction_hash = &event.transaction_hash,
+height = ?event.block_height,
+block_hash = ?event.block_hash,
+transaction_hash = ?&event.transaction_hash,
transaction_index = event.transaction_index,
index = event.block_index,
"batch inserting event"
@@ -281,7 +337,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
.fetch_optional(&mut *conn)
.await?
{
-Fetched(ChainId::new_leaked(chain_id.id, canonical))
+Fetched(ChainId::new(chain_id.id, canonical.leak()))
} else {
let id = sqlx::query!(
"INSERT INTO \"v0\".chains (chain_id) VALUES ($1) RETURNING id",
Expand All @@ -290,7 +346,7 @@ pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
.fetch_one(&mut *conn)
.await?
.id;
-Created(ChainId::new_leaked(id, canonical))
+Created(ChainId::new(id, canonical.leak()))
};
Ok(db_chain_id)
}
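
Finally, a hedged sketch of a call site: the helper now leaks internally, so callers still receive a Copy-able 'static ChainId. The connection handling, argument shape, and chain-id string here are assumptions, since the full signature is not shown in this hunk:

    async fn resolve(pool: &sqlx::PgPool) -> sqlx::Result<ChainId> {
        let mut conn = pool.acquire().await?;
        // Assumed call shape; the connection argument follows the Acquire bound.
        let chain_id = match fetch_or_insert_chain_id(&mut *conn, "some-chain-1".to_string()).await? {
            Fetched(id) | Created(id) => id,
        };
        Ok(chain_id) // ChainIdInner<'static>: freely Copy-able into every task
    }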