Skip to content

Commit

Permalink
Add read_batch_with_queue to queueext
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 18, 2023
1 parent a28bf20 commit 051517b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
47 changes: 47 additions & 0 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use sqlx::{Executor, Pool, Postgres};

const DEFAULT_POLL_TIMEOUT_S: i32 = 5;
const DEFAULT_POLL_INTERVAL_MS: i32 = 250;

/// Main controller for interacting with a managed by the PGMQ Postgres extension.
#[derive(Clone, Debug)]
pub struct PGMQueueExt {
Expand Down Expand Up @@ -194,6 +197,50 @@ impl PGMQueueExt {
}
}

pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: i32,
max_batch_size: i32,
poll_timeout: Option<std::time::Duration>,
poll_interval: Option<std::time::Duration>,
) -> Result<Option<Message<T>>, PgmqError> {
check_input(queue_name)?;
let poll_timeout_s = poll_timeout.map_or(DEFAULT_POLL_TIMEOUT_S, |t| t.as_secs() as i32);
let poll_interval_ms =
poll_interval_ms.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32);
let row = sqlx::query!(
"SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)",
queue_name,
vt,
max_batch_size,
poll_timeout_s,
poll_interval_ms
)
.fetch_optional(&self.connection)
.await?;
match row {
Some(row) => {
// happy path - successfully read a message
let raw_msg = row.message.expect("no message");
let parsed_msg = serde_json::from_value::<T>(raw_msg)?;
Ok(Some(Message {
msg_id: row.msg_id.expect("msg_id missing from queue table"),
vt: row.vt.expect("vt missing from queue table"),
read_ct: row.read_ct.expect("read_ct missing from queue table"),
enqueued_at: row
.enqueued_at
.expect("enqueued_at missing from queue table"),
message: parsed_msg,
}))
}
None => {
// no message found
Ok(None)
}
}
}

/// Move a message to the archive table.
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<bool, PgmqError> {
check_input(queue_name)?;
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ async fn test_lifecycle() {
assert!(message.is_none());

// read again, now using poll to block until message is ready
let query =
&format!("SELECT * from pgmq_read_with_poll('{test_default_queue}', 10, 1, 10000);");
let query = &format!("SELECT * from pgmq_read_with_poll('{test_default_queue}', 10, 1, 10);");
let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
Expand Down

0 comments on commit 051517b

Please sign in to comment.