Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split/improve extension client tests #81

Merged
merged 3 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ pub fn create_meta() -> String {
fn grant_stmt(table: &str) -> String {
let grant_seq = match &table.contains("pgmq_meta") {
true => "".to_string(),
false => format!("EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';"),
false => {
format!(" EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';")
}
};
format!(
"
Expand All @@ -111,7 +113,7 @@ BEGIN
WHERE has_table_privilege('pg_monitor', '{table}', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON {table} TO pg_monitor';
{grant_seq}
{grant_seq}
END IF;
END;
$$ LANGUAGE plpgsql;
Expand Down
202 changes: 1 addition & 201 deletions core/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{Duration, Utc};
use pgmq::{self, query::TABLE_PREFIX, util::connect, Message};
use pgmq::{self, query::TABLE_PREFIX, Message};
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -24,22 +24,6 @@ async fn init_queue(qname: &str) -> pgmq::PGMQueue {
queue
}

async fn init_queue_ext(qname: &str) -> pgmq::PGMQueueExt {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());
let queue = pgmq::PGMQueueExt::new(db_url, 2)
.await
.expect("failed to connect to postgres");
queue.init().await.expect("failed to init pgmq");
// make sure queue doesn't exist before the test
let _ = queue.drop_queue(qname).await;
// CREATE QUEUE
let q_success = queue.create(qname).await;
println!("q_success: {:?}", q_success);
assert!(q_success.is_ok());
queue
}

#[derive(Serialize, Debug, Deserialize, Eq, PartialEq)]
struct MyMessage {
foo: String,
Expand Down Expand Up @@ -762,187 +746,3 @@ async fn test_set_vt() {
let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
assert_eq!(num_rows_queue, 1);
}

#[tokio::test]
async fn test_extension_api() {
let test_queue = format!("test_ext_api_{}", rand::thread_rng().gen_range(0..100000));
let test_queue_archive = format!("{}_archive", test_queue);

let queue = init_queue_ext(&test_queue).await;
let msg = MyMessage::default();
let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
println!("num_rows_queue: {:?}", num_rows_queue);
assert_eq!(num_rows_queue, 0);

let qs = queue
.list_queues()
.await
.expect("error listing queues")
.expect("test queue was not created");
let q_names = qs
.iter()
.map(|q| q.queue_name.clone())
.collect::<Vec<String>>();
assert!(q_names.contains(&test_queue));

let msg_id = queue.send(&test_queue, &msg).await.unwrap();
assert!(msg_id >= 1);

let read_message = queue
.read::<MyMessage>(&test_queue, 5)
.await
.expect("error reading message");
assert!(read_message.is_some());
let read_message = read_message.unwrap();
assert_eq!(read_message.msg_id, msg_id);
assert_eq!(read_message.message, msg);

// read again, assert no messages visible
let read_message = queue
.read::<MyMessage>(&test_queue, 2)
.await
.expect("error reading message");
assert!(read_message.is_none());

// read with poll, blocks until message visible
let read_messages = queue
.read_batch_with_poll::<MyMessage>(
&test_queue,
5,
1,
Some(std::time::Duration::from_secs(6)),
None,
)
.await
.expect("error reading message")
.expect("no message");

assert_eq!(read_messages.len(), 1);
assert_eq!(read_messages[0].msg_id, msg_id);

// change the VT to now
let _vt_set = queue
.set_vt::<MyMessage>(&test_queue, msg_id, 0)
.await
.expect("failed to set VT");
let read_message = queue
.read::<MyMessage>(&test_queue, 1)
.await
.expect("error reading message")
.expect("expected a message");
assert_eq!(read_message.msg_id, msg_id);

// archive message
let archived = queue
.archive(&test_queue, msg_id)
.await
.expect("failed to archive");
assert!(archived);

// pop message
let pmsg = MyMessage {
foo: "pop".to_owned(),
num: 123,
};
let msg_id_pop = queue.send(&test_queue, &pmsg).await.unwrap();
assert!(msg_id_pop > msg_id);
let popped = queue
.pop::<MyMessage>(&test_queue)
.await
.expect("failed to pop")
.expect("no message to pop");
assert_eq!(popped.message, pmsg);

// delete message
let del_msg = MyMessage {
foo: "delete".to_owned(),
num: 123,
};
let msg_id_del = queue.send(&test_queue, &del_msg).await.unwrap();
assert!(msg_id_del > msg_id_pop);
let deleted = queue
.delete(&test_queue, msg_id_del)
.await
.expect("failed to delete");
assert!(deleted);
// try delete a message that doesnt exist
let deleted = queue
.delete(&test_queue, msg_id_del)
.await
.expect("failed to delete");
assert!(!deleted);

// delete a batch of messages
let m1 = queue.send(&test_queue, &del_msg).await.unwrap();
let m2 = queue.send(&test_queue, &del_msg).await.unwrap();
let m3 = queue.send(&test_queue, &del_msg).await.unwrap();
let delete_result = queue
.delete_batch(&test_queue, &[m1, m2, m3])
.await
.expect("delete batch error");
let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_delete_rowcount, 0);
assert_eq!(delete_result, true);

// archive a batch of messages
let m4 = queue.send(&test_queue, &del_msg).await.unwrap();
let m5 = queue.send(&test_queue, &del_msg).await.unwrap();
let m6 = queue.send(&test_queue, &del_msg).await.unwrap();
let archive_result = queue
.archive_batch(&test_queue, &[m4, m5, m6])
.await
.expect("archive batch error");
let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_archive_rowcount, 0);
assert_eq!(archive_result, true);
let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await;
assert_eq!(post_archive_archive_rowcount, 4);
}

#[tokio::test]
async fn test_pgmq_init() {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());
let queue = pgmq::PGMQueueExt::new(db_url.clone(), 2)
.await
.expect("failed to connect to postgres");
let init = queue.init().await.expect("failed to create extension");
assert!(init, "failed to create extension");

// error mode on queue partitioned create but already exists
let qname = format!("test_dup_{}", rand::thread_rng().gen_range(0..100));
println!("db_url: {}, qname: {:?}", db_url, qname);
let created = queue
.create_partitioned(&qname)
.await
.expect("failed attempting to create queue");
assert!(created, "did not create queue");
// create again
let created = queue
.create_partitioned(&qname)
.await
.expect("failed attempting to create the duplicate queue");
assert!(!created, "failed to detect duplicate queue");
}

/// test "bring your own pool"
#[tokio::test]
async fn test_byop() {
let db_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/pgmq".to_owned());

let conn = connect(&db_url, 2).await.expect("failed to connect");

let queue = pgmq::PGMQueueExt::new_with_pool(conn)
.await
.expect("failed to connect to postgres");

let init = queue.init().await.expect("failed to create extension");
assert!(init, "failed to create extension");

let created = queue
.create("test_byop")
.await
.expect("failed to create queue");
assert!(created);
}
Loading
Loading