Skip to content

Commit

Permalink
Merge branch 'main' into chore/batch-insert-index-data
Browse files Browse the repository at this point in the history
  • Loading branch information
FelipeRosa committed May 31, 2024
2 parents b453d85 + 4e1af1b commit 5ee5b1a
Show file tree
Hide file tree
Showing 51 changed files with 1,516 additions and 242 deletions.
2 changes: 2 additions & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pytest
rapidoc
redoc
Replayability
reloadable
repr
reqwest
rfwtxt
Expand All @@ -159,6 +160,7 @@ slotno
sqlfluff
Stefano
stevenj
stringzilla
Subkey
subosito
SYSROOT
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/generate-allure-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: always()
continue-on-error: true
with:
earthfile: ./catalyst-gateway/tests/
earthfile: ./catalyst-gateway/tests/schemathesis_tests
flags: --allow-privileged
targets: test-fuzzer-api
target_flags:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,6 @@ dev-catalyst-voice-9f78f27c6bc5.json
# Specifically exclude it in the directory it appears, if its required.
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

#Hypothesis
.hypothesis
1 change: 1 addition & 0 deletions catalyst-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ cddl = "0.9.2"
ciborium = "0.2"
pallas = { git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "709acb19c52c6b789279ecc4bc8793b5d8b5abe9", version = "0.25.0" }
cardano-chain-follower = { git = "https://github.com/input-output-hk/hermes.git", version="0.0.1" }
stringzilla = "3.8.4"

