From a28bf201ee8072dbfe9705ddd3065412e4c23e04 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Fri, 18 Aug 2023 12:10:29 +0200 Subject: [PATCH] Test/improve read_batch_with_poll --- core/src/lib.rs | 69 ++++++++++++++++++++++------------ core/tests/integration_test.rs | 69 +++++++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 25 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index bf18dc69..0c4b7fd3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -550,30 +550,6 @@ impl PGMQueue { Ok(message) } - pub async fn read_batch_with_poll Deserialize<'de>>( - &self, - queue_name: &str, - vt: Option, - max_batch_size: i32, - poll_timeout: Option, - poll_interval: Option, - ) -> Result>>, errors::PgmqError> { - let vt_ = vt.unwrap_or(VT_DEFAULT); - let poll_timeout_ = poll_timeout.unwrap_or(POLL_TIMEOUT_DEFAULT); - let poll_interval_ = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT); - let start_time = std::time::Instant::now(); - loop { - let query = &query::read(queue_name, vt_, max_batch_size)?; - let messages = fetch_messages::(query, &self.connection).await?; - if messages.is_none() && start_time.elapsed() < poll_timeout_ { - tokio::time::sleep(poll_interval_).await; - continue; - } else { - break Ok(messages); - } - } - } - /// Reads a specified number of messages (num_msgs) from the queue. /// Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. /// @@ -655,6 +631,51 @@ impl PGMQueue { Ok(messages) } + /// Similar to [`read_batch`], but allows waiting until a message is available + /// + /// You can specify a maximum duration for polling (defaults to 5 seconds), + /// and an interval between calls (defaults to 250ms). A lower interval + /// implies higher maximum latency, but less load on the database. + /// + /// Refer to the [`read_batch`] function for more details. + /// + pub async fn read_batch_with_poll Deserialize<'de>>( + &self, + queue_name: &str, + vt: Option, + max_batch_size: i32, + poll_timeout: Option, + poll_interval: Option, + ) -> Result>>, errors::PgmqError> { + let vt_ = vt.unwrap_or(VT_DEFAULT); + let poll_timeout_ = poll_timeout.unwrap_or(POLL_TIMEOUT_DEFAULT); + let poll_interval_ = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT); + let start_time = std::time::Instant::now(); + loop { + let query = &query::read(queue_name, vt_, max_batch_size)?; + let messages = fetch_messages::(query, &self.connection).await?; + // Why can fetch_messages return both `None` and `Option(())` + match messages { + None => { + if start_time.elapsed() < poll_timeout_ { + tokio::time::sleep(poll_interval_).await; + continue; + } else { + break Ok(None); + } + } + Some(m) => { + if m.is_empty() && start_time.elapsed() < poll_timeout_ { + tokio::time::sleep(poll_interval_).await; + continue; + } else { + break Ok(Some(m)); + } + } + } + } + } + /// Delete a message from the queue. /// This is a permanent delete and cannot be undone. If you want to retain a log of the message, /// use the [archive](#method.archive) method. diff --git a/core/tests/integration_test.rs b/core/tests/integration_test.rs index cd448b3e..338937e8 100644 --- a/core/tests/integration_test.rs +++ b/core/tests/integration_test.rs @@ -231,6 +231,73 @@ async fn test_send_delay() { assert!(one_messages.is_some()); } +#[tokio::test] +async fn test_read_batch_with_poll() { + let test_queue = "test_read_batch_with_poll".to_owned(); + + let queue = init_queue(&test_queue).await; + + // PUBLISH THREE MESSAGES + let msg = serde_json::json!({ + "foo": "bar1" + }); + let msg_id1 = queue.send(&test_queue, &msg).await.unwrap(); + assert_eq!(msg_id1, 1); + let msg_id2 = queue.send(&test_queue, &msg).await.unwrap(); + assert_eq!(msg_id2, 2); + let msg_id3 = queue.send(&test_queue, &msg).await.unwrap(); + assert_eq!(msg_id3, 3); + + // Reading from queue with a 5 seconds VT + let read_message_1 = queue + .read_batch_with_poll::( + &test_queue, + Some(5), + 5, + Some(std::time::Duration::from_secs(6)), + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(read_message_1.len(), 3); + + let starting_time = std::time::Instant::now(); + + // Since VT is 5 seconds, if we poll the queue, it takes around 5 seconds + // to return the result, and returns all 3 messages + let read_message_2 = queue + .read_batch_with_poll::( + &test_queue, + Some(5), + 5, + Some(std::time::Duration::from_secs(6)), + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(read_message_2.len(), 3); + assert!(starting_time.elapsed() > std::time::Duration::from_secs(3)); + + // If we don't poll for long enough, we get none + let read_message_3 = queue + .read_batch_with_poll::( + &test_queue, + Some(3), + 5, + Some(std::time::Duration::from_secs(1)), + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(read_message_3.len(), 0); +} + #[tokio::test] async fn test_read_batch() { let test_queue = "test_read_batch".to_owned(); @@ -722,7 +789,7 @@ async fn test_extension_api() { assert!(msg_id >= 1); let read_message = queue - .read::(&test_queue, 100) + .read::(&test_queue, 5) .await .expect("error reading message"); assert!(read_message.is_some());