Skip to content

Commit

Permalink
change times to use Duration type. Breaking change
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed Nov 19, 2023
1 parent 41c8b63 commit a55cd7b
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsmq_async"
version = "7.0.2"
version = "8.0.0"
authors = [
"David Bonet <[email protected]>"
]
Expand Down
78 changes: 51 additions & 27 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use radix_fmt::radix_36;
use rand::seq::IteratorRandom;
use redis::{aio::ConnectionLike, pipe, Script};
use std::convert::TryInto;
use std::time::Duration;

lazy_static! {
static ref CHANGE_MESSAGE_VISIVILITY: Script =
Expand Down Expand Up @@ -39,11 +40,13 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
conn: &mut T,
qname: &str,
message_id: &str,
seconds_hidden: u64,
seconds_hidden: Duration,
) -> RsmqResult<()> {
let seconds_hidden = get_redis_duration(Some(seconds_hidden), &Duration::from_secs(30));

let queue = self.get_queue(conn, qname, false).await?;

number_in_range(seconds_hidden, 0, 9_999_999)?;
number_in_range(seconds_hidden, 0, 9_999_999_000)?;

CHANGE_MESSAGE_VISIVILITY
.key(format!("{}{}", self.ns, qname))
Expand All @@ -66,18 +69,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<u32>,
delay: Option<u32>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
valid_name_format(qname)?;

let key = format!("{}{}:Q", self.ns, qname);
let seconds_hidden = seconds_hidden.unwrap_or(30);
let delay = delay.unwrap_or(0);
let seconds_hidden = get_redis_duration(seconds_hidden, &Duration::from_secs(30));
let delay = get_redis_duration(delay, &Duration::ZERO);
let maxsize = maxsize.unwrap_or(65536);

number_in_range(seconds_hidden, 0, 9_999_999)?;
number_in_range(seconds_hidden, 0, 9_999_999_000)?;
number_in_range(delay, 0, 9_999_999)?;
if let Err(error) = number_in_range(maxsize, 1024, 65536) {
if maxsize != -1 {
Expand Down Expand Up @@ -219,8 +222,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
}

Ok(RsmqQueueAttributes {
vt: result.0.first().unwrap_or(&Some(0)).unwrap_or(0),
delay: result.0.get(1).unwrap_or(&Some(0)).unwrap_or(0),
vt: result
.0
.first()
.and_then(Option::as_ref)
.map(|dur| Duration::from_millis(*dur))
.unwrap_or(Duration::ZERO),
delay: result
.0
.get(1)
.and_then(Option::as_ref)
.map(|dur| Duration::from_millis(*dur))
.unwrap_or(Duration::ZERO),
maxsize: result.0.get(2).unwrap_or(&Some(0)).unwrap_or(0),
totalrecv: result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0),
totalsent: result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0),
Expand Down Expand Up @@ -277,13 +290,12 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<u64>,
seconds_hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let queue = self.get_queue(conn, qname, false).await?;

let seconds_hidden = seconds_hidden.unwrap_or(queue.vt);

number_in_range(seconds_hidden, 0, 9_999_999)?;
let seconds_hidden = get_redis_duration(seconds_hidden, &queue.vt);
number_in_range(seconds_hidden, 0, 9_999_999_000)?;

let result: (bool, String, Vec<u8>, u64, u64) = RECEIVE_MESSAGE
.key(format!("{}{}", self.ns, qname))
Expand Down Expand Up @@ -313,11 +325,11 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
conn: &mut T,
qname: &str,
message: E,
delay: Option<u64>,
delay: Option<Duration>,
) -> RsmqResult<String> {
let queue = self.get_queue(conn, qname, true).await?;

let delay = delay.unwrap_or(queue.delay);
let delay = get_redis_duration(delay, &queue.delay);
let key = format!("{}{}", self.ns, qname);

number_in_range(delay, 0, 9_999_999)?;
Expand Down Expand Up @@ -386,8 +398,8 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<u64>,
delay: Option<u64>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.get_queue(conn, qname, false).await?;
Expand All @@ -405,16 +417,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.arg("modified")
.arg(time.0);

if let Some(duration) = seconds_hidden {
number_in_range(duration, 0, 9_999_999)?;
if seconds_hidden.is_some() {
let duration = get_redis_duration(seconds_hidden, &Duration::from_secs(30));
number_in_range(duration, 0, 9_999_999_000)?;
commands = commands
.cmd("HSET")
.arg(&queue_name)
.arg("vt")
.arg(duration);
}

if let Some(delay) = delay {
if delay.is_some() {
let delay = get_redis_duration(delay, &Duration::ZERO);
number_in_range(delay, 0, 9_999_999)?;
commands = commands
.cmd("HSET")
Expand Down Expand Up @@ -454,7 +468,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.query_async(conn)
.await?;

let time_seconds = (result.1).0;
let time_millis = (result.1).0 * 1000;

let (hmget_first, hmget_second, hmget_third) =
match (result.0.get(0), result.0.get(1), result.0.get(2)) {
Expand All @@ -463,20 +477,22 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
};

let quid = if uid {
Some(radix_36(time_seconds).to_string() + &RsmqFunctions::<T>::make_id(22)?)
Some(radix_36(time_millis).to_string() + &RsmqFunctions::<T>::make_id(22)?)
} else {
None
};

Ok(QueueDescriptor {
vt: hmget_first.parse().map_err(|_| RsmqError::CannotParseVT)?,
delay: hmget_second
.parse()
.map_err(|_| RsmqError::CannotParseDelay)?,
vt: Duration::from_millis(hmget_first.parse().map_err(|_| RsmqError::CannotParseVT)?),
delay: Duration::from_millis(
hmget_second
.parse()
.map_err(|_| RsmqError::CannotParseDelay)?,
),
maxsize: hmget_third
.parse()
.map_err(|_| RsmqError::CannotParseMaxsize)?,
ts: time_seconds,
ts: time_millis,
uid: quid,
})
}
Expand Down Expand Up @@ -527,3 +543,11 @@ fn valid_name_format(name: &str) -> RsmqResult<()> {

Ok(())
}

fn get_redis_duration(d: Option<Duration>, default: &Duration) -> u64 {
d.as_ref()
.map(Duration::as_millis)
.map(u64::try_from)
.and_then(Result::ok)
.unwrap_or_else(|| u64::try_from(default.as_millis()).ok().unwrap_or(30_000))
}
15 changes: 8 additions & 7 deletions src/multiplexed_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::RsmqResult;
use core::convert::TryFrom;
use core::marker::PhantomData;
use std::time::Duration;

#[derive(Clone)]
struct RedisConnection(redis::aio::MultiplexedConnection);
Expand Down Expand Up @@ -66,7 +67,7 @@ impl RsmqConnection for MultiplexedRsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: u64,
seconds_hidden: Duration,
) -> RsmqResult<()> {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden)
Expand All @@ -76,8 +77,8 @@ impl RsmqConnection for MultiplexedRsmq {
async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<u32>,
delay: Option<u32>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
self.functions
Expand Down Expand Up @@ -123,7 +124,7 @@ impl RsmqConnection for MultiplexedRsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
seconds_hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, seconds_hidden)
Expand All @@ -134,7 +135,7 @@ impl RsmqConnection for MultiplexedRsmq {
&mut self,
qname: &str,
message: E,
delay: Option<u64>,
delay: Option<Duration>,
) -> RsmqResult<String> {
self.functions
.send_message(&mut self.connection.0, qname, message, delay)
Expand All @@ -144,8 +145,8 @@ impl RsmqConnection for MultiplexedRsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
delay: Option<u64>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.functions
Expand Down
15 changes: 8 additions & 7 deletions src/normal_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::RsmqResult;
use core::convert::TryFrom;
use core::marker::PhantomData;
use std::time::Duration;

struct RedisConnection(redis::aio::Connection);

Expand Down Expand Up @@ -65,7 +66,7 @@ impl RsmqConnection for Rsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: u64,
seconds_hidden: Duration,
) -> RsmqResult<()> {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden)
Expand All @@ -75,8 +76,8 @@ impl RsmqConnection for Rsmq {
async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<u32>,
delay: Option<u32>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
self.functions
Expand Down Expand Up @@ -122,7 +123,7 @@ impl RsmqConnection for Rsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
seconds_hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, seconds_hidden)
Expand All @@ -133,7 +134,7 @@ impl RsmqConnection for Rsmq {
&mut self,
qname: &str,
message: E,
delay: Option<u64>,
delay: Option<Duration>,
) -> RsmqResult<String> {
self.functions
.send_message(&mut self.connection.0, qname, message, delay)
Expand All @@ -143,8 +144,8 @@ impl RsmqConnection for Rsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
delay: Option<u64>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.functions
Expand Down
15 changes: 8 additions & 7 deletions src/pooled_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use core::convert::TryFrom;
use redis::RedisError;
use std::marker::PhantomData;
use std::time::Duration;

#[derive(Clone, Debug)]
pub struct RedisConnectionManager {
Expand Down Expand Up @@ -104,7 +105,7 @@ impl RsmqConnection for PooledRsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: u64,
seconds_hidden: Duration,
) -> RsmqResult<()> {
let mut conn = self.pool.get().await?;

Expand All @@ -116,8 +117,8 @@ impl RsmqConnection for PooledRsmq {
async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<u32>,
delay: Option<u32>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
let mut conn = self.pool.get().await?;
Expand Down Expand Up @@ -161,7 +162,7 @@ impl RsmqConnection for PooledRsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
seconds_hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let mut conn = self.pool.get().await?;

Expand All @@ -174,7 +175,7 @@ impl RsmqConnection for PooledRsmq {
&mut self,
qname: &str,
message: E,
delay: Option<u64>,
delay: Option<Duration>,
) -> RsmqResult<String> {
let mut conn = self.pool.get().await?;

Expand All @@ -186,8 +187,8 @@ impl RsmqConnection for PooledRsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<u64>,
delay: Option<u64>,
seconds_hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
let mut conn = self.pool.get().await?;
Expand Down
Loading

0 comments on commit a55cd7b

Please sign in to comment.