From 21f0de742baeeff5f828b11a02bf5d4193dc1e37 Mon Sep 17 00:00:00 2001
From: Felipe Rosa
Date: Tue, 23 Apr 2024 18:04:58 -0300
Subject: [PATCH] chore: refactor event-db queries to do batch inserts and updates

---
 catalyst-gateway/bin/src/cardano/mod.rs       | 467 +++++++++++++-----
 .../src/event_db/cardano/chain_state/mod.rs   |  51 ++
 .../cardano/cip36_registration/mod.rs         |  74 +++
 .../bin/src/event_db/cardano/utxo/mod.rs      | 155 ++++++
 catalyst-gateway/bin/src/main.rs              |   2 +
 5 files changed, 624 insertions(+), 125 deletions(-)

diff --git a/catalyst-gateway/bin/src/cardano/mod.rs b/catalyst-gateway/bin/src/cardano/mod.rs
index 1765122656e..fc9fbcc544f 100644
--- a/catalyst-gateway/bin/src/cardano/mod.rs
+++ b/catalyst-gateway/bin/src/cardano/mod.rs
@@ -1,5 +1,5 @@
 //! Logic for orchestrating followers
-use std::{path::PathBuf, sync::Arc};
+use std::{path::PathBuf, sync::Arc, time::Duration};
 
 /// Handler for follower tasks, allows for control over spawned follower threads
 pub type ManageTasks = JoinHandle<()>;
@@ -7,10 +7,15 @@ pub type ManageTasks = JoinHandle<()>;
 use cardano_chain_follower::{
     network_genesis_values, ChainUpdate, Follower, FollowerConfigBuilder, Network, Point,
 };
-use pallas::ledger::traverse::{wellknown::GenesisValues, MultiEraBlock};
+use pallas::ledger::{
+    addresses::Address,
+    traverse::{wellknown::GenesisValues, MultiEraBlock},
+};
 use tokio::{task::JoinHandle, time};
 use tracing::{debug, error, info};
 
+use self::util::parse_policy_assets;
+use crate::cardano::cip36_registration::{Cip36Metadata, VotingInfo};
 use crate::{
     cardano::util::valid_era,
     event_db::{
@@ -130,159 +135,371 @@ async fn process_blocks(
     follower: &mut Follower, db: Arc<EventDB>, network: Network, machine_id: MachineId,
     genesis_values: &GenesisValues,
 ) {
-    loop {
-        let chain_update = match follower.next().await {
-            Ok(chain_update) => chain_update,
-            Err(err) => {
-                error!(
-                    "Unable to receive next update from the {network:?} follower err: {err} - skip..",
-                );
-                continue;
-            },
-        };
+    info!("Follower started processing blocks");
 
-        match chain_update {
-            ChainUpdate::Block(data) => {
-                let block = match data.decode() {
-                    Ok(block) => block,
-                    Err(err) => {
-                        error!("Unable to decode {network:?} block {err} - skip..",);
-                        continue;
+    let mut blocks_buffer = Vec::new();
+
+    let mut ticker = tokio::time::interval(Duration::from_secs(1));
+    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+    loop {
+        tokio::select! {
+            // Safe because follower.next is cancellation-safe.
+            result = follower.next() => match result {
+                Ok(chain_update) => match chain_update {
+                    ChainUpdate::Block(data) => {
+                        blocks_buffer.push(data);
+
+                        // We have enough blocks to index
+                        if blocks_buffer.len() >= 1024 {
+                            let current_buffer = std::mem::take(&mut blocks_buffer);
+                            index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;
+
+                            // Since we just wrote to the database, reset the ticker so the
+                            // next write happens at least one interval from now.
+                            ticker.reset();
+                        }
                     },
-                };
-                let start_index_block = time::Instant::now();
-                index_block(db.clone(), genesis_values, network, &machine_id, &block).await;
-                debug!(
-                    "{network:?} block {} indexing time: {}ns. txs amount: {}",
-                    block.hash().to_string(),
-                    start_index_block.elapsed().as_nanos(),
-                    block.txs().len()
-                );
-            },
-            ChainUpdate::Rollback(data) => {
-                let block = match data.decode() {
-                    Ok(block) => block,
-                    Err(err) => {
-                        error!("Unable to decode {network:?} block {err} - skip..");
-                        continue;
+                    ChainUpdate::Rollback(data) => {
+                        let block = match data.decode() {
+                            Ok(block) => block,
+                            Err(err) => {
+                                error!("Unable to decode {network:?} block {err} - skip..");
+                                continue;
+                            },
+                        };
+
+                        info!(
+                            "Rollback block NUMBER={} SLOT={} HASH={}",
+                            block.number(),
+                            block.slot(),
+                            hex::encode(block.hash()),
+                        );
                     },
-                };
+                },
+                Err(err) => {
+                    error!(
+                        "Unable to receive next update from the {network:?} follower err: {err} - skip..",
+                    );
+                    continue;
+                },
+            },
+            _ = ticker.tick() => {
+                // This branch executes when no blocks have been indexed for more than the
+                // configured tick interval. If anything goes wrong in that window (e.g.
+                // cat-gateway is shut down ungracefully), the buffered block data is lost.
+                // That is acceptable because cat-gateway checkpoints its latest database
+                // writes, so it simply restarts from the last written block.
+                //
+                // This is most likely to happen when following from the tip or receiving
+                // blocks from the network, since updates then arrive at larger intervals.
+                if blocks_buffer.is_empty() {
+                    continue;
+                }
 
-                info!(
-                    "Rollback block NUMBER={} SLOT={} HASH={}",
-                    block.number(),
-                    block.slot(),
-                    hex::encode(block.hash()),
-                );
+                let current_buffer = std::mem::take(&mut blocks_buffer);
+                index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;
+
+                // Reset the ticker so it counts the interval as starting after we wrote
+                // everything to the database.
+                ticker.reset();
+            }
+        }
+    }
+}
+
+/// Decodes a buffer of raw blocks and indexes them in the database.
+async fn index_block_buffer(
+    db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
+    buffer: Vec<cardano_chain_follower::MultiEraBlockData>,
+) {
+    info!("Starting data indexing");
+
+    let mut blocks = Vec::new();
+
+    for block_data in &buffer {
+        match block_data.decode() {
+            Ok(block) => blocks.push(block),
+            Err(e) => {
+                error!(error = ?e, "Failed to decode block");
+            },
+        }
+    }
+
+    index_many_blocks(db.clone(), genesis_values, network, machine_id, &blocks).await;
 }
 
-/// Index block data, store it in our db
-async fn index_block(
+/// Indexes a batch of decoded blocks into the database using batched inserts and updates.
+#[allow(clippy::too_many_lines)]
+async fn index_many_blocks(
     db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
-    block: &MultiEraBlock<'_>,
+    blocks: &[MultiEraBlock<'_>],
 ) {
-    // Parse block
-    let epoch = match block.epoch(genesis_values).0.try_into() {
-        Ok(epoch) => epoch,
-        Err(err) => {
-            error!("Cannot parse epoch from {network:?} block {err} - skip..");
-            return;
-        },
+    let Some(last_block) = blocks.last() else {
+        return;
     };
 
-    let wallclock = match block.wallclock(genesis_values).try_into() {
-        Ok(time) => chrono::DateTime::from_timestamp(time, 0).unwrap_or_default(),
-        Err(err) => {
-            error!("Cannot parse wall time from {network:?} block {err} - skip..");
-            return;
-        },
-    };
+    // Index blocks data
+    {
+        let mut values = Vec::new();
 
-    let slot = match block.slot().try_into() {
-        Ok(slot) => slot,
-        Err(err) => {
-            error!("Cannot parse slot from {network:?} block {err} - skip..");
-            return;
-        },
-    };
+        for block in blocks {
+            let epoch = match block.epoch(genesis_values).0.try_into() {
+                Ok(epoch) => epoch,
+                Err(err) => {
+                    error!("Cannot parse epoch from {network:?} block {err} - skip..");
+                    return;
+                },
+            };
 
-    let start_index_follower_data = time::Instant::now();
-    match db
-        .index_follower_data(slot, network, epoch, wallclock, block.hash().to_vec())
-        .await
+            let wallclock = match block.wallclock(genesis_values).try_into() {
+                Ok(time) => chrono::DateTime::from_timestamp(time, 0).unwrap_or_default(),
+                Err(err) => {
+                    error!("Cannot parse wall time from {network:?} block {err} - skip..");
+                    return;
+                },
+            };
+
+            let slot = match block.slot().try_into() {
+                Ok(slot) => slot,
+                Err(err) => {
+                    error!("Cannot parse slot from {network:?} block {err} - skip..");
+                    return;
+                },
+            };
+
+            values.push((
+                slot,
+                network.to_string(),
+                epoch,
+                wallclock,
+                block.hash().to_vec(),
+            ));
+        }
+
+        match db.index_many_follower_data(&values).await {
+            Ok(()) => {
+                info!(count = values.len(), "Finished indexing block data");
+            },
+            Err(e) => {
+                error!(error = ?e, "Failed to write DB entries");
+                return;
+            },
+        }
+    }
+
+    let blocks_txs: Vec<_> = blocks
+        .iter()
+        .flat_map(|b| b.txs().into_iter().map(|tx| (b.slot(), tx)))
+        .collect();
+
+    // Index transaction data
     {
-        Ok(()) => (),
-        Err(err) => {
-            error!("Unable to index {network:?} follower data {err} - skip..");
-            return;
-        },
+        let mut values = Vec::new();
+
+        for (slot, tx) in &blocks_txs {
+            // SAFETY: This is safe to ignore because we would not reach this point if
+            // the try_into inside the block indexing loop had failed.
+            #[allow(clippy::cast_possible_wrap)]
+            values.push((tx.hash().to_vec(), *slot as i64, network.to_string()));
+        }
+
+        match db.index_many_txn_data(&values).await {
+            Ok(()) => info!(count = values.len(), "Finished indexing transactions"),
+            Err(e) => {
+                error!(error = ?e, "Failed to index transactions");
+                return;
+            },
+        }
     }
-    debug!(
-        "{network:?} follower data indexing time: {}ns",
-        start_index_follower_data.elapsed().as_nanos()
-    );
-
-    for tx in block.txs() {
-        // index tx
-        let start_index_txn_data = time::Instant::now();
-        match db.index_txn_data(tx.hash().as_slice(), slot, network).await {
-            Ok(()) => (),
-            Err(err) => {
-                error!("Unable to index {network:?} txn data {err} - skip..");
-                continue;
+
+    // Index transaction output data
+    {
+        let mut values = Vec::new();
+
+        for (_, tx) in &blocks_txs {
+            for (tx_out, index) in tx.outputs().into_iter().zip(0..) {
+                let assets =
+                    match serde_json::to_value(parse_policy_assets(&tx_out.non_ada_assets())) {
+                        Ok(assets) => assets,
+                        Err(e) => {
+                            error!(error = ?e, "Failed to parse tx output policy assets");
+                            return;
+                        },
+                    };
+
+                let stake_address = match tx_out.address() {
+                    Ok(Address::Shelley(address)) => address.try_into().ok(),
+                    Ok(Address::Stake(stake_address)) => Some(stake_address),
+                    Ok(Address::Byron(_)) => None,
+                    Err(e) => {
+                        error!(error = ?e, "Failed to parse tx output stake address");
+                        return;
+                    },
+                };
+                let stake_credential = stake_address.map(|val| val.payload().as_hash().to_vec());
+
+                let lovelace_amount = match tx_out.lovelace_amount().try_into() {
+                    Ok(amount) => amount,
+                    Err(e) => {
+                        error!(error = ?e, "Failed to parse tx output lovelace amount");
+                        return;
+                    },
+                };
+
+                values.push((
+                    tx.hash().to_vec(),
+                    index,
+                    assets,
+                    stake_credential,
+                    lovelace_amount,
+                ));
+            }
+        }
+
+        match db.index_many_txn_output_data(&values).await {
+            Ok(()) => {
+                info!(
+                    count = values.len(),
+                    "Finished indexing transaction outputs data"
+                );
+            },
+            Err(e) => {
+                error!(error = ?e, "Failed to index transaction outputs data");
+                return;
             },
         }
-        debug!(
-            "{network:?} tx {} data indexing time: {}ns",
-            tx.hash().to_string(),
-            start_index_txn_data.elapsed().as_nanos()
-        );
-
-        // index utxo
-        let start_index_utxo_data = time::Instant::now();
-        match db.index_utxo_data(&tx).await {
-            Ok(()) => (),
-            Err(err) => {
-                error!("Unable to index {network:?} utxo data for tx {err} - skip..");
-                continue;
+    }
+
+    // Index transaction input data
+    {
+        let mut values = Vec::new();
+
+        for (_, tx) in &blocks_txs {
+            for tx_in in tx.inputs() {
+                let output_index = match tx_in.output_ref().index().try_into() {
+                    Ok(index) => index,
+                    Err(e) => {
+                        error!(error = ?e, "Failed to parse transaction input output ref index");
+                        return;
+                    },
+                };
+
+                // The consuming transaction's hash is recorded as the spender;
+                // the output ref identifies the UTXO being spent.
+                values.push((
+                    tx.hash().to_vec(),
+                    tx_in.output_ref().hash().to_vec(),
+                    output_index,
+                ));
+            }
+        }
+
+        match db.index_many_txn_input_data(&values).await {
+            Ok(()) => {
+                info!(
+                    count = values.len(),
+                    "Finished indexing transaction inputs data"
+                );
+            },
+            Err(e) => {
+                error!(error = ?e, "Failed to index transaction inputs data");
+                return;
             },
         }
-        debug!(
-            "{network:?} tx {} utxo data indexing time: {}ns",
-            tx.hash().to_string(),
-            start_index_utxo_data.elapsed().as_nanos()
-        );
-
-        // Block processing for Eras before staking are ignored.
-        if valid_era(block.era()) {
-            // index catalyst registrations
-            let start_index_registration_data = time::Instant::now();
-            match db.index_registration_data(&tx, network).await {
-                Ok(()) => (),
-                Err(err) => {
-                    error!("Unable to index {network:?} registration data for tx {err} - skip..",);
+    }
+
+    // Index voter registrations
+    {
+        let mut values = Vec::new();
+
+        for block in blocks {
+            if !valid_era(block.era()) {
+                continue;
+            }
+
+            for tx in block.txs() {
+                if tx.metadata().is_empty() {
                     continue;
-                },
+                }
+
+                let Some(cip36_metadata) =
+                    Cip36Metadata::generate_from_tx_metadata(&tx.metadata(), network)
+                else {
+                    continue;
+                };
+
+                let (stake_credential, voting_info, rewards_address, nonce) =
+                    if let Some(reg) = cip36_metadata.registration {
+                        (
+                            Some(reg.stake_key.get_credentials().to_vec()),
+                            Some(reg.voting_info),
+                            Some(reg.rewards_address.0),
+                            Some(reg.nonce.0),
+                        )
+                    } else {
+                        (None, None, None, None)
+                    };
+
+                let encoded_voting_key = if let Some(voting_info) = voting_info.as_ref() {
+                    let Ok(enc) = serde_json::to_string(voting_info) else {
+                        continue;
+                    };
+
+                    Some(enc.into_bytes())
+                } else {
+                    None
+                };
+
+                let multiple_delegations = voting_info.as_ref().is_some_and(|vi| {
+                    if let VotingInfo::Delegated(delegations) = vi {
+                        delegations.len() > 1
+                    } else {
+                        false
+                    }
+                });
+
+                let is_valid = !multiple_delegations
+                    && stake_credential.is_some()
+                    && encoded_voting_key.is_some()
+                    && rewards_address.is_some()
+                    && cip36_metadata.raw_metadata.is_some()
+                    && nonce.is_some()
+                    && cip36_metadata.errors_report.is_empty();
+
+                values.push((
+                    tx.hash().to_vec(),
+                    stake_credential,
+                    encoded_voting_key,
+                    rewards_address,
+                    nonce,
+                    cip36_metadata.raw_metadata,
+                    serde_json::json!(cip36_metadata.errors_report),
+                    is_valid,
+                ));
             }
-            debug!(
-                "{network:?} tx {} registration data indexing time: {}ns",
-                tx.hash().to_string(),
-                start_index_registration_data.elapsed().as_nanos()
-            );
+        }
 
-            // Rewards
+        match db.index_many_voter_registration_data(&values).await {
+            Ok(()) => {
+                info!(
+                    count = values.len(),
+                    "Finished indexing voter registrations data"
+                );
+            },
+            Err(e) => {
+                error!(error = ?e, "Failed to index voter registrations data");
+                return;
+            },
         }
     }
 
-    // Refresh update metadata for future followers
+    // SAFETY: This is safe to ignore because we would not reach this point if
+    // the try_into inside the block indexing loop had failed.
+    #[allow(clippy::cast_possible_wrap)]
     match db
         .refresh_last_updated(
             chrono::offset::Utc::now(),
-            slot,
-            block.hash().to_vec(),
+            last_block.slot() as i64,
+            last_block.hash().to_vec(),
             network,
             machine_id,
         )
diff --git a/catalyst-gateway/bin/src/event_db/cardano/chain_state/mod.rs b/catalyst-gateway/bin/src/event_db/cardano/chain_state/mod.rs
index 08962489e66..1edb5fa3fb6 100644
--- a/catalyst-gateway/bin/src/event_db/cardano/chain_state/mod.rs
+++ b/catalyst-gateway/bin/src/event_db/cardano/chain_state/mod.rs
@@ -86,6 +86,57 @@ impl SlotInfoQueryType {
 }
 
 impl EventDB {
+    /// Batch-inserts follower block data into `cardano_slot_index`.
+    pub(crate) async fn index_many_follower_data(
+        &self, values: &[(SlotNumber, String, EpochNumber, DateTime, Vec<u8>)],
+    ) -> anyhow::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let conn = self.pool.get().await?;
+
+        // Queries are divided into chunks because
+        // Postgres has a limit of u16::MAX parameters a query can have.
+        let chunk_size = u16::MAX / 5;
+        for chunk in values.chunks(chunk_size.into()) {
+            // Build query VALUES statements
+            let mut values_strs = Vec::with_capacity(chunk.len());
+            let mut i = 1;
+
+            for _ in chunk {
+                values_strs.push(format!(
+                    "(${},${},${},${},${})",
+                    i,
+                    i + 1,
+                    i + 2,
+                    i + 3,
+                    i + 4,
+                ));
+
+                i += 5;
+            }
+
+            let query = format!("INSERT INTO cardano_slot_index (slot_no, network, epoch_no, block_time, block_hash) VALUES {} ON CONFLICT DO NOTHING", values_strs.join(","));
+
+            #[allow(trivial_casts)]
+            let params: Vec<_> = chunk
+                .iter()
+                .flat_map(|vs| {
+                    [
+                        &vs.0 as &(dyn tokio_postgres::types::ToSql + Sync),
+                        &vs.1,
+                        &vs.2,
+                        &vs.3,
+                        &vs.4,
+                    ]
+                })
+                .collect();
+
+            conn.query(&query, &params).await?;
+        }
+
+        Ok(())
+    }
+
     /// Index follower block stream
     pub(crate) async fn index_follower_data(
         &self, slot_no: SlotNumber, network: Network, epoch_no: EpochNumber, block_time: DateTime,
diff --git a/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs b/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs
index 0f3cc548021..61d797b71ba 100644
--- a/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs
+++ b/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs
@@ -38,7 +38,81 @@ const INSERT_VOTER_REGISTRATION_SQL: &str = include_str!("insert_cip36_registrat
 /// `select_voter_registration.sql`
 const SELECT_VOTER_REGISTRATION_SQL: &str = include_str!("select_cip36_registration.sql");
 
+/// Row parameters for batched voter registration upserts:
+/// `(tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid)`.
+pub(crate) type InsertManyVoterRegistrationParams = (
+    TxId,
+    Option<Vec<u8>>,
+    Option<Vec<u8>>,
+    Option<Vec<u8>>,
+    Option<i64>,
+    Option<Vec<u8>>,
+    serde_json::Value,
+    bool,
+);
+
 impl EventDB {
+    /// Batch-upserts voter registration data, replacing any existing rows.
+    pub(crate) async fn index_many_voter_registration_data(
+        &self, values: &[InsertManyVoterRegistrationParams],
+    ) -> anyhow::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let conn = self.pool.get().await?;
+
+        // Queries are divided into chunks because
+        // Postgres has a limit of u16::MAX parameters a query can have.
+        let chunk_size = u16::MAX / 8;
+        for chunk in values.chunks(chunk_size.into()) {
+            // Build query VALUES statements
+            let mut values_strs = Vec::with_capacity(chunk.len());
+            let mut i = 1;
+
+            for _ in chunk {
+                values_strs.push(format!(
+                    "(${},${},${},${},${},${},${},${})",
+                    i,
+                    i + 1,
+                    i + 2,
+                    i + 3,
+                    i + 4,
+                    i + 5,
+                    i + 6,
+                    i + 7,
+                ));
+
+                i += 8;
+            }
+
+            let query = format!(
+                r#"INSERT INTO cardano_voter_registration (tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid) VALUES {}
+                ON CONFLICT (tx_id) DO UPDATE SET stake_credential = EXCLUDED.stake_credential, public_voting_key = EXCLUDED.public_voting_key, payment_address = EXCLUDED.payment_address,
+                nonce = EXCLUDED.nonce, metadata_cip36 = EXCLUDED.metadata_cip36, stats = EXCLUDED.stats, valid = EXCLUDED.valid"#,
+                values_strs.join(",")
+            );
+
+            #[allow(trivial_casts)]
+            let params: Vec<_> = chunk
+                .iter()
+                .flat_map(|vs| {
+                    [
+                        &vs.0 as &(dyn tokio_postgres::types::ToSql + Sync),
+                        &vs.1,
+                        &vs.2,
+                        &vs.3,
+                        &vs.4,
+                        &vs.5,
+                        &vs.6,
+                        &vs.7,
+                    ]
+                })
+                .collect();
+
+            conn.query(&query, &params).await?;
+        }
+
+        Ok(())
+    }
+
     /// Inserts voter registration data, replacing any existing data.
     #[allow(clippy::too_many_arguments)]
     async fn insert_voter_registration(
diff --git a/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs b/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs
index 54284a1d20c..5e29d51c441 100644
--- a/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs
+++ b/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs
@@ -21,7 +21,116 @@ const SELECT_TOTAL_UTXO_AMOUNT_SQL: &str = include_str!("select_total_utxo_amoun
 /// `update_utxo.sql`
 const UPDATE_UTXO_SQL: &str = include_str!("update_utxo.sql");
 
+/// Row parameters for batched UTXO inserts: `(tx_id, index, asset, stake_credential, value)`.
+pub(crate) type IndexManyTxnOutputParams = (Vec<u8>, i32, serde_json::Value, Option<Vec<u8>>, i64);
+
 impl EventDB {
+    /// Batch-inserts transaction output (UTXO) data.
+    pub(crate) async fn index_many_txn_output_data(
+        &self, values: &[IndexManyTxnOutputParams],
+    ) -> anyhow::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let conn = self.pool.get().await?;
+
+        // Queries are divided into chunks because
+        // Postgres has a limit of u16::MAX parameters a query can have.
+        let chunk_size = u16::MAX / 5;
+        for chunk in values.chunks(chunk_size.into()) {
+            let mut values_strs = Vec::with_capacity(chunk.len());
+            let mut i = 1;
+
+            for _ in chunk {
+                values_strs.push(format!(
+                    "(${},${},${},${},${})",
+                    i,
+                    i + 1,
+                    i + 2,
+                    i + 3,
+                    i + 4
+                ));
+                i += 5;
+            }
+
+            let query = format!(
+                "INSERT INTO cardano_utxo (tx_id, index, asset, stake_credential, value) VALUES {} ON CONFLICT (index, tx_id) DO NOTHING",
+                values_strs.join(",")
+            );
+
+            #[allow(trivial_casts)]
+            let params: Vec<_> = chunk
+                .iter()
+                .flat_map(|vs| {
+                    [
+                        &vs.0 as &(dyn tokio_postgres::types::ToSql + Sync),
+                        &vs.1,
+                        &vs.2,
+                        &vs.3,
+                        &vs.4,
+                    ]
+                })
+                .collect();
+
+            conn.query(&query, &params).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Batch-updates UTXOs, marking outputs consumed by the given transaction inputs as spent.
+    pub(crate) async fn index_many_txn_input_data(
+        &self, values: &[(Vec<u8>, Vec<u8>, i32)],
+    ) -> anyhow::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let conn = self.pool.get().await?;
+
+        // Queries are divided into chunks because
+        // Postgres has a limit of u16::MAX parameters a query can have.
+        let chunk_size = u16::MAX / 3;
+        for chunk in values.chunks(chunk_size.into()) {
+            let mut values_strs = Vec::with_capacity(chunk.len());
+            let mut i = 1;
+
+            for _ in chunk {
+                values_strs.push(format!(
+                    "(${}::bytea,${}::bytea,${}::integer)",
+                    i,
+                    i + 1,
+                    i + 2
+                ));
+                i += 3;
+            }
+
+            let query = format!(
+                "UPDATE cardano_utxo AS c SET spent_tx_id = v.tx_id FROM (VALUES {}) AS v(tx_id, output_hash, index) WHERE v.index = c.index AND v.output_hash = c.tx_id",
+                values_strs.join(",")
+            );
+
+            #[allow(trivial_casts)]
+            let params: Vec<_> = chunk
+                .iter()
+                .flat_map(|vs| {
+                    [
+                        &vs.0 as &(dyn tokio_postgres::types::ToSql + Sync),
+                        &vs.1,
+                        &vs.2,
+                    ]
+                })
+                .collect();
+
+            conn.query(&query, &params).await?;
+        }
+
+        Ok(())
+    }
+
     /// Index utxo data
     pub(crate) async fn index_utxo_data(&self, tx: &MultiEraTx<'_>) -> anyhow::Result<()> {
         let conn = self.pool.get().await?;
@@ -69,6 +178,52 @@ impl EventDB {
         Ok(())
     }
 
+    /// Batch-inserts transaction index data into `cardano_txn_index`.
+    pub(crate) async fn index_many_txn_data(
+        &self, values: &[(Vec<u8>, SlotNumber, String)],
+    ) -> anyhow::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let conn = self.pool.get().await?;
+
+        // Queries are divided into chunks because
+        // Postgres has a limit of u16::MAX parameters a query can have.
+        let chunk_size = u16::MAX / 3;
+        for chunk in values.chunks(chunk_size.into()) {
+            // Build query VALUES statements
+            let mut values_strs = Vec::with_capacity(chunk.len());
+            let mut i = 1;
+
+            for _ in chunk {
+                values_strs.push(format!("(${},${},${})", i, i + 1, i + 2));
+                i += 3;
+            }
+
+            let query = format!(
+                "INSERT INTO cardano_txn_index (id, slot_no, network) VALUES {} ON CONFLICT DO NOTHING",
+                values_strs.join(",")
+            );
+
+            #[allow(trivial_casts)]
+            let params: Vec<_> = chunk
+                .iter()
+                .flat_map(|vs| {
+                    [
+                        &vs.0 as &(dyn tokio_postgres::types::ToSql + Sync),
+                        &vs.1,
+                        &vs.2,
+                    ]
+                })
+                .collect();
+
+            conn.query(&query, &params).await?;
+        }
+
+        Ok(())
+    }
+
     /// Index txn metadata
     pub(crate) async fn index_txn_data(
         &self, tx_id: &[u8], slot_no: SlotNumber, network: Network,
diff --git a/catalyst-gateway/bin/src/main.rs b/catalyst-gateway/bin/src/main.rs
index 11a5a8bfbad..ef39a8682fa 100644
--- a/catalyst-gateway/bin/src/main.rs
+++ b/catalyst-gateway/bin/src/main.rs
@@ -1,4 +1,6 @@
 //! Catalyst Data Gateway
+#![allow(unused)]
+
 use clap::Parser;
 
 mod cardano;
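
All of the batched statements in this patch follow one pattern: render one
"($1,$2,...)" placeholder group per row, flatten the row tuples into a single
positional parameter list, and chunk the rows so that a single statement never
binds more than u16::MAX parameters (the Postgres per-statement limit the
comments above refer to). For reference, here is a minimal standalone sketch of
that pattern against a bare tokio_postgres Client; the table
example_index (id bytea, slot_no bigint) and the function name are illustrative
only, not part of this patch:

    use tokio_postgres::{types::ToSql, Client, Error};

    /// Batch-inserts rows in chunks so that a single statement never binds
    /// more than u16::MAX parameters (2 per row here).
    async fn insert_rows(client: &Client, rows: &[(Vec<u8>, i64)]) -> Result<(), Error> {
        let chunk_size = usize::from(u16::MAX / 2);
        for chunk in rows.chunks(chunk_size) {
            // Render one "($1,$2)" group per row: "($1,$2),($3,$4),..."
            let placeholders: Vec<String> = (0..chunk.len())
                .map(|row| format!("(${},${})", row * 2 + 1, row * 2 + 2))
                .collect();
            let query = format!(
                "INSERT INTO example_index (id, slot_no) VALUES {} ON CONFLICT DO NOTHING",
                placeholders.join(",")
            );
            // Flatten the row tuples into one positional parameter list.
            let params: Vec<&(dyn ToSql + Sync)> = chunk
                .iter()
                .flat_map(|(id, slot)| [id as &(dyn ToSql + Sync), slot])
                .collect();
            client.execute(query.as_str(), &params).await?;
        }
        Ok(())
    }

The chunk size is always u16::MAX divided by the per-row parameter count
(5, 8, or 3 in the queries above), which is why each method computes its own
chunk_size.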