Skip to content

Commit

Permalink
Split/improve extension client tests (#81)
Browse files Browse the repository at this point in the history
* Split/improve extension client tests

* Split files

* Potential fix for CI
  • Loading branch information
v0idpwn authored Aug 25, 2023
1 parent b8eebd5 commit 7b081bb
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 203 deletions.
6 changes: 4 additions & 2 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ pub fn create_meta() -> String {
fn grant_stmt(table: &str) -> String {
let grant_seq = match &table.contains("pgmq_meta") {
true => "".to_string(),
false => format!("EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';"),
false => {
format!(" EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';")
}
};
format!(
"
Expand All @@ -111,7 +113,7 @@ BEGIN
WHERE has_table_privilege('pg_monitor', '{table}', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON {table} TO pg_monitor';
{grant_seq}
{grant_seq}
END IF;
END;
$$ LANGUAGE plpgsql;
Expand Down
202 changes: 1 addition & 201 deletions core/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{Duration, Utc};
use pgmq::{self, query::TABLE_PREFIX, util::connect, Message};
use pgmq::{self, query::TABLE_PREFIX, Message};
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -24,22 +24,6 @@ async fn init_queue(qname: &str) -> pgmq::PGMQueue {
queue
}

async fn init_queue_ext(qname: &str) -> pgmq::PGMQueueExt {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());
let queue = pgmq::PGMQueueExt::new(db_url, 2)
.await
.expect("failed to connect to postgres");
queue.init().await.expect("failed to init pgmq");
// make sure queue doesn't exist before the test
let _ = queue.drop_queue(qname).await;
// CREATE QUEUE
let q_success = queue.create(qname).await;
println!("q_success: {:?}", q_success);
assert!(q_success.is_ok());
queue
}

#[derive(Serialize, Debug, Deserialize, Eq, PartialEq)]
struct MyMessage {
foo: String,
Expand Down Expand Up @@ -762,187 +746,3 @@ async fn test_set_vt() {
let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
assert_eq!(num_rows_queue, 1);
}

#[tokio::test]
async fn test_extension_api() {
let test_queue = format!("test_ext_api_{}", rand::thread_rng().gen_range(0..100000));
let test_queue_archive = format!("{}_archive", test_queue);

let queue = init_queue_ext(&test_queue).await;
let msg = MyMessage::default();
let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
println!("num_rows_queue: {:?}", num_rows_queue);
assert_eq!(num_rows_queue, 0);

let qs = queue
.list_queues()
.await
.expect("error listing queues")
.expect("test queue was not created");
let q_names = qs
.iter()
.map(|q| q.queue_name.clone())
.collect::<Vec<String>>();
assert!(q_names.contains(&test_queue));

let msg_id = queue.send(&test_queue, &msg).await.unwrap();
assert!(msg_id >= 1);

let read_message = queue
.read::<MyMessage>(&test_queue, 5)
.await
.expect("error reading message");
assert!(read_message.is_some());
let read_message = read_message.unwrap();
assert_eq!(read_message.msg_id, msg_id);
assert_eq!(read_message.message, msg);

// read again, assert no messages visible
let read_message = queue
.read::<MyMessage>(&test_queue, 2)
.await
.expect("error reading message");
assert!(read_message.is_none());

// read with poll, blocks until message visible
let read_messages = queue
.read_batch_with_poll::<MyMessage>(
&test_queue,
5,
1,
Some(std::time::Duration::from_secs(6)),
None,
)
.await
.expect("error reading message")
.expect("no message");

assert_eq!(read_messages.len(), 1);
assert_eq!(read_messages[0].msg_id, msg_id);

// change the VT to now
let _vt_set = queue
.set_vt::<MyMessage>(&test_queue, msg_id, 0)
.await
.expect("failed to set VT");
let read_message = queue
.read::<MyMessage>(&test_queue, 1)
.await
.expect("error reading message")
.expect("expected a message");
assert_eq!(read_message.msg_id, msg_id);

// archive message
let archived = queue
.archive(&test_queue, msg_id)
.await
.expect("failed to archive");
assert!(archived);

// pop message
let pmsg = MyMessage {
foo: "pop".to_owned(),
num: 123,
};
let msg_id_pop = queue.send(&test_queue, &pmsg).await.unwrap();
assert!(msg_id_pop > msg_id);
let popped = queue
.pop::<MyMessage>(&test_queue)
.await
.expect("failed to pop")
.expect("no message to pop");
assert_eq!(popped.message, pmsg);

// delete message
let del_msg = MyMessage {
foo: "delete".to_owned(),
num: 123,
};
let msg_id_del = queue.send(&test_queue, &del_msg).await.unwrap();
assert!(msg_id_del > msg_id_pop);
let deleted = queue
.delete(&test_queue, msg_id_del)
.await
.expect("failed to delete");
assert!(deleted);
// try delete a message that doesnt exist
let deleted = queue
.delete(&test_queue, msg_id_del)
.await
.expect("failed to delete");
assert!(!deleted);

// delete a batch of messages
let m1 = queue.send(&test_queue, &del_msg).await.unwrap();
let m2 = queue.send(&test_queue, &del_msg).await.unwrap();
let m3 = queue.send(&test_queue, &del_msg).await.unwrap();
let delete_result = queue
.delete_batch(&test_queue, &[m1, m2, m3])
.await
.expect("delete batch error");
let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_delete_rowcount, 0);
assert_eq!(delete_result, true);

// archive a batch of messages
let m4 = queue.send(&test_queue, &del_msg).await.unwrap();
let m5 = queue.send(&test_queue, &del_msg).await.unwrap();
let m6 = queue.send(&test_queue, &del_msg).await.unwrap();
let archive_result = queue
.archive_batch(&test_queue, &[m4, m5, m6])
.await
.expect("archive batch error");
let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_archive_rowcount, 0);
assert_eq!(archive_result, true);
let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await;
assert_eq!(post_archive_archive_rowcount, 4);
}

#[tokio::test]
async fn test_pgmq_init() {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());
let queue = pgmq::PGMQueueExt::new(db_url.clone(), 2)
.await
.expect("failed to connect to postgres");
let init = queue.init().await.expect("failed to create extension");
assert!(init, "failed to create extension");

// error mode on queue partitioned create but already exists
let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100));
println!("db_url: {}, qname: {:?}", db_url, qname);
let created = queue
.create_partitioned(&qname)
.await
.expect("failed attempting to create queue");
assert!(created, "did not create queue");
// create again
let created = queue
.create_partitioned(&qname)
.await
.expect("failed attempting to create the duplicate queue");
assert!(!created, "failed to detect duplicate queue");
}

/// test "bring your own pool"
#[tokio::test]
async fn test_byop() {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());

let conn = connect(&db_url, 2).await.expect("failed to connect");

let queue = pgmq::PGMQueueExt::new_with_pool(conn)
.await
.expect("failed to connect to postgres");

let init = queue.init().await.expect("failed to create extension");
assert!(init, "failed to create extension");

let created = queue
.create("test_byop")
.await
.expect("failed to create queue");
assert!(created);
}
Loading

0 comments on commit 7b081bb

Please sign in to comment.