Skip to content

Commit

Permalink
chore: clean logging and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
FelipeRosa committed May 27, 2024
1 parent a35a476 commit 191c0ea
Showing 1 changed file with 60 additions and 101 deletions.
161 changes: 60 additions & 101 deletions catalyst-gateway/bin/src/cardano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
/// Handler for follower tasks, allows for control over spawned follower threads
pub type ManageTasks = JoinHandle<()>;

use anyhow::Context;
use cardano_chain_follower::{
network_genesis_values, ChainUpdate, Follower, FollowerConfigBuilder, Network, Point,
};
Expand Down Expand Up @@ -237,7 +238,7 @@ async fn index_block_buffer(
db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
buffer: Vec<cardano_chain_follower::MultiEraBlockData>,
) {
info!("Starting data indexing");
info!("Starting data batch indexing");

let mut blocks = Vec::new();

Expand All @@ -250,39 +251,35 @@ async fn index_block_buffer(
}
}

index_many_blocks(db.clone(), genesis_values, network, machine_id, &blocks).await;
match index_many_blocks(db.clone(), genesis_values, network, machine_id, &blocks).await {
Ok(()) => {
info!("Finished indexing data batch");
},
Err(e) => {
error!(error = ?e, "Failed indexing data batch");
},
}
}

/// Index a slice of blocks.
async fn index_many_blocks(
db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
blocks: &[MultiEraBlock<'_>],
) {
) -> anyhow::Result<()> {
let Some(last_block) = blocks.last() else {
return;
return Ok(());
};

let network_str = network.to_string();

if !index_blocks(&db, genesis_values, &network_str, blocks).await {
return;
}

if !index_transactions(&db, blocks, &network_str).await {
return;
}
index_blocks(&db, genesis_values, &network_str, blocks).await?;
index_transactions(&db, blocks, &network_str).await?;
index_voter_registrations(&db, blocks, network).await?;

if !index_voter_registrations(&db, blocks, network).await {
return;
}

// SAFETY: This is safe to ignore because we would not reach this point if
// the try_into inside the block indexing loop had failed.
#[allow(clippy::cast_possible_wrap)]
match db
.refresh_last_updated(
chrono::offset::Utc::now(),
last_block.slot() as i64,
last_block.slot().try_into()?,
last_block.hash().to_vec(),
network,
machine_id,
Expand All @@ -294,151 +291,113 @@ async fn index_many_blocks(
error!("Unable to mark {network:?} last update point {err} - skip..",);
},
};

Ok(())
}

/// Index the data from the given blocks.
async fn index_blocks(
db: &EventDB, genesis_values: &GenesisValues, network_str: &str, blocks: &[MultiEraBlock<'_>],
) -> bool {
) -> anyhow::Result<usize> {
let values: Vec<_> = blocks
.iter()
.filter_map(|block| {
IndexedFollowerDataParams::from_block_data(genesis_values, network_str, block)
})
.collect();

match db.index_many_follower_data(&values).await {
Ok(()) => {
info!(count = values.len(), "Finished indexing block data");
true
},
Err(e) => {
error!(error = ?e, "Failed to write DB entries");
false
},
}
db.index_many_follower_data(&values)
.await
.context("Indexing block data")?;

Ok(values.len())
}

/// Index transactions (and its inputs and outputs) from a slice of blocks.
async fn index_transactions(db: &EventDB, blocks: &[MultiEraBlock<'_>], network_str: &str) -> bool {
async fn index_transactions(
db: &EventDB, blocks: &[MultiEraBlock<'_>], network_str: &str,
) -> anyhow::Result<()> {
let blocks_txs: Vec<_> = blocks
.iter()
.flat_map(|b| b.txs().into_iter().map(|tx| (b.slot(), tx)))
.collect();

if !index_transactions_data(db, network_str, &blocks_txs).await {
return false;
}

if !index_transaction_outputs_data(db, &blocks_txs).await {
return false;
}
index_transactions_data(db, network_str, &blocks_txs).await?;
index_transaction_outputs_data(db, &blocks_txs).await?;
index_transaction_inputs_data(db, &blocks_txs).await?;

if !index_transaction_inputs_data(db, &blocks_txs).await {
return false;
}

true
Ok(())
}

/// Index transactions data.
async fn index_transactions_data(
db: &EventDB, network_str: &str, blocks_txs: &[(u64, MultiEraTx<'_>)],
) -> bool {
) -> anyhow::Result<usize> {
let values: Vec<_> = blocks_txs
.iter()
.map(|(slot, tx)| {
// SAFETY: This is safe to ignore because we would not reach this point if
// the try_into inside the block indexing loop had failed.
#[allow(clippy::cast_possible_wrap)]
IndexedTxnParams {
Ok(IndexedTxnParams {
id: tx.hash().to_vec(),
slot_no: *slot as i64,
slot_no: (*slot).try_into()?,
network: network_str,
}
})
})
.collect();
.collect::<anyhow::Result<Vec<_>>>()?;

match db.index_many_txn_data(&values).await {
Ok(()) => info!(count = values.len(), "Finished indexing transactions"),
Err(e) => {
error!(error = ?e, "Failed to index transactions");
return false;
},
}
db.index_many_txn_data(&values)
.await
.context("Indexing transaction data")?;

true
Ok(values.len())
}

/// Index transaction outputs data.
async fn index_transaction_outputs_data(
db: &EventDB, blocks_txs: &[(u64, MultiEraTx<'_>)],
) -> bool {
) -> anyhow::Result<usize> {
let values: Vec<_> = blocks_txs
.iter()
.flat_map(|(_, tx)| IndexedTxnOutputParams::from_txn_data(tx))
.collect();

match db.index_many_txn_output_data(&values).await {
Ok(()) => {
info!(
count = values.len(),
"Finished indexing transaction outputs data"
);
true
},
Err(e) => {
error!(error = ?e, "Failed to index transaction outputs data");
false
},
}
db.index_many_txn_output_data(&values)
.await
.context("Indexing transaction outputs")?;

Ok(values.len())
}

/// Index transaction inputs data.
async fn index_transaction_inputs_data(db: &EventDB, blocks_txs: &[(u64, MultiEraTx<'_>)]) -> bool {
async fn index_transaction_inputs_data(
db: &EventDB, blocks_txs: &[(u64, MultiEraTx<'_>)],
) -> anyhow::Result<usize> {
let values: Vec<_> = blocks_txs
.iter()
.flat_map(|(_, tx)| IndexedTxnInputParams::from_txn_data(tx))
.collect();

match db.index_many_txn_input_data(&values).await {
Ok(()) => {
info!(
count = values.len(),
"Finished indexing transaction inputs data"
);
true
},
Err(e) => {
error!(error = ?e, "Failed to index transaction inputs data");
false
},
}
db.index_many_txn_input_data(&values)
.await
.context("Indexing transaction inputs")?;

Ok(values.len())
}

/// Index voter registrations from a slice of blocks.
async fn index_voter_registrations(
db: &EventDB, blocks: &[MultiEraBlock<'_>], network: Network,
) -> bool {
) -> anyhow::Result<usize> {
let values: Vec<_> = blocks
.iter()
.filter_map(|block| IndexedVoterRegistrationParams::from_block_data(block, network))
.flatten()
.collect();

match db.index_many_voter_registration_data(&values).await {
Ok(()) => {
info!(
count = values.len(),
"Finished indexing voter registrations data"
);
true
},
Err(e) => {
error!(error = ?e, "Failed to index voter registrations data");
false
},
}
db.index_many_voter_registration_data(&values)
.await
.context("Indexing voter registration")?;

Ok(values.len())
}

/// Instantiate the follower.
Expand Down

0 comments on commit 191c0ea

Please sign in to comment.