feat: process chain data and write data to postgres concurrently
FelipeRosa committed May 24, 2024
1 parent 4ab9b44 commit 27b0c2e
Showing 1 changed file with 86 additions and 60 deletions.
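
The change turns `process_blocks` into a producer/consumer pair: the follower loop keeps pulling chain updates and sends block data into a bounded `tokio::sync::mpsc` channel, while a spawned task buffers the received blocks and writes them to Postgres either when the buffer reaches `MAX_BLOCKS_BATCH_LEN` (1024) blocks or when a 60-second ticker fires. Below is a minimal sketch of that shape, with the project-specific pieces replaced by placeholders (`index_batch` and the `u64` block payload are illustrative, not the actual catalyst-gateway API):

```rust
use std::time::Duration;

use tokio::{sync::mpsc, time};

/// Stand-in for the real database write (hypothetical; the commit calls
/// `index_block_buffer` with chain-specific types).
async fn index_batch(batch: Vec<u64>) {
    println!("writing {} blocks to the database", batch.len());
}

const MAX_BATCH_LEN: usize = 4; // the commit uses MAX_BLOCKS_BATCH_LEN = 1024

#[tokio::main]
async fn main() {
    // Capacity 1: the producer can only run one block ahead of the consumer.
    let (tx, mut rx) = mpsc::channel::<u64>(1);

    // Consumer: buffer incoming blocks, flush on batch size or on a timer.
    let indexer = tokio::spawn(async move {
        let mut buffer = Vec::new();
        let mut ticker = time::interval(Duration::from_secs(60));
        ticker.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                recv = rx.recv() => match recv {
                    Some(block) => {
                        buffer.push(block);
                        if buffer.len() >= MAX_BATCH_LEN {
                            index_batch(std::mem::take(&mut buffer)).await;
                            ticker.reset();
                        }
                    },
                    // Channel closed: the producer is done
                    // (this sketch also flushes the remainder before exiting).
                    None => {
                        if !buffer.is_empty() {
                            index_batch(std::mem::take(&mut buffer)).await;
                        }
                        break;
                    },
                },
                _ = ticker.tick() => {
                    // Periodic flush so a slow chain tip still gets indexed.
                    if !buffer.is_empty() {
                        index_batch(std::mem::take(&mut buffer)).await;
                        ticker.reset();
                    }
                },
            }
        }
    });

    // Producer: stands in for the follower loop sending each block downstream.
    for block in 0..10u64 {
        if tx.send(block).await.is_err() {
            eprintln!("indexing task stopped");
            break;
        }
    }
    drop(tx); // close the channel so the consumer drains and exits

    indexer.await.expect("indexer task panicked");
}
```
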
146 changes: 86 additions & 60 deletions catalyst-gateway/bin/src/cardano/mod.rs
@@ -8,7 +8,7 @@ use cardano_chain_follower::{
network_genesis_values, ChainUpdate, Follower, FollowerConfigBuilder, Network, Point,
};
use pallas::ledger::traverse::{wellknown::GenesisValues, MultiEraBlock};
-use tokio::{task::JoinHandle, time};
+use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{error, info};

use crate::event_db::{
@@ -25,6 +25,9 @@ use crate::event_db::{
pub(crate) mod cip36_registration;
pub(crate) mod util;

+/// Blocks batch length that will trigger the blocks buffer to be written to the database.
+const MAX_BLOCKS_BATCH_LEN: usize = 1024;

/// Returns a follower configs, waits until they present inside the db
async fn get_follower_config(
check_config_tick: u64, db: Arc<EventDB>,
@@ -134,72 +137,95 @@ async fn process_blocks(
) {
info!("Follower started processing blocks");

-    let mut blocks_buffer = Vec::new();
-
-    let mut ticker = tokio::time::interval(Duration::from_secs(60));
-    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
-
-    loop {
-        tokio::select! {
-            result = follower.next() => match result {
-                Ok(chain_update) => match chain_update {
-                    ChainUpdate::Block(data) => {
-                        blocks_buffer.push(data);
-
-                        // We have enough blocks to index
-                        if blocks_buffer.len() >= 1024 {
-                            let current_buffer = std::mem::take(&mut blocks_buffer);
-                            index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;
-
-                            // Since we just wrote stuff to the database,
-                            // reset the ticker so the next write is at least 1 interval from now.
-                            ticker.reset();
-                        }
-                    },
-                    ChainUpdate::Rollback(data) => {
-                        let block = match data.decode() {
-                            Ok(block) => block,
-                            Err(err) => {
-                                error!("Unable to decode {network:?} block {err} - skip..");
-                                continue;
-                            },
-                        };
-
-                        info!(
-                            "Rollback block NUMBER={} SLOT={} HASH={}",
-                            block.number(),
-                            block.slot(),
-                            hex::encode(block.hash()),
-                        );
-                    },
-                },
-                Err(err) => {
-                    error!(
-                        "Unable to receive next update from the {network:?} follower err: {err} - skip..",
-                    );
-                    continue;
-                },
-            },
-            _ = ticker.tick() => {
-                // This executes when we have not indexed blocks for more than the configured
-                // tick interval. This means that if any errors occur in that time we lose the buffered block data (e.g.
-                // cat-gateway is shutdown ungracefully). This is not a problem since cat-gateway
-                // checkpoints the latest database writes so it simply restarts from the last
-                // written block.
-                //
-                // This is most likely to happen when following from the tip or receiving blocks
-                // from the network (since updates will come at larger intervals).
-                if blocks_buffer.is_empty() {
-                    continue;
-                }
-
-                let current_buffer = std::mem::take(&mut blocks_buffer);
-                index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;
-
-                // Reset the ticker so it counts the interval as starting after we wrote everything
-                // to the database.
-                ticker.reset();
-            }
+    let (blocks_tx, mut blocks_rx) = mpsc::channel(1);
+
+    tokio::spawn({
+        let genesis_values = genesis_values.clone();
+
+        async move {
+            let mut blocks_buffer = Vec::new();
+
+            let mut ticker = tokio::time::interval(Duration::from_secs(60));
+            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+            loop {
+                tokio::select! {
+                    res = blocks_rx.recv() => {
+                        match res {
+                            Some(block_data) => {
+                                blocks_buffer.push(block_data);
+
+                                if blocks_buffer.len() >= MAX_BLOCKS_BATCH_LEN {
+                                    index_block_buffer(db.clone(), &genesis_values, network, &machine_id, std::mem::take(&mut blocks_buffer)).await;
+
+                                    // Reset batch ticker since we just indexed the blocks buffer
+                                    ticker.reset();
+                                }
+                            }
+
+                            None => {
+                                break;
+                            }
+                        }
+                    },
+
+                    _ = ticker.tick() => {
+                        // This executes when we have not indexed blocks for more than the configured
+                        // tick interval. This means that if any errors occur in that time we lose the buffered block data (e.g.
+                        // cat-gateway is shutdown ungracefully). This is not a problem since cat-gateway
+                        // checkpoints the latest database writes so it simply restarts from the last
+                        // written block.
+                        //
+                        // This is most likely to happen when following from the tip or receiving blocks
+                        // from the network (since updates will come at larger intervals).
+                        if blocks_buffer.is_empty() {
+                            continue;
+                        }
+
+                        let current_buffer = std::mem::take(&mut blocks_buffer);
+                        index_block_buffer(db.clone(), &genesis_values, network, &machine_id, current_buffer).await;
+
+                        // Reset the ticker so it counts the interval as starting after we wrote everything
+                        // to the database.
+                        ticker.reset();
+                    }
+                }
+            }
+        }
+    });
+
+    loop {
+        match follower.next().await {
+            Ok(chain_update) => match chain_update {
+                ChainUpdate::Block(data) => {
+                    if blocks_tx.send(data).await.is_err() {
+                        error!("Block indexing task not running");
+                        break;
+                    };
+                },
+                ChainUpdate::Rollback(data) => {
+                    let block = match data.decode() {
+                        Ok(block) => block,
+                        Err(err) => {
+                            error!("Unable to decode {network:?} block {err} - skip..");
+                            continue;
+                        },
+                    };
+
+                    info!(
+                        "Rollback block NUMBER={} SLOT={} HASH={}",
+                        block.number(),
+                        block.slot(),
+                        hex::encode(block.hash()),
+                    );
+                },
+            },
+            Err(err) => {
+                error!(
+                    "Unable to receive next update from the {network:?} follower err: {err} - skip..",
+                );
+                continue;
+            },
+        }
+    }
         }
     }
 }
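A detail worth calling out is the channel capacity of 1: `blocks_tx.send(...).await` suspends the follower loop whenever the indexing task has not yet taken the previous block, so the producer can never run unboundedly ahead of the database writes. A small self-contained demonstration of that backpressure (timings and names are illustrative only, not taken from the commit):

```rust
use std::time::Duration;

use tokio::{sync::mpsc, time};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    // Slow consumer: takes ~100ms per item.
    let consumer = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            time::sleep(Duration::from_millis(100)).await;
            println!("processed {item}");
        }
    });

    let start = time::Instant::now();
    for i in 0..5u32 {
        // Once the single buffer slot is full, this await waits until the
        // consumer takes an item, throttling the producer to its pace.
        tx.send(i).await.expect("consumer dropped");
        println!("sent {i} at {:?}", start.elapsed());
    }
    drop(tx);

    consumer.await.expect("consumer panicked");
}
```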
