From 7b081bb2eb33245a898361a305c9d44dd2235ece Mon Sep 17 00:00:00 2001 From: felipe stival <14948182+v0idpwn@users.noreply.github.com> Date: Fri, 25 Aug 2023 19:08:05 +0300 Subject: [PATCH] Split/improve extension client tests (#81) * Split/improve extension client tests * Split files * Potential fix for CI --- core/src/query.rs | 6 +- core/tests/integration_test.rs | 202 +---------------- core/tests/pg_ext_integration_test.rs | 300 ++++++++++++++++++++++++++ 3 files changed, 305 insertions(+), 203 deletions(-) create mode 100644 core/tests/pg_ext_integration_test.rs diff --git a/core/src/query.rs b/core/src/query.rs index 5b8105af..d4a43a36 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -100,7 +100,9 @@ pub fn create_meta() -> String { fn grant_stmt(table: &str) -> String { let grant_seq = match &table.contains("pgmq_meta") { true => "".to_string(), - false => format!("EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';"), + false => { + format!(" EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';") + } }; format!( " @@ -111,7 +113,7 @@ BEGIN WHERE has_table_privilege('pg_monitor', '{table}', 'SELECT') ) THEN EXECUTE 'GRANT SELECT ON {table} TO pg_monitor'; - {grant_seq} +{grant_seq} END IF; END; $$ LANGUAGE plpgsql; diff --git a/core/tests/integration_test.rs b/core/tests/integration_test.rs index b196c85d..66eb4823 100644 --- a/core/tests/integration_test.rs +++ b/core/tests/integration_test.rs @@ -1,5 +1,5 @@ use chrono::{Duration, Utc}; -use pgmq::{self, query::TABLE_PREFIX, util::connect, Message}; +use pgmq::{self, query::TABLE_PREFIX, Message}; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -24,22 +24,6 @@ async fn init_queue(qname: &str) -> pgmq::PGMQueue { queue } -async fn init_queue_ext(qname: &str) -> pgmq::PGMQueueExt { - let db_url = env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); - let queue = pgmq::PGMQueueExt::new(db_url, 2) - .await - .expect("failed to connect to postgres"); - queue.init().await.expect("failed to init pgmq"); - // make sure queue doesn't exist before the test - let _ = queue.drop_queue(qname).await; - // CREATE QUEUE - let q_success = queue.create(qname).await; - println!("q_success: {:?}", q_success); - assert!(q_success.is_ok()); - queue -} - #[derive(Serialize, Debug, Deserialize, Eq, PartialEq)] struct MyMessage { foo: String, @@ -762,187 +746,3 @@ async fn test_set_vt() { let num_rows_queue = rowcount(&test_queue, &queue.connection).await; assert_eq!(num_rows_queue, 1); } - -#[tokio::test] -async fn test_extension_api() { - let test_queue = format!("test_ext_api_{}", rand::thread_rng().gen_range(0..100000)); - let test_queue_archive = format!("{}_archive", test_queue); - - let queue = init_queue_ext(&test_queue).await; - let msg = MyMessage::default(); - let num_rows_queue = rowcount(&test_queue, &queue.connection).await; - println!("num_rows_queue: {:?}", num_rows_queue); - assert_eq!(num_rows_queue, 0); - - let qs = queue - .list_queues() - .await - .expect("error listing queues") - .expect("test queue was not created"); - let q_names = qs - .iter() - .map(|q| q.queue_name.clone()) - .collect::>(); - assert!(q_names.contains(&test_queue)); - - let msg_id = queue.send(&test_queue, &msg).await.unwrap(); - assert!(msg_id >= 1); - - let read_message = queue - .read::(&test_queue, 5) - .await - .expect("error reading message"); - assert!(read_message.is_some()); - let read_message = read_message.unwrap(); - assert_eq!(read_message.msg_id, msg_id); - assert_eq!(read_message.message, msg); - - // read again, assert no messages visible - let read_message = queue - .read::(&test_queue, 2) - .await - .expect("error reading message"); - assert!(read_message.is_none()); - - // read with poll, blocks until message visible - let read_messages = queue - .read_batch_with_poll::( - &test_queue, - 5, - 1, - Some(std::time::Duration::from_secs(6)), - None, - ) - .await - .expect("error reading message") - .expect("no message"); - - assert_eq!(read_messages.len(), 1); - assert_eq!(read_messages[0].msg_id, msg_id); - - // change the VT to now - let _vt_set = queue - .set_vt::(&test_queue, msg_id, 0) - .await - .expect("failed to set VT"); - let read_message = queue - .read::(&test_queue, 1) - .await - .expect("error reading message") - .expect("expected a message"); - assert_eq!(read_message.msg_id, msg_id); - - // archive message - let archived = queue - .archive(&test_queue, msg_id) - .await - .expect("failed to archive"); - assert!(archived); - - // pop message - let pmsg = MyMessage { - foo: "pop".to_owned(), - num: 123, - }; - let msg_id_pop = queue.send(&test_queue, &pmsg).await.unwrap(); - assert!(msg_id_pop > msg_id); - let popped = queue - .pop::(&test_queue) - .await - .expect("failed to pop") - .expect("no message to pop"); - assert_eq!(popped.message, pmsg); - - // delete message - let del_msg = MyMessage { - foo: "delete".to_owned(), - num: 123, - }; - let msg_id_del = queue.send(&test_queue, &del_msg).await.unwrap(); - assert!(msg_id_del > msg_id_pop); - let deleted = queue - .delete(&test_queue, msg_id_del) - .await - .expect("failed to delete"); - assert!(deleted); - // try delete a message that doesnt exist - let deleted = queue - .delete(&test_queue, msg_id_del) - .await - .expect("failed to delete"); - assert!(!deleted); - - // delete a batch of messages - let m1 = queue.send(&test_queue, &del_msg).await.unwrap(); - let m2 = queue.send(&test_queue, &del_msg).await.unwrap(); - let m3 = queue.send(&test_queue, &del_msg).await.unwrap(); - let delete_result = queue - .delete_batch(&test_queue, &[m1, m2, m3]) - .await - .expect("delete batch error"); - let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await; - assert_eq!(post_delete_rowcount, 0); - assert_eq!(delete_result, true); - - // archive a batch of messages - let m4 = queue.send(&test_queue, &del_msg).await.unwrap(); - let m5 = queue.send(&test_queue, &del_msg).await.unwrap(); - let m6 = queue.send(&test_queue, &del_msg).await.unwrap(); - let archive_result = queue - .archive_batch(&test_queue, &[m4, m5, m6]) - .await - .expect("archive batch error"); - let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await; - assert_eq!(post_archive_rowcount, 0); - assert_eq!(archive_result, true); - let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await; - assert_eq!(post_archive_archive_rowcount, 4); -} - -#[tokio::test] -async fn test_pgmq_init() { - let db_url = env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); - let queue = pgmq::PGMQueueExt::new(db_url.clone(), 2) - .await - .expect("failed to connect to postgres"); - let init = queue.init().await.expect("failed to create extension"); - assert!(init, "failed to create extension"); - - // error mode on queue partitioned create but already exists - let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100)); - println!("db_url: {}, qname: {:?}", db_url, qname); - let created = queue - .create_partitioned(&qname) - .await - .expect("failed attempting to create queue"); - assert!(created, "did not create queue"); - // create again - let created = queue - .create_partitioned(&qname) - .await - .expect("failed attempting to create the duplicate queue"); - assert!(!created, "failed to detect duplicate queue"); -} - -/// test "bring your own pool" -#[tokio::test] -async fn test_byop() { - let db_url = env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); - - let conn = connect(&db_url, 2).await.expect("failed to connect"); - - let queue = pgmq::PGMQueueExt::new_with_pool(conn) - .await - .expect("failed to connect to postgres"); - - let init = queue.init().await.expect("failed to create extension"); - assert!(init, "failed to create extension"); - - let created = queue - .create("test_byop") - .await - .expect("failed to create queue"); - assert!(created); -} diff --git a/core/tests/pg_ext_integration_test.rs b/core/tests/pg_ext_integration_test.rs new file mode 100644 index 00000000..7d15b204 --- /dev/null +++ b/core/tests/pg_ext_integration_test.rs @@ -0,0 +1,300 @@ +use pgmq::{self, query::TABLE_PREFIX, util::connect}; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Postgres, Row}; +use std::env; + +async fn init_queue_ext(qname: &str) -> pgmq::PGMQueueExt { + let db_url = env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); + let queue = pgmq::PGMQueueExt::new(db_url, 2) + .await + .expect("failed to connect to postgres"); + queue.init().await.expect("failed to init pgmq"); + // make sure queue doesn't exist before the test + let _ = queue.drop_queue(qname).await; + // CREATE QUEUE + let q_success = queue.create(qname).await; + println!("q_success: {:?}", q_success); + assert!(q_success.is_ok()); + queue +} + +#[derive(Serialize, Debug, Deserialize, Eq, PartialEq)] +struct MyMessage { + foo: String, + num: u64, +} + +impl Default for MyMessage { + fn default() -> Self { + MyMessage { + foo: "bar".to_owned(), + num: rand::thread_rng().gen_range(0..100), + } + } +} + +#[derive(Serialize, Debug, Deserialize)] +struct YoloMessage { + yolo: String, +} + +async fn rowcount(qname: &str, connection: &Pool) -> i64 { + let row_ct_query = format!("SELECT count(*) as ct FROM {TABLE_PREFIX}_{qname}"); + sqlx::query(&row_ct_query) + .fetch_one(connection) + .await + .unwrap() + .get::(0) +} + +#[tokio::test] +async fn test_ext_create_list_drop() { + let test_queue = format!( + "test_ext_create_list_drop_{}", + rand::thread_rng().gen_range(0..100000) + ); + let queue = init_queue_ext(&test_queue).await; + + let q_names = queue + .list_queues() + .await + .expect("error listing queues") + .expect("test queue was not created") + .iter() + .map(|q| q.queue_name.clone()) + .collect::>(); + + assert!(q_names.contains(&test_queue)); + + let _ = queue + .drop_queue(&test_queue) + .await + .expect("error dropping queue"); + + let post_drop_q_names = queue + .list_queues() + .await + .expect("error listing queues") + .expect("test queue was not created") + .iter() + .map(|q| q.queue_name.clone()) + .collect::>(); + + assert!(!post_drop_q_names.contains(&test_queue)); +} + +#[tokio::test] +async fn test_ext_send_read_delete() { + let test_queue = format!( + "test_ext_send_read_delete_{}", + rand::thread_rng().gen_range(0..100000) + ); + + let queue = init_queue_ext(&test_queue).await; + let msg = MyMessage::default(); + let num_rows_queue = rowcount(&test_queue, &queue.connection).await; + assert_eq!(num_rows_queue, 0); + + let msg_id = queue.send(&test_queue, &msg).await.unwrap(); + assert!(msg_id >= 1); + + let read_message = queue + .read::(&test_queue, 5) + .await + .expect("error reading message"); + assert!(read_message.is_some()); + let read_message = read_message.unwrap(); + assert_eq!(read_message.msg_id, msg_id); + assert_eq!(read_message.message, msg); + + // read again, assert no messages visible + let read_message = queue + .read::(&test_queue, 2) + .await + .expect("error reading message"); + assert!(read_message.is_none()); + + // read with poll, blocks until message visible + let start_poll = std::time::Instant::now(); + let read_with_poll = queue + .read_batch_with_poll::( + &test_queue, + 5, + 1, + Some(std::time::Duration::from_secs(6)), + None, + ) + .await + .expect("error reading message") + .expect("no message"); + + let poll_duration = start_poll.elapsed(); + + assert!(poll_duration.as_millis() > 1000); + assert_eq!(read_with_poll.len(), 1); + assert_eq!(read_with_poll[0].msg_id, msg_id); + + // change the VT to now + let _vt_set = queue + .set_vt::(&test_queue, msg_id, 0) + .await + .expect("failed to set VT"); + let read_message = queue + .read::(&test_queue, 1) + .await + .expect("error reading message") + .expect("expected a message"); + assert_eq!(read_message.msg_id, msg_id); + + // delete message + let msg_id_del = queue.send(&test_queue, &msg).await.unwrap(); + + let deleted = queue + .delete(&test_queue, msg_id_del) + .await + .expect("failed to delete"); + assert!(deleted); + + // try to delete a message that doesnt exist + let deleted = queue + .delete(&test_queue, msg_id_del) + .await + .expect("failed to delete"); + assert!(!deleted); +} + +#[tokio::test] +async fn test_ext_send_pop() { + let test_queue = format!( + "test_ext_send_pop_{}", + rand::thread_rng().gen_range(0..100000) + ); + let queue = init_queue_ext(&test_queue).await; + let msg = MyMessage::default(); + + let _ = queue.send(&test_queue, &msg).await.unwrap(); + + let popped = queue + .pop::(&test_queue) + .await + .expect("failed to pop") + .expect("no message to pop"); + assert_eq!(popped.message, msg); +} + +#[tokio::test] +async fn test_ext_send_archive() { + let test_queue = format!( + "test_ext_send_archive_{}", + rand::thread_rng().gen_range(0..100000) + ); + let queue = init_queue_ext(&test_queue).await; + let msg = MyMessage::default(); + + let msg_id = queue.send(&test_queue, &msg).await.unwrap(); + + let archived = queue + .archive(&test_queue, msg_id) + .await + .expect("failed to archive"); + assert!(archived); +} + +#[tokio::test] +async fn test_ext_archive_batch() { + let test_queue = format!( + "test_ext_archive_batch_{}", + rand::thread_rng().gen_range(0..100000) + ); + let test_queue_archive = format!("{}_archive", test_queue); + let queue = init_queue_ext(&test_queue).await; + let msg = MyMessage::default(); + + let m1 = queue.send(&test_queue, &msg).await.unwrap(); + let m2 = queue.send(&test_queue, &msg).await.unwrap(); + let m3 = queue.send(&test_queue, &msg).await.unwrap(); + + let archive_result = queue + .archive_batch(&test_queue, &[m1, m2, m3]) + .await + .expect("archive batch error"); + + let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await; + + assert_eq!(post_archive_rowcount, 0); + assert_eq!(archive_result, true); + + let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await; + assert_eq!(post_archive_archive_rowcount, 3); +} + +#[tokio::test] +async fn test_ext_delete_batch() { + let test_queue = format!( + "test_ext_delete_batch{}", + rand::thread_rng().gen_range(0..100000) + ); + + let queue = init_queue_ext(&test_queue).await; + let msg = MyMessage::default(); + let m1 = queue.send(&test_queue, &msg).await.unwrap(); + let m2 = queue.send(&test_queue, &msg).await.unwrap(); + let m3 = queue.send(&test_queue, &msg).await.unwrap(); + let delete_result = queue + .delete_batch(&test_queue, &[m1, m2, m3]) + .await + .expect("delete batch error"); + let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await; + assert_eq!(post_delete_rowcount, 0); + assert_eq!(delete_result, true); +} + +#[tokio::test] +async fn test_pgmq_init() { + let db_url = env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); + let queue = pgmq::PGMQueueExt::new(db_url.clone(), 2) + .await + .expect("failed to connect to postgres"); + let init = queue.init().await.expect("failed to create extension"); + assert!(init, "failed to create extension"); + + // error mode on queue partitioned create but already exists + let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100)); + println!("db_url: {}, qname: {:?}", db_url, qname); + let created = queue + .create_partitioned(&qname) + .await + .expect("failed attempting to create queue"); + assert!(created, "did not create queue"); + // create again + let created = queue + .create_partitioned(&qname) + .await + .expect("failed attempting to create the duplicate queue"); + assert!(!created, "failed to detect duplicate queue"); +} + +/// test "bring your own pool" +#[tokio::test] +async fn test_byop() { + let db_url = env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned()); + + let conn = connect(&db_url, 2).await.expect("failed to connect"); + + let queue = pgmq::PGMQueueExt::new_with_pool(conn) + .await + .expect("failed to connect to postgres"); + + let init = queue.init().await.expect("failed to create extension"); + assert!(init, "failed to create extension"); + + let created = queue + .create("test_byop") + .await + .expect("failed to create queue"); + assert!(created); +}