From 051517b471c69e342e88d7c31f4fe3c1af20d7be Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Fri, 18 Aug 2023 12:42:30 +0200 Subject: [PATCH] Add read_batch_with_queue to queueext --- core/src/pg_ext.rs | 47 ++++++++++++++++++++++++++++++++++++++ tests/integration_tests.rs | 3 +-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/core/src/pg_ext.rs b/core/src/pg_ext.rs index 18e8a7d9..3b36da7f 100644 --- a/core/src/pg_ext.rs +++ b/core/src/pg_ext.rs @@ -7,6 +7,9 @@ use serde::{Deserialize, Serialize}; use sqlx::types::chrono::Utc; use sqlx::{Executor, Pool, Postgres}; +const DEFAULT_POLL_TIMEOUT_S: i32 = 5; +const DEFAULT_POLL_INTERVAL_MS: i32 = 250; + /// Main controller for interacting with a managed by the PGMQ Postgres extension. #[derive(Clone, Debug)] pub struct PGMQueueExt { @@ -194,6 +197,50 @@ impl PGMQueueExt { } } + pub async fn read_batch_with_poll Deserialize<'de>>( + &self, + queue_name: &str, + vt: i32, + max_batch_size: i32, + poll_timeout: Option, + poll_interval: Option, + ) -> Result>, PgmqError> { + check_input(queue_name)?; + let poll_timeout_s = poll_timeout.map_or(DEFAULT_POLL_TIMEOUT_S, |t| t.as_secs() as i32); + let poll_interval_ms = + poll_interval_ms.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32); + let row = sqlx::query!( + "SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)", + queue_name, + vt, + max_batch_size, + poll_timeout_s, + poll_interval_ms + ) + .fetch_optional(&self.connection) + .await?; + match row { + Some(row) => { + // happy path - successfully read a message + let raw_msg = row.message.expect("no message"); + let parsed_msg = serde_json::from_value::(raw_msg)?; + Ok(Some(Message { + msg_id: row.msg_id.expect("msg_id missing from queue table"), + vt: row.vt.expect("vt missing from queue table"), + read_ct: row.read_ct.expect("read_ct missing from queue table"), + enqueued_at: row + .enqueued_at + .expect("enqueued_at missing from queue table"), + message: parsed_msg, + })) + } + None => { + // no message found + Ok(None) + } + } + } + /// Move a message to the archive table. pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result { check_input(queue_name)?; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a0879af7..a60f4b48 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -82,8 +82,7 @@ async fn test_lifecycle() { assert!(message.is_none()); // read again, now using poll to block until message is ready - let query = - &format!("SELECT * from pgmq_read_with_poll('{test_default_queue}', 10, 1, 10000);"); + let query = &format!("SELECT * from pgmq_read_with_poll('{test_default_queue}', 10, 1, 10);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message")