Skip to content

Commit

Permalink
Some latency-related minor touch-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
vrmiguel committed Jul 25, 2023
1 parent 6517c29 commit c41ceae
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::Message;
use log::info;
use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use sqlx::{Pool, Postgres};
use sqlx::{Pool, Postgres, Executor};

/// Main controller for interacting with a managed by the PGMQ Postgres extension.
#[derive(Clone, Debug)]
Expand All @@ -22,23 +22,21 @@ impl PGMQueueExt {
/// Initialize a connection to PGMQ/Postgres
pub async fn new(url: String, max_connections: u32) -> Result<PGMQueueExt, PgmqError> {
Ok(PGMQueueExt {
url: url.clone(),
connection: connect(&url, max_connections).await?,
url,
})
}

pub async fn init(&self) -> Result<bool, PgmqError> {
match sqlx::query!("CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;")
sqlx::query!("CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;")
.execute(&self.connection)
.await
{
Ok(_) => Ok(true),
Err(e) => Err(PgmqError::from(e)),
}
.map(|_| true)
.map_err(PgmqError::from)
}

/// Create a new partitioned queue.
/// Errors when there is any database error and Result<false> when the queue already exists.
/// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create(&self, queue_name: &str) -> Result<bool, PgmqError> {
check_input(queue_name)?;
let queue_table = format!("public.{TABLE_PREFIX}_{queue_name}");
Expand Down Expand Up @@ -70,9 +68,9 @@ impl PGMQueueExt {
/// Drop an existing queue table.
pub async fn drop_queue(&self, queue_name: &str) -> Result<(), PgmqError> {
check_input(queue_name)?;
sqlx::query!("SELECT * from pgmq_drop_queue($1::text);", queue_name)
.fetch_optional(&self.connection)
self.connection.execute(sqlx::query!("SELECT * from pgmq_drop_queue($1::text);", queue_name))
.await?;

Ok(())
}

Expand Down

0 comments on commit c41ceae

Please sign in to comment.