Skip to content

Commit

Permalink
update schema in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend committed Aug 16, 2023
1 parent e1f7c63 commit f223ec1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 65 deletions.
2 changes: 1 addition & 1 deletion core/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ async fn test_archive() {
let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
// archived record is no longer on the queue
assert_eq!(num_rows_queue, 0);
let num_rows_archive = rowcount(&format!("{test_queue}_archive"), &queue.connection).await;
let num_rows_archive = rowcount(&format!("pgmq.{test_queue}_archive"), &queue.connection).await;
// archived record is now on the archive table
assert_eq!(num_rows_archive, 1);
}
Expand Down
77 changes: 45 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ fn pgmq_set_vt(

let query = format!(
"
UPDATE {TABLE_PREFIX}_{queue_name}
UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name}
SET vt = (now() + interval '{vt_offset} seconds')
WHERE msg_id = $1
RETURNING *;
Expand Down Expand Up @@ -354,23 +354,27 @@ mod tests {
fn test_creat_non_partitioned() {
let qname = r#"test_queue"#;
let _ = pgmq_create_non_partitioned(&qname).unwrap();
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 0);
let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"}))).unwrap();
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 1);
}

// assert an invisible message is not readable
#[pg_test]
fn test_default() {
let qname = r#"test_default"#;
let qname = "test_default";
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}");
let _ = pgmq_create_non_partitioned(&qname);
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {table}"))
.expect("SQL select failed");
// should not be any messages initially
assert_eq!(init_count.unwrap(), 0);

