Skip to content

Commit

Permalink
feat: add EventDb::query and EventDb::modify methods
Browse files Browse the repository at this point in the history
  • Loading branch information
saibatizoku committed May 15, 2024
1 parent 387de24 commit 8e8617a
Showing 1 changed file with 59 additions and 7 deletions.
66 changes: 59 additions & 7 deletions catalyst-gateway/bin/src/event_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ impl EventDB {
}

/// Modify the deep query inspection setting.
///
/// # Arguments
///
/// * `deep_query` - `DeepQueryInspection` setting.
pub(crate) async fn modify_deep_query(&self, deep_query: DeepQueryInspection) {
let mut settings = self.inspection_settings.write().await;
settings.deep_query = deep_query;
Expand All @@ -57,46 +61,90 @@ impl EventDB {
/// rolled-back transaction, before running the query.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `Result<Vec<Row>, anyhow::Error>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query(
&self, stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, anyhow::Error> {
if self.is_deep_query_enabled().await {
self.explain_analyze(stmt, params).await?;
self.explain_analyze(stmt, params, true).await?;
}
let conn = self.pool.get().await?;
let rows = conn.query(stmt, params).await?;
Ok(rows)
}

/// Query the database for a single row.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `Result<Row, anyhow::Error>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_one(
&self, stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Row, anyhow::Error> {
if self.is_deep_query_enabled().await {
self.explain_analyze(stmt, params).await?;
self.explain_analyze(stmt, params, true).await?;
}
let conn = self.pool.get().await?;
let row = conn.query_one(stmt, params).await?;
Ok(row)
}

/// Modify the database.
///
/// Use this for `UPDATE`, `DELETE`, and other DB statements that
/// don't return data.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `Result<(), anyhow::Error>`
pub(crate) async fn modify(
&self, stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<(), anyhow::Error> {
if self.is_deep_query_enabled().await {
self.explain_analyze(stmt, params, false).await?;
}
let conn = self.pool.get().await?;
let _row = conn.query(stmt, params).await?;
Ok(())
}

/// Prepend `EXPLAIN ANALYZE` to the query.
///
/// Log the query plan inside a rolled-back transaction.
/// Log the query plan inside a transaction that may be committed or rolled back.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
/// * `rollback` - `bool` whether to roll back the transaction or not.
async fn explain_analyze(
&self, stmt: &str, params: &[&(dyn ToSql + Sync)],
&self, stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
) -> anyhow::Result<()> {
let span = debug_span!(
"query_plan",
query_statement = stmt,
params = format!("{:?}", params),
uuid = uuid::Uuid::new_v4().to_string()
);

async move {
let mut conn = self.pool.get().await?;
let transaction = conn.transaction().await?;
Expand All @@ -105,10 +153,14 @@ impl EventDB {
.await?;
let rows = transaction.query(&explain_stmt, params).await?;
for r in rows {
let c: String = r.get("QUERY PLAN");
debug!("{}", c);
let query_plan_str: String = r.get("QUERY PLAN");
debug!("{}", query_plan_str);
}
if rollback {
transaction.rollback().await?;
} else {
transaction.commit().await?;
}
transaction.rollback().await?;
Ok(())
}
.instrument(span)
Expand Down

0 comments on commit 8e8617a

Please sign in to comment.