Skip to content

Commit

Permalink
fix create partitioned api
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend committed Jul 31, 2023
1 parent abc4369 commit 3820e97
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
20 changes: 16 additions & 4 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ impl PGMQueueExt {
.map_err(PgmqError::from)
}

/// Create a new partitioned queue.
/// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create(&self, queue_name: &str) -> Result<bool, PgmqError> {
check_input(queue_name)?;
sqlx::query!("SELECT * from pgmq_create($1::text);", queue_name)
.execute(&self.connection)
.await?;
Ok(true)
}

/// Create a new partitioned queue.
/// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create_partitioned(&self, queue_name: &str) -> Result<bool, PgmqError> {
check_input(queue_name)?;
let queue_table = format!("public.{TABLE_PREFIX}_{queue_name}");
// we need to check whether the queue exists first
Expand All @@ -58,9 +67,12 @@ impl PGMQueueExt {
if exists {
Ok(false)
} else {
sqlx::query!("SELECT * from pgmq_create($1::text);", queue_name)
.execute(&self.connection)
.await?;
sqlx::query!(
"SELECT * from pgmq_create_partitioned($1::text);",
queue_name
)
.execute(&self.connection)
.await?;
Ok(true)
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,20 +803,20 @@ async fn test_pgmq_init() {
.await
.expect("failed to connect to postgres");
let init = queue.init().await.expect("failed to create extension");
assert!(init);
assert!(init, "failed to create extension");

// error mode on queue create but already exists
// error mode on queue partitioned create but already exists
let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100));
println!("db_url: {}, qname: {:?}", db_url, qname);
let created = queue
.create(&qname)
.create_partitioned(&qname)
.await
.expect("failed attempting to create queue");
assert!(created, "did not create queue");
// create again
let created = queue
.create(&qname)
.create_partitioned(&qname)
.await
.expect("failed attempting to create the duplicate queue");
assert!(!created)
assert!(!created, "failed to detect duplicate queue");
}

0 comments on commit 3820e97

Please sign in to comment.