From d20da02746578e56a9a119290ec6b2f1bb077f6e Mon Sep 17 00:00:00 2001 From: David Bonet Date: Fri, 2 Feb 2024 02:46:46 +0100 Subject: [PATCH] fix queue size -1 failing to be retrieved --- Cargo.toml | 2 +- src/functions.rs | 16 ++++++++-------- src/types.rs | 2 +- tests/test.rs | 20 ++++++++++++++++++++ 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d391b8a..828b289 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "8.0.1" authors = [ "David Bonet " ] -edition = "2018" +edition = "2021" license = "MIT" description = "Async RSMQ port to rust. RSMQ is a simple redis queue system that works in any redis v2.4+. It contains the same methods as the original one in https://github.com/smrchy/rsmq" homepage = "https://crates.io/crates/rsmq_async" diff --git a/src/functions.rs b/src/functions.rs index a264202..906904e 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -197,7 +197,7 @@ impl RsmqFunctions { let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?; - let result: (Vec>, u64, u64) = pipe() + let result: (Vec>, u64, u64) = pipe() .atomic() .cmd("HMGET") .arg(format!("{}:Q", key)) @@ -228,19 +228,19 @@ impl RsmqFunctions { .0 .first() .and_then(Option::as_ref) - .map(|dur| Duration::from_millis(*dur)) + .map(|dur| Duration::from_millis((*dur).try_into().unwrap_or(0))) .unwrap_or(Duration::ZERO), delay: result .0 .get(1) .and_then(Option::as_ref) - .map(|dur| Duration::from_millis(*dur)) + .map(|dur| Duration::from_millis((*dur).try_into().unwrap_or(0))) .unwrap_or(Duration::ZERO), maxsize: result.0.get(2).unwrap_or(&Some(0)).unwrap_or(0), - totalrecv: result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0), - totalsent: result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0), - created: result.0.get(5).unwrap_or(&Some(0)).unwrap_or(0), - modified: result.0.get(6).unwrap_or(&Some(0)).unwrap_or(0), + totalrecv: u64::try_from(result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0), + totalsent: u64::try_from(result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0), + created: u64::try_from(result.0.get(5).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0), + modified: u64::try_from(result.0.get(6).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0), msgs: result.1, hiddenmsgs: result.2, }) @@ -473,7 +473,7 @@ impl RsmqFunctions { let time_millis = (result.1).0 * 1000; let (hmget_first, hmget_second, hmget_third) = - match (result.0.get(0), result.0.get(1), result.0.get(2)) { + match (result.0.first(), result.0.get(1), result.0.get(2)) { (Some(Some(v0)), Some(Some(v1)), Some(Some(v2))) => (v0, v1, v2), _ => return Err(RsmqError::QueueNotFound), }; diff --git a/src/types.rs b/src/types.rs index 8528418..7b77b77 100644 --- a/src/types.rs +++ b/src/types.rs @@ -67,7 +67,7 @@ pub struct RsmqQueueAttributes { /// since it was sent pub delay: Duration, /// Max size of the message in bytes in the queue - pub maxsize: u64, + pub maxsize: i64, /// Number of messages received by the queue pub totalrecv: u64, /// Number of messages sent by the queue diff --git a/tests/test.rs b/tests/test.rs index 6bac970..3be756d 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -426,3 +426,23 @@ fn change_message_visibility() { rsmq.delete_queue("queue6").await.unwrap(); }) } + +#[test] +fn change_queue_size() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async move { + let ctx = TestContext::new(); + let connection = ctx.async_connection().await.unwrap(); + let mut rsmq = Rsmq::new_with_connection(connection, false, None); + + rsmq.create_queue("queue6", None, None, None).await.unwrap(); + + rsmq.set_queue_attributes("queue6", None, None, Some(-1)).await.unwrap(); + + let attributes = rsmq.get_queue_attributes("queue6").await.unwrap(); + + assert_eq!(attributes.maxsize, -1); + + }) +}