Skip to content

Commit

Permalink
refactor: Cleanup catalyst-gateway event-db queries (#297)
Browse files Browse the repository at this point in the history
* remove config_table

* add insert_voter_registration quries

* fix

* wip

* wip

* fix lints

* fix rust build

* move sql quries to the separate files

* move schema_check query

* move config query

* remove redundant traits

* fix

* fix db
  • Loading branch information
Mr-Leshiy authored Mar 12, 2024
1 parent 6c7e899 commit 9e580c0
Show file tree
Hide file tree
Showing 26 changed files with 187 additions and 248 deletions.
1 change: 0 additions & 1 deletion catalyst-gateway/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ workspace = true

[dependencies]

async-trait = { workspace = true }
bb8 = { workspace = true }
bb8-postgres = { workspace = true }
tokio-postgres = { workspace = true, features = [
Expand Down
1 change: 0 additions & 1 deletion catalyst-gateway/bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use tokio::time;
use tracing::{error, info};

use crate::{
event_db::config::ConfigQueries,
follower::start_followers,
logger, service,
settings::{DocsSettings, ServiceSettings},
Expand Down
27 changes: 9 additions & 18 deletions catalyst-gateway/bin/src/event_db/config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
//! Config Queries
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::error;

use crate::event_db::{Error, EventDB};

#[async_trait]
#[allow(clippy::module_name_repetitions)]
/// Config Queries Trait
pub(crate) trait ConfigQueries: Sync + Send + 'static {
async fn get_config(&self) -> Result<(Vec<NetworkMeta>, FollowerMeta), Error>;
}
use crate::event_db::Error::JsonParseIssue;
use crate::event_db::{Error, Error::JsonParseIssue, EventDB};

#[derive(Serialize, Deserialize, Debug, PartialEq, PartialOrd, Clone)]
/// Network config metadata
Expand All @@ -32,16 +23,16 @@ pub(crate) struct FollowerMeta {
}

impl EventDB {
/// Get config
const CONFIG_QUERY: &'static str = "SELECT cardano, follower, preview FROM config";
}

#[async_trait]
impl ConfigQueries for EventDB {
async fn get_config(&self) -> Result<(Vec<NetworkMeta>, FollowerMeta), Error> {
/// Config query
pub(crate) async fn get_config(&self) -> Result<(Vec<NetworkMeta>, FollowerMeta), Error> {
let conn = self.pool.get().await?;

let rows = conn.query(Self::CONFIG_QUERY, &[]).await?;
let rows = conn
.query(
include_str!("../../../event-db/queries/config/select_config.sql"),
&[],
)
.await?;

let Some(row) = rows.first() else {
return Err(Error::NoConfig);
Expand Down
93 changes: 36 additions & 57 deletions catalyst-gateway/bin/src/event_db/follower.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Follower Queries

use async_trait::async_trait;
use cardano_chain_follower::Network;
use chrono::TimeZone;

Expand All @@ -19,41 +18,9 @@ pub type MachineId = String;
/// Time when a follower last indexed
pub type LastUpdate = chrono::DateTime<chrono::offset::Utc>;

#[async_trait]
#[allow(clippy::module_name_repetitions)]
/// Follower Queries Trait
pub(crate) trait FollowerQueries: Sync + Send + 'static {
async fn index_follower_data(
&self, slot_no: SlotNumber, network: Network, epoch_no: EpochNumber, block_time: BlockTime,
block_hash: BlockHash,
) -> Result<(), Error>;

async fn last_updated_metadata(
&self, network: String,
) -> Result<(SlotNumber, BlockHash, LastUpdate), Error>;

async fn refresh_last_updated(
&self, last_updated: LastUpdate, slot_no: SlotNumber, block_hash: BlockHash,
network: Network, machine_id: &MachineId,
) -> Result<(), Error>;
}

impl EventDB {
/// Update db with follower data
const INDEX_FOLLOWER_QUERY: &'static str =
"INSERT INTO cardano_slot_index(slot_no, network, epoch_no, block_time, block_hash) VALUES($1, $2, $3, $4, $5)";
/// If no last updated metadata, init with metadata. If present update metadata.
const LAST_UPDATED_QUERY: &'static str =
"INSERT INTO cardano_update_state(id, started, ended, updater_id ,slot_no, network, block_hash, update) VALUES($1, $2, $3 , $4, $5, $6, $7, $8) ON CONFLICT(id) DO UPDATE SET ended = $2, slot_no = $5, block_hash = $7;";
/// Start follower from where previous follower left off.
const START_FROM_QUERY: &'static str =
"SELECT network, slot_no, block_hash, ended FROM cardano_update_state WHERE network = $1;";
}

#[async_trait]
impl FollowerQueries for EventDB {
/// Index follower block stream
async fn index_follower_data(
pub(crate) async fn index_follower_data(
&self, slot_no: SlotNumber, network: Network, epoch_no: EpochNumber, block_time: BlockTime,
block_hash: BlockHash,
) -> Result<(), Error> {
Expand All @@ -69,25 +36,34 @@ impl FollowerQueries for EventDB {
};

let _rows = conn
.query(Self::INDEX_FOLLOWER_QUERY, &[
&slot_no,
&network,
&epoch_no,
&timestamp,
&hex::decode(block_hash).map_err(|e| Error::DecodeHex(e.to_string()))?,
])
.query(
include_str!("../../../event-db/queries/follower/insert_slot_index.sql"),
&[
&slot_no,
&network,
&epoch_no,
&timestamp,
&hex::decode(block_hash).map_err(|e| Error::DecodeHex(e.to_string()))?,
],
)
.await?;

Ok(())
}

/// Check when last update occurred
async fn last_updated_metadata(
/// Check when last update occurred.
/// Start follower from where previous follower left off.
pub(crate) async fn last_updated_metadata(
&self, network: String,
) -> Result<(SlotNumber, BlockHash, LastUpdate), Error> {
let conn = self.pool.get().await?;

let rows = conn.query(Self::START_FROM_QUERY, &[&network]).await?;
let rows = conn
.query(
include_str!("../../../event-db/queries/follower/select_update_state.sql"),
&[&network],
)
.await?;
if rows.is_empty() {
return Err(Error::NoLastUpdateMetadata("No metadata".to_string()));
}
Expand All @@ -101,8 +77,8 @@ impl FollowerQueries for EventDB {
Err(e) => return Err(Error::NoLastUpdateMetadata(e.to_string())),
};

let block_hash: BlockHash = match row.try_get("block_hash") {
Ok(block_hash) => block_hash,
let block_hash: BlockHash = match row.try_get::<_, Vec<u8>>("block_hash") {
Ok(block_hash) => hex::encode(block_hash),
Err(e) => return Err(Error::NoLastUpdateMetadata(e.to_string())),
};
let last_updated: LastUpdate = match row.try_get("ended") {
Expand All @@ -115,7 +91,7 @@ impl FollowerQueries for EventDB {

/// Mark point in time where the last follower finished indexing in order for future
/// followers to pick up from this point
async fn refresh_last_updated(
pub(crate) async fn refresh_last_updated(
&self, last_updated: LastUpdate, slot_no: SlotNumber, block_hash: BlockHash,
network: Network, machine_id: &MachineId,
) -> Result<(), Error> {
Expand All @@ -134,16 +110,19 @@ impl FollowerQueries for EventDB {
// An insert only happens once when there is no update metadata available
// All future additions are just updates on ended, slot_no and block_hash
let _rows = conn
.query(Self::LAST_UPDATED_QUERY, &[
&i64::from(id),
&last_updated,
&last_updated,
&machine_id,
&slot_no,
&network,
&block_hash,
&update,
])
.query(
include_str!("../../../event-db/queries/follower/insert_update_state.sql"),
&[
&i64::from(id),
&last_updated,
&last_updated,
&machine_id,
&slot_no,
&network,
&hex::decode(block_hash).map_err(|e| Error::DecodeHex(e.to_string()))?,
&update,
],
)
.await?;

Ok(())
Expand Down
32 changes: 12 additions & 20 deletions catalyst-gateway/bin/src/event_db/legacy/queries/event/ballot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! Ballot Queries
use std::collections::HashMap;

use async_trait::async_trait;

use crate::event_db::{
error::Error,
legacy::types::{
Expand All @@ -18,19 +16,6 @@ use crate::event_db::{
EventDB,
};

#[async_trait]
#[allow(clippy::module_name_repetitions)]
/// Ballot Queries Trait
pub(crate) trait BallotQueries: Sync + Send + 'static {
async fn get_ballot(
&self, event: EventId, objective: ObjectiveId, proposal: ProposalId,
) -> Result<Ballot, Error>;
async fn get_objective_ballots(
&self, event: EventId, objective: ObjectiveId,
) -> Result<Vec<ProposalBallot>, Error>;
async fn get_event_ballots(&self, event: EventId) -> Result<Vec<ObjectiveBallots>, Error>;
}

impl EventDB {
/// Ballot vote options per event query template
const BALLOTS_VOTE_OPTIONS_PER_EVENT_QUERY: &'static str =
Expand Down Expand Up @@ -62,9 +47,10 @@ impl EventDB {
WHERE objective.event = $1 AND objective.id = $2 AND proposal.id = $3;";
}

#[async_trait]
impl BallotQueries for EventDB {
async fn get_ballot(
impl EventDB {
/// Get ballot query
#[allow(dead_code)]
pub(crate) async fn get_ballot(
&self, event: EventId, objective: ObjectiveId, proposal: ProposalId,
) -> Result<Ballot, Error> {
let conn = self.pool.get().await?;
Expand Down Expand Up @@ -107,7 +93,9 @@ impl BallotQueries for EventDB {
})
}

async fn get_objective_ballots(
/// Get objective's ballots query
#[allow(dead_code)]
pub(crate) async fn get_objective_ballots(
&self, event: EventId, objective: ObjectiveId,
) -> Result<Vec<ProposalBallot>, Error> {
let conn = self.pool.get().await?;
Expand Down Expand Up @@ -155,7 +143,11 @@ impl BallotQueries for EventDB {
Ok(ballots)
}

async fn get_event_ballots(&self, event: EventId) -> Result<Vec<ObjectiveBallots>, Error> {
/// Get event's ballots query
#[allow(dead_code)]
pub(crate) async fn get_event_ballots(
&self, event: EventId,
) -> Result<Vec<ObjectiveBallots>, Error> {
let conn = self.pool.get().await?;

let rows = conn
Expand Down
22 changes: 7 additions & 15 deletions catalyst-gateway/bin/src/event_db/legacy/queries/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Event Queries
use async_trait::async_trait;
use chrono::{NaiveDateTime, Utc};

use crate::event_db::{
Expand All @@ -16,16 +15,6 @@ pub(crate) mod objective;
pub(crate) mod proposal;
pub(crate) mod review;

#[async_trait]
#[allow(clippy::module_name_repetitions)]
/// Event Queries Trait
pub(crate) trait EventQueries: Sync + Send + 'static {
async fn get_events(
&self, limit: Option<i64>, offset: Option<i64>,
) -> Result<Vec<EventSummary>, Error>;
async fn get_event(&self, event: EventId) -> Result<Event, Error>;
}

impl EventDB {
/// Events query template
const EVENTS_QUERY: &'static str =
Expand All @@ -50,9 +39,10 @@ impl EventDB {
WHERE event.row_id = $1;";
}

#[async_trait]
impl EventQueries for EventDB {
async fn get_events(
impl EventDB {
/// Get events query
#[allow(dead_code)]
pub(crate) async fn get_events(
&self, limit: Option<i64>, offset: Option<i64>,
) -> Result<Vec<EventSummary>, Error> {
let conn = self.pool.get().await?;
Expand Down Expand Up @@ -84,7 +74,9 @@ impl EventQueries for EventDB {
Ok(events)
}

async fn get_event(&self, event: EventId) -> Result<Event, Error> {
/// Get event query
#[allow(dead_code)]
pub(crate) async fn get_event(&self, event: EventId) -> Result<Event, Error> {
let conn = self.pool.get().await?;

let rows = conn.query(Self::EVENT_QUERY, &[&event.0]).await?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
//! Objective Queries
use async_trait::async_trait;

use crate::event_db::{
error::Error,
legacy::types::{
Expand All @@ -14,15 +12,6 @@ use crate::event_db::{
EventDB,
};

#[async_trait]
#[allow(clippy::module_name_repetitions)]
/// Objective Queries Trait
pub(crate) trait ObjectiveQueries: Sync + Send + 'static {
async fn get_objectives(
&self, event: EventId, limit: Option<i64>, offset: Option<i64>,
) -> Result<Vec<Objective>, Error>;
}

impl EventDB {
/// Objectives query template
const OBJECTIVES_QUERY: &'static str =
Expand All @@ -39,9 +28,10 @@ impl EventDB {
WHERE objective_id = $1;";
}

#[async_trait]
impl ObjectiveQueries for EventDB {
async fn get_objectives(
impl EventDB {
/// Get objectives query
#[allow(dead_code)]
pub(crate) async fn get_objectives(
&self, event: EventId, limit: Option<i64>, offset: Option<i64>,
) -> Result<Vec<Objective>, Error> {
let conn = self.pool.get().await?;
Expand Down
Loading

0 comments on commit 9e580c0

Please sign in to comment.