diff --git a/core/src/pg_ext.rs b/core/src/pg_ext.rs index 5b3281ad..69277eff 100644 --- a/core/src/pg_ext.rs +++ b/core/src/pg_ext.rs @@ -35,9 +35,18 @@ impl PGMQueueExt { .map_err(PgmqError::from) } - /// Create a new partitioned queue. /// Errors when there is any database error and Ok(false) when the queue already exists. pub async fn create(&self, queue_name: &str) -> Result { + check_input(queue_name)?; + sqlx::query!("SELECT * from pgmq_create($1::text);", queue_name) + .execute(&self.connection) + .await?; + Ok(true) + } + + /// Create a new partitioned queue. + /// Errors when there is any database error and Ok(false) when the queue already exists. + pub async fn create_partitioned(&self, queue_name: &str) -> Result { check_input(queue_name)?; let queue_table = format!("public.{TABLE_PREFIX}_{queue_name}"); // we need to check whether the queue exists first @@ -58,9 +67,12 @@ impl PGMQueueExt { if exists { Ok(false) } else { - sqlx::query!("SELECT * from pgmq_create($1::text);", queue_name) - .execute(&self.connection) - .await?; + sqlx::query!( + "SELECT * from pgmq_create_partitioned($1::text);", + queue_name + ) + .execute(&self.connection) + .await?; Ok(true) } } diff --git a/core/tests/integration_test.rs b/core/tests/integration_test.rs index cfbc4995..0afb4f47 100644 --- a/core/tests/integration_test.rs +++ b/core/tests/integration_test.rs @@ -803,20 +803,20 @@ async fn test_pgmq_init() { .await .expect("failed to connect to postgres"); let init = queue.init().await.expect("failed to create extension"); - assert!(init); + assert!(init, "failed to create extension"); - // error mode on queue create but already exists + // error mode on queue partitioned create but already exists let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100)); println!("db_url: {}, qname: {:?}", db_url, qname); let created = queue - .create(&qname) + .create_partitioned(&qname) .await .expect("failed attempting to create queue"); assert!(created, "did not create queue"); // create again let created = queue - .create(&qname) + .create_partitioned(&qname) .await .expect("failed attempting to create the duplicate queue"); - assert!(!created) + assert!(!created, "failed to detect duplicate queue"); }