Skip to content

Commit

Permalink
Some latency-related minor touch-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
vrmiguel committed Jul 25, 2023
1 parent c41ceae commit e325d25
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
8 changes: 6 additions & 2 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::Message;
use log::info;
use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use sqlx::{Pool, Postgres, Executor};
use sqlx::{Executor, Pool, Postgres};

/// Main controller for interacting with a managed by the PGMQ Postgres extension.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -68,7 +68,11 @@ impl PGMQueueExt {
/// Drop an existing queue table.
pub async fn drop_queue(&self, queue_name: &str) -> Result<(), PgmqError> {
check_input(queue_name)?;
self.connection.execute(sqlx::query!("SELECT * from pgmq_drop_queue($1::text);", queue_name))
self.connection
.execute(sqlx::query!(
"SELECT * from pgmq_drop_queue($1::text);",
queue_name
))
.await?;

Ok(())
Expand Down
12 changes: 6 additions & 6 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ pub fn init_partitioned_queue(
Ok(vec![
create_meta(),
grant_pgmon_meta(),
create_partitioned_queue(name, &partition_col)?,
create_partitioned_index(name, &partition_col)?,
create_partitioned_queue(name, partition_col)?,
create_partitioned_index(name, partition_col)?,
create_index(name)?,
create_archive(name)?,
create_partitioned_table(name, &partition_col, partition_interval)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
Expand All @@ -35,12 +35,12 @@ pub fn init_partitioned_queue(
}

/// maps the partition column based on partition_interval
fn map_partition_col(partition_interval: &str) -> String {
fn map_partition_col(partition_interval: &str) -> &'static str {
// map using msg_id when partition_interval is an integer
// otherwise use enqueued_at (time based)
match partition_interval.parse::<i32>() {
Ok(_) => "msg_id".to_owned(),
Err(_) => "enqueued_at".to_owned(),
Ok(_) => "msg_id",
Err(_) => "enqueued_at",
}
}

Expand Down

0 comments on commit e325d25

Please sign in to comment.