diff --git a/core/tests/integration_test.rs b/core/tests/integration_test.rs index 7e1dc1e7..70d33980 100644 --- a/core/tests/integration_test.rs +++ b/core/tests/integration_test.rs @@ -536,7 +536,7 @@ async fn test_archive() { let num_rows_queue = rowcount(&test_queue, &queue.connection).await; // archived record is no longer on the queue assert_eq!(num_rows_queue, 0); - let num_rows_archive = rowcount(&format!("{test_queue}_archive"), &queue.connection).await; + let num_rows_archive = rowcount(&format!("pgmq.{test_queue}_archive"), &queue.connection).await; // archived record is now on the archive table assert_eq!(num_rows_archive, 1); } diff --git a/src/lib.rs b/src/lib.rs index 9f698b89..c5d94d32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -319,7 +319,7 @@ fn pgmq_set_vt( let query = format!( " - UPDATE {TABLE_PREFIX}_{queue_name} + UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name} SET vt = (now() + interval '{vt_offset} seconds') WHERE msg_id = $1 RETURNING *; @@ -354,23 +354,27 @@ mod tests { fn test_creat_non_partitioned() { let qname = r#"test_queue"#; let _ = pgmq_create_non_partitioned(&qname).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(retval.unwrap(), 0); let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"}))).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(retval.unwrap(), 1); } // assert an invisible message is not readable #[pg_test] fn test_default() { - let qname = r#"test_default"#; + let qname = "test_default"; + let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"); let _ = pgmq_create_non_partitioned(&qname); - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!("SELECT count(*) FROM {table}")) + .expect("SQL select failed"); // should not be any messages initially assert_eq!(init_count.unwrap(), 0); @@ -380,9 +384,8 @@ mod tests { // read the message with the pg_extern, sets message invisible let _ = pgmq_read(&qname, 10_i32, 1_i32); // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!("SELECT count(*) FROM {table}")) + .expect("SQL select failed"); assert_eq!(init_count.unwrap(), 1); // pop the message, must not panic @@ -424,9 +427,10 @@ mod tests { assert_eq!(nothing.len(), 0); // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(init_count.unwrap(), 1); // delete the messages @@ -438,9 +442,10 @@ mod tests { assert!(!delete1); // no records after delete - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(init_count.unwrap(), 0); } @@ -493,9 +498,10 @@ mod tests { assert_eq!(nothing.len(), 0); // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(init_count.unwrap(), 1); // delete the messages @@ -507,9 +513,10 @@ mod tests { assert!(!delete1); // no records after delete - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(init_count.unwrap(), 0); } @@ -519,31 +526,37 @@ mod tests { let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed"); let _ = pgmq_create_non_partitioned(&qname).unwrap(); // no messages in the queue - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(retval.unwrap(), 0); // no messages in queue archive let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}_archive" )) .expect("SQL select failed"); assert_eq!(retval.unwrap(), 0); // put a message on the queue let msg_id = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"}))).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(retval.unwrap(), 1); // archive the message let archived = pgmq_archive(&qname, msg_id.unwrap()).unwrap().unwrap(); assert!(archived); // should be no messages left on the queue table - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); assert_eq!(retval.unwrap(), 0); // but one on the archive table let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" + "SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}_archive" )) .expect("SQL select failed"); assert_eq!(retval.unwrap(), 1); diff --git a/src/partition.rs b/src/partition.rs index 6c678af8..dbd90286 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -95,7 +95,7 @@ fn create_partitioned_table( fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result { Ok(format!( " - UPDATE {PGMQ_SCHEMA}.part_config + UPDATE {PARTMAN_SCHEMA}.part_config SET retention = '{retention}', retention_keep_table = false, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 63091027..7ee5268f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,3 +1,4 @@ +use pgmq_crate::query::PGMQ_SCHEMA; use pgmq_crate::util::{conn_options, fetch_one_message}; use rand::Rng; use sqlx::postgres::PgPoolOptions; @@ -39,13 +40,15 @@ async fn test_lifecycle() { // CREATE with default retention and partition strategy let test_default_queue = format!("test_default_{test_num}"); - let _ = sqlx::query(&format!("SELECT pgmq.pgmq_create('{test_default_queue}');")) - .execute(&conn) - .await - .expect("failed to create queue"); + let _ = sqlx::query(&format!( + "SELECT {PGMQ_SCHEMA}.pgmq_create('{test_default_queue}');" + )) + .execute(&conn) + .await + .expect("failed to create queue"); let msg_id = sqlx::query(&format!( - "SELECT * from pgmq.pgmq_send('{test_default_queue}', '{{\"hello\": \"world\"}}');" + "SELECT * from {PGMQ_SCHEMA}.pgmq_send('{test_default_queue}', '{{\"hello\": \"world\"}}');" )) .fetch_one(&conn) .await @@ -55,7 +58,7 @@ async fn test_lifecycle() { // read message // vt=2, limit=1 - let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await @@ -64,8 +67,9 @@ async fn test_lifecycle() { assert_eq!(message.msg_id, 1); // set VT to tomorrow - let query = - &format!("SELECT * from pgmq.pgmq_set_vt('{test_default_queue}', {msg_id}, 84600);"); + let query = &format!( + "SELECT * from {PGMQ_SCHEMA}.pgmq_set_vt('{test_default_queue}', {msg_id}, 84600);" + ); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -76,14 +80,15 @@ async fn test_lifecycle() { assert!(message.vt > now + chrono::Duration::seconds(84000)); // read again, assert no messages because we just set VT to tomorrow - let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message"); assert!(message.is_none()); // set VT to now - let query = &format!("SELECT * from pgmq.pgmq_set_vt('{test_default_queue}', {msg_id}, 0);"); + let query = + &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_set_vt('{test_default_queue}', {msg_id}, 0);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -91,7 +96,7 @@ async fn test_lifecycle() { assert_eq!(message.msg_id, 1); // read again, should have msg_id 1 again - let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -105,7 +110,7 @@ async fn test_lifecycle() { // CREATE with 5 seconds per partition, 10 seconds retention let test_duration_queue = format!("test_duration_{test_num}"); - let q = format!("SELECT \"pgmq.pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);"); + let q = format!("SELECT {PGMQ_SCHEMA}.pgmq_create_partitioned('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);"); let _ = sqlx::query(&q) .execute(&conn) .await @@ -114,7 +119,7 @@ async fn test_lifecycle() { // CREATE with 10 messages per partition, 20 messages retention let test_numeric_queue = format!("test_numeric_{test_num}"); let _ = sqlx::query(&format!( - "SELECT \"pgmq.pgmq_create_partitioned\"('{test_numeric_queue}'::text, '10'::text, '20'::text);" + "SELECT {PGMQ_SCHEMA}.pgmq_create_partitioned('{test_numeric_queue}'::text, '10'::text, '20'::text);" )) .execute(&conn) .await @@ -133,7 +138,7 @@ async fn test_lifecycle() { // get metrics let rows = sqlx::query_as::<_, MetricsRow>(&format!( - "SELECT * from pgmq.pgmq_metrics('{test_duration_queue}'::text);" + "SELECT * from {PGMQ_SCHEMA}.pgmq_metrics('{test_duration_queue}'::text);" )) .fetch_all(&conn) .await @@ -141,10 +146,12 @@ async fn test_lifecycle() { assert_eq!(rows.len(), 1); // get metrics all - let rows = sqlx::query_as::<_, MetricsRow>(&format!("SELECT * from pgmq.pgmq_metrics_all();")) - .fetch_all(&conn) - .await - .expect("failed creating numeric interval queue"); + let rows = sqlx::query_as::<_, MetricsRow>(&format!( + "SELECT * from {PGMQ_SCHEMA}.pgmq_metrics_all();" + )) + .fetch_all(&conn) + .await + .expect("failed creating numeric interval queue"); assert!(rows.len() > 1); // delete all the queues @@ -156,30 +163,37 @@ async fn test_lifecycle() { // delete partitioned queues for queue in [test_duration_queue, test_numeric_queue].iter() { - sqlx::query(&format!("select pgmq.pgmq_drop_queue('{}', true);", &queue)) - .execute(&conn) - .await - .expect("failed to drop partitioned queues"); + sqlx::query(&format!( + "select {PGMQ_SCHEMA}.pgmq_drop_queue('{}', true);", + &queue + )) + .execute(&conn) + .await + .expect("failed to drop partitioned queues"); } - let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();") - .fetch_all(&conn) - .await - .expect("failed to list queues"); + let queues = sqlx::query_as::<_, QueueMeta>(&format!( + "select queue_name from {PGMQ_SCHEMA}.pgmq_list_queues();" + )) + .fetch_all(&conn) + .await + .expect("failed to list queues"); // drop the rest of the queues for queue in queues { let q = queue.queue_name; - sqlx::query(&format!("select pgmq.pgmq_drop_queue('{}');", &q)) + sqlx::query(&format!("select {PGMQ_SCHEMA}.pgmq_drop_queue('{}');", &q)) .execute(&conn) .await .expect("failed to drop standard queues"); } - let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();") - .fetch_all(&conn) - .await - .expect("failed to list queues"); + let queues = sqlx::query_as::<_, QueueMeta>(&format!( + "select queue_name from {PGMQ_SCHEMA}.pgmq_list_queues();" + )) + .fetch_all(&conn) + .await + .expect("failed to list queues"); assert!(queues.is_empty()); #[allow(dead_code)]