Skip to content

Commit

Permalink
Add read_batch_with_poll to client crate
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 17, 2023
1 parent ce0bc8e commit ca79a3e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
28 changes: 28 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ pub mod util;
pub use pg_ext::PGMQueueExt;
use util::fetch_one_message;

use std::time::Duration;

const VT_DEFAULT: i32 = 30;
const READ_LIMIT_DEFAULT: i32 = 1;
const POLL_TIMEOUT_S_DEFAULT : u128 = 5;
const POLL_INTERVAL_MS_DEFAULT : u128 = 250;

/// Message struct received from the queue
///
Expand Down Expand Up @@ -546,6 +550,30 @@ impl PGMQueue {
Ok(message)
}

pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
max_batch_size: i32,
poll_timeout_s: Option<u128>,
poll_interval_ms: Option<u128>
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
let vt_ = vt.unwrap_or(VT_DEFAULT);
let poll_timeout_s_ = poll_timeout_s.unwrap_or(POLL_TIMEOUT_S_DEFAULT);
let poll_interval_ms_ = poll_interval_ms.unwrap_or(POLL_INTERVAL_MS_DEFAULT);
let start_time = std::time::Instant::now();
loop {
let query = &query::read(queue_name, vt_, max_batch_size)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
if messages.is_none() && start_time.elapsed().as_millis() > (poll_timeout_s_ * 1000) {
tokio::time::sleep(Duration::from_millis(poll_interval_ms_.try_into().unwrap())).await;
continue;
} else {
break Ok(messages);
}
}
}

/// Reads a specified number of messages (num_msgs) from the queue.
/// Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds.
///
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use pgmq_crate::query::{
};
use thiserror::Error;

use std::time::Duration;

#[derive(Error, Debug)]
pub enum PgmqExtError {
#[error("")]
Expand Down Expand Up @@ -153,9 +155,7 @@ fn pgmq_read_with_poll(
if start_time.elapsed().as_millis() > (poll_timeout_s * 1000) as u128 {
break Ok(TableIterator::new(results));
} else {
std::thread::sleep(std::time::Duration::from_millis(
poll_interval_ms.try_into().unwrap(),
));
std::thread::sleep(Duration::from_millis(poll_interval_ms.try_into().unwrap()));
continue;
}
} else {
Expand Down

0 comments on commit ca79a3e

Please sign in to comment.