[workspace.lints.rust]
warnings = "deny"
Expand Down
3 changes: 2 additions & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tokio-postgres = { workspace = true, features = [
] }
clap = { workspace = true, features = ["derive", "env"] }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "time"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "registry", "std", "time"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
Expand Down Expand Up @@ -68,3 +68,4 @@ handlebars = { workspace = true }
cddl = { workspace = true }
ciborium = { workspace = true }
ed25519-dalek = "2.1.1"
stringzilla = { workspace = true }
7 changes: 5 additions & 2 deletions catalyst-gateway/bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ impl Cli {
pub(crate) async fn exec(self) -> anyhow::Result<()> {
match self {
Self::Run(settings) => {
logger::init(settings.log_level)?;
let logger_handle = logger::init(settings.log_level);

// Unique machine id
let machine_id = settings.follower_settings.machine_uid;

let state = Arc::new(State::new(Some(settings.database_url)).await?);
let state = Arc::new(State::new(Some(settings.database_url), logger_handle).await?);
let event_db = state.event_db();
event_db
.modify_deep_query(settings.deep_query_inspection.into())
.await;

tokio::spawn(async move {
match service::run(&settings.docs_settings, state.clone()).await {
Expand Down
69 changes: 31 additions & 38 deletions catalyst-gateway/bin/src/event_db/cardano/chain_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,17 @@ impl SlotInfoQueryType {
/// Get SQL query
fn get_sql_query(&self) -> anyhow::Result<String> {
let tmpl_fields = match self {
SlotInfoQueryType::Previous => {
SlotInfoQueryTmplFields {
sign: "<",
ordering: Some("DESC"),
}
SlotInfoQueryType::Previous => SlotInfoQueryTmplFields {
sign: "<",
ordering: Some("DESC"),
},
SlotInfoQueryType::Current => {
SlotInfoQueryTmplFields {
sign: "=",
ordering: None,
}
SlotInfoQueryType::Current => SlotInfoQueryTmplFields {
sign: "=",
ordering: None,
},
SlotInfoQueryType::Next => {
SlotInfoQueryTmplFields {
sign: ">",
ordering: None,
}
SlotInfoQueryType::Next => SlotInfoQueryTmplFields {
sign: ">",
ordering: None,
},
};

Expand Down Expand Up @@ -161,13 +155,16 @@ impl EventDB {
let sink = tx
.copy_in("COPY tmp_cardano_slot_index (slot_no, network, epoch_no, block_time, block_hash) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(sink, &[
Type::INT8,
Type::TEXT,
Type::INT8,
Type::TIMESTAMPTZ,
Type::BYTEA,
]);
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::INT8,
Type::TEXT,
Type::INT8,
Type::TIMESTAMPTZ,
Type::BYTEA,
],
);
tokio::pin!(writer);

for params in values {
Expand Down Expand Up @@ -197,13 +194,11 @@ impl EventDB {
pub(crate) async fn get_slot_info(
&self, date_time: DateTime, network: Network, query_type: SlotInfoQueryType,
) -> anyhow::Result<(SlotNumber, BlockHash, DateTime)> {
let conn = self.pool.get().await?;

let rows = conn
.query(&query_type.get_sql_query()?, &[
&network.to_string(),
&date_time,
])
let rows = self
.query(
&query_type.get_sql_query()?,
&[&network.to_string(), &date_time],
)
.await?;

let row = rows.first().ok_or(NotFoundError)?;
Expand All @@ -218,9 +213,7 @@ impl EventDB {
pub(crate) async fn last_updated_state(
&self, network: Network,
) -> anyhow::Result<(SlotNumber, BlockHash, DateTime)> {
let conn = self.pool.get().await?;

let rows = conn
let rows = self
.query(SELECT_UPDATE_STATE_SQL, &[&network.to_string()])
.await?;

Expand All @@ -239,17 +232,16 @@ impl EventDB {
&self, last_updated: DateTime, slot_no: SlotNumber, block_hash: BlockHash,
network: Network, machine_id: &MachineId,
) -> anyhow::Result<()> {
let conn = self.pool.get().await?;

// Rollback or update
let update = true;

let network_id: u64 = network.into();

// An insert only happens once when there is no update metadata available
// All future additions are just updates on ended, slot_no and block_hash
let _rows = conn
.query(INSERT_UPDATE_STATE_SQL, &[
self.modify(
INSERT_UPDATE_STATE_SQL,
&[
&i64::try_from(network_id)?,
&last_updated,
&last_updated,
Expand All @@ -258,8 +250,9 @@ impl EventDB {
&network.to_string(),
&block_hash,
&update,
])
.await?;
],
)
.await?;

Ok(())
}
Expand Down
37 changes: 18 additions & 19 deletions catalyst-gateway/bin/src/event_db/cardano/cip36_registration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl IndexedVoterRegistrationParams {
} else {
(None, None, None, None)
};

let encoded_voting_key = if let Some(voting_info) = voting_info.as_ref() {
let Ok(enc) = serde_json::to_string(voting_info) else {
return None;
Expand Down Expand Up @@ -156,16 +155,19 @@ impl EventDB {
let sink = tx
.copy_in("COPY tmp_cardano_voter_registration (tx_id, stake_credential, public_voting_key, payment_address, nonce, metadata_cip36, stats, valid) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(sink, &[
Type::BYTEA,
Type::BYTEA,
Type::BYTEA,
Type::BYTEA,
Type::INT8,
Type::BYTEA,
Type::JSONB,
Type::BOOL,
]);
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::BYTEA,
Type::BYTEA,
Type::BYTEA,
Type::BYTEA,
Type::INT8,
Type::BYTEA,
Type::JSONB,
Type::BOOL,
],
);
tokio::pin!(writer);

for params in values {
Expand Down Expand Up @@ -201,14 +203,11 @@ impl EventDB {
pub(crate) async fn get_registration_info(
&self, stake_credential: StakeCredential, network: Network, slot_num: SlotNumber,
) -> anyhow::Result<(TxId, PaymentAddress, PublicVotingInfo, Nonce)> {
let conn = self.pool.get().await?;

let rows = conn
.query(SELECT_VOTER_REGISTRATION_SQL, &[
&stake_credential,
&network.to_string(),
&slot_num,
])
let rows = self
.query(
SELECT_VOTER_REGISTRATION_SQL,
&[&stake_credential, &network.to_string(), &slot_num],
)
.await?;

let row = rows.first().ok_or(NotFoundError)?;
Expand Down
4 changes: 1 addition & 3 deletions catalyst-gateway/bin/src/event_db/cardano/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ const SELECT_CONFIG_SQL: &str = include_str!("select_config.sql");
impl EventDB {
/// Config query
pub(crate) async fn get_follower_config(&self) -> anyhow::Result<Vec<FollowerConfig>> {
let conn = self.pool.get().await?;

let id = "cardano";
let id2 = "follower";

let rows = conn.query(SELECT_CONFIG_SQL, &[&id, &id2]).await?;
let rows = self.query(SELECT_CONFIG_SQL, &[&id, &id2]).await?;

let mut follower_configs = Vec::new();
for row in rows {
Expand Down
30 changes: 15 additions & 15 deletions catalyst-gateway/bin/src/event_db/cardano/utxo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,16 @@ impl EventDB {
let sink = tx
.copy_in("COPY tmp_cardano_utxo (tx_id, index, asset, stake_credential, value) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(sink, &[
Type::BYTEA,
Type::INT4,
Type::JSONB,
Type::BYTEA,
Type::INT8,
]);
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::BYTEA,
Type::INT4,
Type::JSONB,
Type::BYTEA,
Type::INT8,
],
);
tokio::pin!(writer);

for params in values {
Expand Down Expand Up @@ -272,14 +275,11 @@ impl EventDB {
pub(crate) async fn total_utxo_amount(
&self, stake_credential: StakeCredential, network: Network, slot_num: SlotNumber,
) -> anyhow::Result<(StakeAmount, SlotNumber)> {
let conn = self.pool.get().await?;

let row = conn
.query_one(SELECT_TOTAL_UTXO_AMOUNT_SQL, &[
&stake_credential,
&network.to_string(),
&slot_num,
])
let row = self
.query_one(
SELECT_TOTAL_UTXO_AMOUNT_SQL,
&[&stake_credential, &network.to_string(), &slot_num],
)
.await?;

// Aggregate functions as SUM and MAX return NULL if there are no rows, so we need to
Expand Down
18 changes: 6 additions & 12 deletions catalyst-gateway/bin/src/event_db/legacy/queries/event/ballot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ impl EventDB {
pub(crate) async fn get_ballot(
&self, event: EventId, objective: ObjectiveId, proposal: ProposalId,
) -> anyhow::Result<Ballot> {
let conn = self.pool.get().await?;

let rows = conn
let rows = self
.query(Self::BALLOT_VOTE_OPTIONS_QUERY, &[
&event.0,
&objective.0,
Expand All @@ -65,7 +63,7 @@ impl EventDB {
let row = rows.first().ok_or(NotFoundError)?;
let choices = row.try_get("objective")?;

let rows = conn
let rows = self
.query(Self::BALLOT_VOTE_PLANS_QUERY, &[
&event.0,
&objective.0,
Expand Down Expand Up @@ -96,9 +94,7 @@ impl EventDB {
pub(crate) async fn get_objective_ballots(
&self, event: EventId, objective: ObjectiveId,
) -> anyhow::Result<Vec<ProposalBallot>> {
let conn = self.pool.get().await?;

let rows = conn
let rows = self
.query(Self::BALLOTS_VOTE_OPTIONS_PER_OBJECTIVE_QUERY, &[
&event.0,
&objective.0,
Expand All @@ -110,7 +106,7 @@ impl EventDB {
let choices = row.try_get("objective")?;
let proposal_id = ProposalId(row.try_get("proposal_id")?);

let rows = conn
let rows = self
.query(Self::BALLOT_VOTE_PLANS_QUERY, &[
&event.0,
&objective.0,
Expand Down Expand Up @@ -146,9 +142,7 @@ impl EventDB {
pub(crate) async fn get_event_ballots(
&self, event: EventId,
) -> anyhow::Result<Vec<ObjectiveBallots>> {
let conn = self.pool.get().await?;

let rows = conn
let rows = self
.query(Self::BALLOTS_VOTE_OPTIONS_PER_EVENT_QUERY, &[&event.0])
.await?;
let mut ballots = HashMap::<ObjectiveId, Vec<ProposalBallot>>::new();
Expand All @@ -157,7 +151,7 @@ impl EventDB {
let proposal_id = ProposalId(row.try_get("proposal_id")?);
let objective_id = ObjectiveId(row.try_get("objective_id")?);

let rows = conn
let rows = self
.query(Self::BALLOT_VOTE_PLANS_QUERY, &[
&event.0,
&objective_id.0,
Expand Down
10 changes: 3 additions & 7 deletions catalyst-gateway/bin/src/event_db/legacy/queries/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ impl EventDB {
pub(crate) async fn get_events(
&self, limit: Option<i64>, offset: Option<i64>,
) -> anyhow::Result<Vec<EventSummary>> {
let conn = self.pool.get().await?;

let rows = conn
let rows = self
.query(Self::EVENTS_QUERY, &[&limit, &offset.unwrap_or(0)])
.await?;

Expand Down Expand Up @@ -77,9 +75,7 @@ impl EventDB {
/// Get event query
#[allow(dead_code)]
pub(crate) async fn get_event(&self, event: EventId) -> anyhow::Result<Event> {
let conn = self.pool.get().await?;

let rows = conn.query(Self::EVENT_QUERY, &[&event.0]).await?;
let rows = self.query(Self::EVENT_QUERY, &[&event.0]).await?;
let row = rows.first().ok_or(NotFoundError)?;

let ends = row
Expand Down Expand Up @@ -133,7 +129,7 @@ impl EventDB {
.map(|val| val.and_local_timezone(Utc).unwrap()),
};

let rows = conn.query(Self::EVENT_GOALS_QUERY, &[&event.0]).await?;
let rows = self.query(Self::EVENT_GOALS_QUERY, &[&event.0]).await?;
let mut goals = Vec::new();
for row in rows {
goals.push(EventGoal {
Expand Down
Loading

0 comments on commit 5ee5b1a

Please sign in to comment.