Skip to content

Commit

Permalink
Test/improve read_batch_with_poll
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 18, 2023
1 parent 275da04 commit a28bf20
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 25 deletions.
69 changes: 45 additions & 24 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,30 +550,6 @@ impl PGMQueue {
Ok(message)
}

pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
max_batch_size: i32,
poll_timeout: Option<Duration>,
poll_interval: Option<Duration>,
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
let vt_ = vt.unwrap_or(VT_DEFAULT);
let poll_timeout_ = poll_timeout.unwrap_or(POLL_TIMEOUT_DEFAULT);
let poll_interval_ = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
let start_time = std::time::Instant::now();
loop {
let query = &query::read(queue_name, vt_, max_batch_size)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
if messages.is_none() && start_time.elapsed() < poll_timeout_ {
tokio::time::sleep(poll_interval_).await;
continue;
} else {
break Ok(messages);
}
}
}

/// Reads a specified number of messages (num_msgs) from the queue.
/// Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds.
///
Expand Down Expand Up @@ -655,6 +631,51 @@ impl PGMQueue {
Ok(messages)
}

/// Similar to [`read_batch`], but allows waiting until a message is available
///
/// You can specify a maximum duration for polling (defaults to 5 seconds),
/// and an interval between calls (defaults to 250ms). A lower interval
/// implies higher maximum latency, but less load on the database.
///
/// Refer to the [`read_batch`] function for more details.
///
pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
max_batch_size: i32,
poll_timeout: Option<Duration>,
poll_interval: Option<Duration>,
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
let vt_ = vt.unwrap_or(VT_DEFAULT);
let poll_timeout_ = poll_timeout.unwrap_or(POLL_TIMEOUT_DEFAULT);
let poll_interval_ = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
let start_time = std::time::Instant::now();
loop {
let query = &query::read(queue_name, vt_, max_batch_size)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
// Why can fetch_messages return both `None` and `Option(())`
match messages {
None => {
if start_time.elapsed() < poll_timeout_ {
tokio::time::sleep(poll_interval_).await;
continue;
} else {
break Ok(None);
}
}
Some(m) => {
if m.is_empty() && start_time.elapsed() < poll_timeout_ {
tokio::time::sleep(poll_interval_).await;
continue;
} else {
break Ok(Some(m));
}
}
}
}
}

/// Delete a message from the queue.
/// This is a permanent delete and cannot be undone. If you want to retain a log of the message,
/// use the [archive](#method.archive) method.
Expand Down
69 changes: 68 additions & 1 deletion core/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,73 @@ async fn test_send_delay() {
assert!(one_messages.is_some());
}

#[tokio::test]
async fn test_read_batch_with_poll() {
let test_queue = "test_read_batch_with_poll".to_owned();

let queue = init_queue(&test_queue).await;

// PUBLISH THREE MESSAGES
let msg = serde_json::json!({
"foo": "bar1"
});
let msg_id1 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id1, 1);
let msg_id2 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id2, 2);
let msg_id3 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id3, 3);

// Reading from queue with a 5 seconds VT
let read_message_1 = queue
.read_batch_with_poll::<Value>(
&test_queue,
Some(5),
5,
Some(std::time::Duration::from_secs(6)),
None,
)
.await
.unwrap()
.unwrap();

assert_eq!(read_message_1.len(), 3);

let starting_time = std::time::Instant::now();

// Since VT is 5 seconds, if we poll the queue, it takes around 5 seconds
// to return the result, and returns all 3 messages
let read_message_2 = queue
.read_batch_with_poll::<Value>(
&test_queue,
Some(5),
5,
Some(std::time::Duration::from_secs(6)),
None,
)
.await
.unwrap()
.unwrap();

assert_eq!(read_message_2.len(), 3);
assert!(starting_time.elapsed() > std::time::Duration::from_secs(3));

// If we don't poll for long enough, we get none
let read_message_3 = queue
.read_batch_with_poll::<Value>(
&test_queue,
Some(3),
5,
Some(std::time::Duration::from_secs(1)),
None,
)
.await
.unwrap()
.unwrap();

assert_eq!(read_message_3.len(), 0);
}

#[tokio::test]
async fn test_read_batch() {
let test_queue = "test_read_batch".to_owned();
Expand Down Expand Up @@ -722,7 +789,7 @@ async fn test_extension_api() {
assert!(msg_id >= 1);

let read_message = queue
.read::<MyMessage>(&test_queue, 100)
.read::<MyMessage>(&test_queue, 5)
.await
.expect("error reading message");
assert!(read_message.is_some());
Expand Down

0 comments on commit a28bf20

Please sign in to comment.