Expand All @@ -380,9 +384,8 @@ mod tests {
// read the message with the pg_extern, sets message invisible
let _ = pgmq_read(&qname, 10_i32, 1_i32);
// but still one record on the table
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {table}"))
.expect("SQL select failed");
assert_eq!(init_count.unwrap(), 1);

// pop the message, must not panic
Expand Down Expand Up @@ -424,9 +427,10 @@ mod tests {
assert_eq!(nothing.len(), 0);

// but still one record on the table
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(init_count.unwrap(), 1);

// delete the messages
Expand All @@ -438,9 +442,10 @@ mod tests {
assert!(!delete1);

// no records after delete
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(init_count.unwrap(), 0);
}

Expand Down Expand Up @@ -493,9 +498,10 @@ mod tests {
assert_eq!(nothing.len(), 0);

// but still one record on the table
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(init_count.unwrap(), 1);

// delete the messages
Expand All @@ -507,9 +513,10 @@ mod tests {
assert!(!delete1);

// no records after delete
let init_count =
Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let init_count = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(init_count.unwrap(), 0);
}

Expand All @@ -519,31 +526,37 @@ mod tests {
let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed");
let _ = pgmq_create_non_partitioned(&qname).unwrap();
// no messages in the queue
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 0);
// no messages in queue archive
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive"
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}_archive"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 0);
// put a message on the queue
let msg_id = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"}))).unwrap();
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 1);

// archive the message
let archived = pgmq_archive(&qname, msg_id.unwrap()).unwrap().unwrap();
assert!(archived);
// should be no messages left on the queue table
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
.expect("SQL select failed");
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 0);
// but one on the archive table
let retval = Spi::get_one::<i64>(&format!(
"SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive"
"SELECT count(*) FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{qname}_archive"
))
.expect("SQL select failed");
assert_eq!(retval.unwrap(), 1);
Expand Down
2 changes: 1 addition & 1 deletion src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn create_partitioned_table(
fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result<String, PgmqError> {
Ok(format!(
"
UPDATE {PGMQ_SCHEMA}.part_config
UPDATE {PARTMAN_SCHEMA}.part_config
SET
retention = '{retention}',
retention_keep_table = false,
Expand Down
76 changes: 45 additions & 31 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use pgmq_crate::query::PGMQ_SCHEMA;
use pgmq_crate::util::{conn_options, fetch_one_message};
use rand::Rng;
use sqlx::postgres::PgPoolOptions;
Expand Down Expand Up @@ -39,13 +40,15 @@ async fn test_lifecycle() {

// CREATE with default retention and partition strategy
let test_default_queue = format!("test_default_{test_num}");
let _ = sqlx::query(&format!("SELECT pgmq.pgmq_create('{test_default_queue}');"))
.execute(&conn)
.await
.expect("failed to create queue");
let _ = sqlx::query(&format!(
"SELECT {PGMQ_SCHEMA}.pgmq_create('{test_default_queue}');"
))
.execute(&conn)
.await
.expect("failed to create queue");

let msg_id = sqlx::query(&format!(
"SELECT * from pgmq.pgmq_send('{test_default_queue}', '{{\"hello\": \"world\"}}');"
"SELECT * from {PGMQ_SCHEMA}.pgmq_send('{test_default_queue}', '{{\"hello\": \"world\"}}');"
))
.fetch_one(&conn)
.await
Expand All @@ -55,7 +58,7 @@ async fn test_lifecycle() {

// read message
// vt=2, limit=1
let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);");
let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);");

let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
Expand All @@ -64,8 +67,9 @@ async fn test_lifecycle() {
assert_eq!(message.msg_id, 1);

// set VT to tomorrow
let query =
&format!("SELECT * from pgmq.pgmq_set_vt('{test_default_queue}', {msg_id}, 84600);");
let query = &format!(
"SELECT * from {PGMQ_SCHEMA}.pgmq_set_vt('{test_default_queue}', {msg_id}, 84600);"
);
let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
Expand All @@ -76,22 +80,23 @@ async fn test_lifecycle() {
assert!(message.vt > now + chrono::Duration::seconds(84000));

// read again, assert no messages because we just set VT to tomorrow
let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);");
let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);");
let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message");
assert!(message.is_none());

// set VT to now
let query = &format!("SELECT * from pgmq.pgmq_set_vt('{test_default_queue}', {msg_id}, 0);");
let query =
&format!("SELECT * from {PGMQ_SCHEMA}.pgmq_set_vt('{test_default_queue}', {msg_id}, 0);");
let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
.expect("expected message");
assert_eq!(message.msg_id, 1);

// read again, should have msg_id 1 again
let query = &format!("SELECT * from pgmq.pgmq_read('{test_default_queue}', 2, 1);");
let query = &format!("SELECT * from {PGMQ_SCHEMA}.pgmq_read('{test_default_queue}', 2, 1);");
let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
Expand All @@ -105,7 +110,7 @@ async fn test_lifecycle() {

// CREATE with 5 seconds per partition, 10 seconds retention
let test_duration_queue = format!("test_duration_{test_num}");
let q = format!("SELECT \"pgmq.pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);");
let q = format!("SELECT {PGMQ_SCHEMA}.pgmq_create_partitioned('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);");
let _ = sqlx::query(&q)
.execute(&conn)
.await
Expand All @@ -114,7 +119,7 @@ async fn test_lifecycle() {
// CREATE with 10 messages per partition, 20 messages retention
let test_numeric_queue = format!("test_numeric_{test_num}");
let _ = sqlx::query(&format!(
"SELECT \"pgmq.pgmq_create_partitioned\"('{test_numeric_queue}'::text, '10'::text, '20'::text);"
"SELECT {PGMQ_SCHEMA}.pgmq_create_partitioned('{test_numeric_queue}'::text, '10'::text, '20'::text);"
))
.execute(&conn)
.await
Expand All @@ -133,18 +138,20 @@ async fn test_lifecycle() {

// get metrics
let rows = sqlx::query_as::<_, MetricsRow>(&format!(
"SELECT * from pgmq.pgmq_metrics('{test_duration_queue}'::text);"
"SELECT * from {PGMQ_SCHEMA}.pgmq_metrics('{test_duration_queue}'::text);"
))
.fetch_all(&conn)
.await
.expect("failed creating numeric interval queue");
assert_eq!(rows.len(), 1);

// get metrics all
let rows = sqlx::query_as::<_, MetricsRow>(&format!("SELECT * from pgmq.pgmq_metrics_all();"))
.fetch_all(&conn)
.await
.expect("failed creating numeric interval queue");
let rows = sqlx::query_as::<_, MetricsRow>(&format!(
"SELECT * from {PGMQ_SCHEMA}.pgmq_metrics_all();"
))
.fetch_all(&conn)
.await
.expect("failed creating numeric interval queue");
assert!(rows.len() > 1);

// delete all the queues
Expand All @@ -156,30 +163,37 @@ async fn test_lifecycle() {

// delete partitioned queues
for queue in [test_duration_queue, test_numeric_queue].iter() {
sqlx::query(&format!("select pgmq.pgmq_drop_queue('{}', true);", &queue))
.execute(&conn)
.await
.expect("failed to drop partitioned queues");
sqlx::query(&format!(
"select {PGMQ_SCHEMA}.pgmq_drop_queue('{}', true);",
&queue
))
.execute(&conn)
.await
.expect("failed to drop partitioned queues");
}

let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();")
.fetch_all(&conn)
.await
.expect("failed to list queues");
let queues = sqlx::query_as::<_, QueueMeta>(&format!(
"select queue_name from {PGMQ_SCHEMA}.pgmq_list_queues();"
))
.fetch_all(&conn)
.await
.expect("failed to list queues");

// drop the rest of the queues
for queue in queues {
let q = queue.queue_name;
sqlx::query(&format!("select pgmq.pgmq_drop_queue('{}');", &q))
sqlx::query(&format!("select {PGMQ_SCHEMA}.pgmq_drop_queue('{}');", &q))
.execute(&conn)
.await
.expect("failed to drop standard queues");
}

let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();")
.fetch_all(&conn)
.await
.expect("failed to list queues");
let queues = sqlx::query_as::<_, QueueMeta>(&format!(
"select queue_name from {PGMQ_SCHEMA}.pgmq_list_queues();"
))
.fetch_all(&conn)
.await
.expect("failed to list queues");
assert!(queues.is_empty());

#[allow(dead_code)]
Expand Down

0 comments on commit f223ec1

Please sign in to comment.