Skip to content

Commit

Permalink
add CheckedName as an abstraction for valid Pg identifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
vrmiguel committed Aug 8, 2023
1 parent 6cf5b1b commit efd06b9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 36 deletions.
41 changes: 15 additions & 26 deletions core/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Query constructors

use crate::errors::PgmqError;
use crate::{errors::PgmqError, util::CheckedName};
use sqlx::types::chrono::Utc;
pub const TABLE_PREFIX: &str = r#"pgmq"#;
pub const PGMQ_SCHEMA: &str = "public";

pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
check_input(name)?;
let name = CheckedName::new(name)?;
Ok(vec![
create_meta(),
create_queue(name)?,
Expand All @@ -20,7 +20,7 @@ pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
}

pub fn destroy_queue(name: &str) -> Result<Vec<String>, PgmqError> {
check_input(name)?;
let name = CheckedName::new(name)?;
Ok(vec![
drop_queue(name)?,
delete_queue_index(name)?,
Expand All @@ -29,8 +29,7 @@ pub fn destroy_queue(name: &str) -> Result<Vec<String>, PgmqError> {
])
}

pub fn create_queue(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn create_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (
Expand All @@ -44,8 +43,7 @@ pub fn create_queue(name: &str) -> Result<String, PgmqError> {
))
}

pub fn create_archive(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (
Expand Down Expand Up @@ -95,38 +93,33 @@ pub fn grant_pgmon_meta() -> String {
}

// pg_monitor needs to query queue tables
pub fn grant_pgmon_queue(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn grant_pgmon_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}");
Ok(grant_stmt(&table))
}

pub fn grant_pgmon_queue_seq(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn grant_pgmon_queue_seq(name: CheckedName<'_>) -> Result<String, PgmqError> {
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_msg_id_seq");
Ok(grant_stmt(&table))
}

pub fn drop_queue(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn drop_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name};
"
))
}

pub fn delete_queue_index(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn delete_queue_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
DROP INDEX IF EXISTS {TABLE_PREFIX}_{name}.vt_idx_{name};
"
))
}

pub fn delete_queue_metadata(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
DO $$
Expand All @@ -145,17 +138,15 @@ pub fn delete_queue_metadata(name: &str) -> Result<String, PgmqError> {
))
}

pub fn drop_queue_archive(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive;
"
))
}

pub fn insert_meta(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn insert_meta(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (queue_name)
Expand All @@ -166,8 +157,7 @@ pub fn insert_meta(name: &str) -> Result<String, PgmqError> {
))
}

pub fn create_archive_index(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn create_archive_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE INDEX IF NOT EXISTS deleted_at_idx_{name} ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (deleted_at);
Expand All @@ -176,8 +166,7 @@ pub fn create_archive_index(name: &str) -> Result<String, PgmqError> {
}

// indexes are created ascending to support FIFO
pub fn create_index(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
pub fn create_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE INDEX IF NOT EXISTS msg_id_vt_idx_{name} ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt ASC, msg_id ASC);
Expand All @@ -192,7 +181,7 @@ pub fn enqueue(
) -> Result<String, PgmqError> {
// construct string of comma separated messages
check_input(name)?;
let mut values: String = "".to_owned();
let mut values = "".to_owned();
for message in messages.iter() {
let full_msg = format!(
"((now() at time zone 'utc' + interval '{delay} seconds'), '{message}'::json),"
Expand Down
28 changes: 28 additions & 0 deletions core/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::fmt::Display;

use crate::query::check_input;
use crate::{Message, PgmqError};
use log::LevelFilter;
use serde::Deserialize;
Expand Down Expand Up @@ -63,3 +66,28 @@ pub async fn fetch_one_message<T: for<'de> Deserialize<'de>>(
Err(e) => Err(e)?,
}
}

/// A string that is known to be formed of only ASCII alphanumeric or an underscore;
#[derive(Clone, Copy)]
pub struct CheckedName<'a>(&'a str);

impl<'a> CheckedName<'a> {
/// Accepts `input` as a CheckedName if it is a valid queue identifier
pub fn new(input: &'a str) -> Result<Self, PgmqError> {
check_input(input)?;

Ok(Self(input))
}
}

impl AsRef<str> for CheckedName<'_> {
fn as_ref(&self) -> &str {
self.0
}
}

impl Display for CheckedName<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0)
}
}
24 changes: 14 additions & 10 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use pgrx::prelude::*;
use pgmq_crate::{
errors::PgmqError,
query::{
check_input, create_archive, create_index, create_meta, grant_pgmon_meta,
grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta, PGMQ_SCHEMA, TABLE_PREFIX,
create_archive, create_index, create_meta, grant_pgmon_meta, grant_pgmon_queue,
grant_pgmon_queue_seq, insert_meta, PGMQ_SCHEMA, TABLE_PREFIX,
},
util::CheckedName,
};

// for now, put pg_partman in the public PGMQ_SCHEMA
Expand All @@ -17,7 +18,7 @@ pub fn init_partitioned_queue(
partition_interval: &str,
retention_interval: &str,
) -> Result<Vec<String>, PgmqError> {
check_input(name)?;
let name = CheckedName::new(name)?;
let partition_col = map_partition_col(partition_interval);
Ok(vec![
create_meta(),
Expand All @@ -44,8 +45,10 @@ fn map_partition_col(partition_interval: &str) -> &'static str {
}
}

fn create_partitioned_queue(queue: &str, partition_col: &str) -> Result<String, PgmqError> {
check_input(queue)?;
fn create_partitioned_queue(
queue: CheckedName<'_>,
partition_col: &str,
) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} (
Expand All @@ -59,8 +62,10 @@ fn create_partitioned_queue(queue: &str, partition_col: &str) -> Result<String,
))
}

pub fn create_partitioned_index(queue: &str, partiton_col: &str) -> Result<String, PgmqError> {
check_input(queue)?;
pub fn create_partitioned_index(
queue: CheckedName<'_>,
partiton_col: &str,
) -> Result<String, PgmqError> {
Ok(format!(
"
CREATE INDEX IF NOT EXISTS pgmq_partition_idx_{queue} ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} ({partiton_col});
Expand All @@ -69,7 +74,7 @@ pub fn create_partitioned_index(queue: &str, partiton_col: &str) -> Result<Strin
}

fn create_partitioned_table(
queue: &str,
queue: CheckedName<'_>,
partition_col: &str,
partition_interval: &str,
) -> Result<String, PgmqError> {
Expand All @@ -86,8 +91,7 @@ fn create_partitioned_table(
// messages .archived() will be retained forever on the `<queue_name>_archive` table
// https://github.com/pgpartman/pg_partman/blob/ca212077f66af19c0ca317c206091cd31d3108b8/doc/pg_partman.md#retention
// integer value will set that any partitions with an id value less than the current maximum id value minus the retention value will be dropped
fn set_retention_config(queue: &str, retention: &str) -> Result<String, PgmqError> {
check_input(queue)?;
fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result<String, PgmqError> {
Ok(format!(
"
UPDATE {PGMQ_SCHEMA}.part_config
Expand Down

0 comments on commit efd06b9

Please sign in to comment.