diff --git a/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs b/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs index bb77557d289..b148a4a4d93 100644 --- a/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs +++ b/catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs @@ -2,16 +2,14 @@ use cardano_chain_follower::Network; use pallas::ledger::traverse::MultiEraBlock; +use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type}; use crate::{ cardano::{ cip36_registration::{Cip36Metadata, VotingInfo}, util::valid_era, }, - event_db::{ - cardano::chain_state::SlotNumber, error::NotFoundError, utils::prepare_sql_params_list, - EventDB, - }, + event_db::{cardano::chain_state::SlotNumber, error::NotFoundError, EventDB}, }; /// Transaction id @@ -145,39 +143,60 @@ impl EventDB { return Ok(()); } - let conn = self.pool.get().await?; + let mut conn = self.pool.get().await?; + let tx = conn.transaction().await?; - let chunk_size = (i16::MAX / 8) as usize; - for chunk in values.chunks(chunk_size) { - // Build query VALUES statements - let values_strings = prepare_sql_params_list(&[None; 8], chunk.len()); + tx.execute( + "CREATE TEMPORARY TABLE tmp_cardano_voter_registration (LIKE cardano_voter_registration) ON COMMIT DROP", + &[], + ) + .await?; - let query = format!( - r#"EXPLAIN (BUFFERS TRUE, ANALYZE TRUE) INSERT INTO cardano_voter_registration (tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid) VALUES {} - ON CONFLICT (tx_id) DO UPDATE SET stake_credential = EXCLUDED.stake_credential, public_voting_key = EXCLUDED.public_voting_key, payment_address = EXCLUDED.payment_address, - nonce = EXCLUDED.nonce, metadata_cip36 = EXCLUDED.metadata_cip36, stats = EXCLUDED.stats, valid = EXCLUDED.valid"#, - values_strings.join(",") + { + let sink = tx + .copy_in("COPY tmp_cardano_voter_registration (tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid) FROM STDIN BINARY") + .await?; + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::BYTEA, + Type::BYTEA, + Type::BYTEA, + Type::BYTEA, + Type::INT8, + Type::BYTEA, + Type::JSONB, + Type::BOOL, + ], ); - - #[allow(trivial_casts)] - let params: Vec<_> = chunk - .iter() - .flat_map(|vs| { - [ - &vs.tx_id as &(dyn tokio_postgres::types::ToSql + Sync), - &vs.stake_credential, - &vs.public_voting_key, - &vs.payment_address, - &vs.nonce, - &vs.cip36_metadata, - &vs.stats, - &vs.valid, - ] - }) - .collect(); - conn.execute(&query, ¶ms).await?; + tokio::pin!(writer); + + for params in values { + #[allow(trivial_casts)] + writer + .as_mut() + .write(&[ + ¶ms.tx_id as &(dyn tokio_postgres::types::ToSql + Sync), + ¶ms.stake_credential, + ¶ms.public_voting_key, + ¶ms.payment_address, + ¶ms.nonce, + ¶ms.cip36_metadata, + ¶ms.stats, + ¶ms.valid, + ]) + .await?; + } + + writer.finish().await?; } + tx.execute("INSERT INTO cardano_voter_registration (tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid) + SELECT tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid FROM tmp_cardano_voter_registration + ON CONFLICT (tx_id) DO UPDATE SET stake_credential = EXCLUDED.stake_credential, public_voting_key = EXCLUDED.public_voting_key, payment_address = EXCLUDED.payment_address, + nonce = EXCLUDED.nonce, metadata_cip36 = EXCLUDED.metadata_cip36, stats = EXCLUDED.stats, valid = EXCLUDED.valid", &[]).await?; + tx.commit().await?; + Ok(()) } @@ -188,11 +207,10 @@ impl EventDB { let conn = self.pool.get().await?; let rows = conn - .query(SELECT_VOTER_REGISTRATION_SQL, &[ - &stake_credential, - &network.to_string(), - &slot_num, - ]) + .query( + SELECT_VOTER_REGISTRATION_SQL, + &[&stake_credential, &network.to_string(), &slot_num], + ) .await?; let row = rows.first().ok_or(NotFoundError)?; diff --git a/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs b/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs index c4ccd8eff6a..fc3b1f7d7b8 100644 --- a/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs +++ b/catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs @@ -144,13 +144,16 @@ impl EventDB { let sink = tx .copy_in("COPY tmp_cardano_utxo (tx_id, index, asset, stake_credential, value) FROM STDIN BINARY") .await?; - let writer = BinaryCopyInWriter::new(sink, &[ - Type::BYTEA, - Type::INT4, - Type::JSONB, - Type::BYTEA, - Type::INT8, - ]); + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::BYTEA, + Type::INT4, + Type::JSONB, + Type::BYTEA, + Type::INT8, + ], + ); tokio::pin!(writer); for params in values { @@ -275,11 +278,10 @@ impl EventDB { let conn = self.pool.get().await?; let row = conn - .query_one(SELECT_TOTAL_UTXO_AMOUNT_SQL, &[ - &stake_credential, - &network.to_string(), - &slot_num, - ]) + .query_one( + SELECT_TOTAL_UTXO_AMOUNT_SQL, + &[&stake_credential, &network.to_string(), &slot_num], + ) .await?; // Aggregate functions as SUM and MAX return NULL if there are no rows, so we need to diff --git a/catalyst-gateway/bin/src/event_db/mod.rs b/catalyst-gateway/bin/src/event_db/mod.rs index 21fc3b274c9..1c1c2e25f25 100644 --- a/catalyst-gateway/bin/src/event_db/mod.rs +++ b/catalyst-gateway/bin/src/event_db/mod.rs @@ -10,7 +10,6 @@ pub(crate) mod cardano; pub(crate) mod error; pub(crate) mod legacy; pub(crate) mod schema_check; -pub(crate) mod utils; /// Database URL Environment Variable name. /// eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`" diff --git a/catalyst-gateway/bin/src/event_db/utils.rs b/catalyst-gateway/bin/src/event_db/utils.rs deleted file mode 100644 index 76ec431e853..00000000000 --- a/catalyst-gateway/bin/src/event_db/utils.rs +++ /dev/null @@ -1,24 +0,0 @@ -//! Utility functions for the event db operations - -/// `ParamType` alias -pub(crate) type ParamType<'a> = Option<&'a str>; - -/// Prepare SQL parameters list from the provided list size and number of parameters. -/// Output format: `[($1, $2, $3), ($4, $5, $6)]` -pub(crate) fn prepare_sql_params_list(params: &[ParamType], list_size: usize) -> Vec { - let params_num = params.len(); - (0..list_size) - .map(|row| { - let placeholders: String = params - .iter() - .enumerate() - .map(|(i, param_type)| { - let param_type = param_type.map(|val| format!("::{val}")).unwrap_or_default(); - format!("${}{param_type}", row * params_num + i + 1) - }) - .collect::>() - .join(","); - format!("({placeholders})") - }) - .collect() -}