From df8444cff901a0661ae3d5f1cbec9d78c96e1b80 Mon Sep 17 00:00:00 2001 From: David Bonet Date: Sun, 28 Apr 2024 10:47:15 +0200 Subject: [PATCH] Remove normal facade as the redis::aio::Connection is deprecated in favor of redis::aio::MultiplexedConnection. Keeping the pooled facade for specific workloads. --- Cargo.toml | 7 +- src/lib.rs | 130 ++++++++++++++++----------------- src/multiplexed_facade.rs | 16 ++-- src/normal_facade.rs | 149 -------------------------------------- src/pooled_facade.rs | 11 ++- src/trait.rs | 2 + src/types.rs | 8 +- tests/support/mod.rs | 4 +- 8 files changed, 90 insertions(+), 237 deletions(-) delete mode 100644 src/normal_facade.rs diff --git a/Cargo.toml b/Cargo.toml index 951d971..8984e88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,9 +18,14 @@ rand = "0.8" radix_fmt = "1" bb8 = "0.8" thiserror = "1" -redis = { version = "0.23", features = ["tokio-comp", "async-std-comp"] } +redis = { version = "0.25", features = ["tokio-comp", "async-std-comp"] } async-trait = "0.1" [dev-dependencies] net2 = "0.2" tokio = { version = "1", features = ["rt-multi-thread"]} + +[features] +default = ["tokio-comp", "async-std-comp"] +tokio-comp = ["redis/tokio-comp"] +async-std-comp = ["redis/async-std-comp"] diff --git a/src/lib.rs b/src/lib.rs index ab8928b..5f36e7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,7 @@ //! # RSMQ in async Rust //! -//! RSMQ port to async rust. RSMQ is a simple redis queue system that works in any -//! redis v2.6+. It contains the same methods as the original one -//! in [https://github.com/smrchy/rsmq](https://github.com/smrchy/rsmq) -//! -//! This crate uses async in the implementation. If you want to use it in your sync -//! code you can use tokio/async_std "block_on" method. Async was used in order to -//! simplify the code and allow 1-to-1 port oft he JS code. +//! RSMQ port to async rust. RSMQ is a simple redis queue system that works in any redis v2.6+. It contains the same +//! methods as the original one in [https://github.com/smrchy/rsmq](https://github.com/smrchy/rsmq) //! //! [![Crates.io](https://img.shields.io/crates/v/rsmq_async)](https://crates.io/crates/rsmq_async) //! [![Crates.io](https://img.shields.io/crates/l/rsmq_async)](https://choosealicense.com/licenses/mit/) @@ -15,41 +10,35 @@ //! //! ## Example //! -//! ```rust,no_run -//! -//! use rsmq_async::{Rsmq, RsmqError, RsmqConnection}; +//! ```rust +//! # use rsmq_async::RsmqError; use rsmq_async::{Rsmq, RsmqConnection}; //! -//! # async fn it_works() -> Result<(), RsmqError> { -//! let mut rsmq = Rsmq::new(Default::default()).await?; +//! # async fn it_works() -> Result<(), RsmqError> { let mut rsmq = Rsmq::new(Default::default()).await?; //! //! let message = rsmq.receive_message::("myqueue", None).await?; //! -//! if let Some(message) = message { -//! rsmq.delete_message("myqueue", &message.id).await?; -//! } +//! if let Some(message) = message { rsmq.delete_message("myqueue", &message.id).await?; } //! //! # Ok(()) //! # } //! //! ``` //! -//! Main object documentation are in: Rsmq and -//! PooledRsmq and they both implement the trait -//! RsmqConnection where you can see all the RSMQ -//! methods. Make sure you always import the trait RsmqConnection. +//! Main object documentation are in: [`Rsmq`] and[`PooledRsmq`] and they both implement the trait +//! [`RsmqConnection`] where you can see all the RSMQ methods. Make sure you always import the trait +//! [`RsmqConnection`]. //! //! ## Installation //! //! Check [https://crates.io/crates/rsmq_async](https://crates.io/crates/rsmq_async) //! -//! //! ## Example //! -//! ```rust,no_run +//! ```rust +//! //! use rsmq_async::{Rsmq, RsmqConnection}; //! -//! async fn it_works() { -//! let mut rsmq = Rsmq::new(Default::default()) +//! async fn it_works() { let mut rsmq = Rsmq::new(Default::default()) //! .await //! .expect("connection failed"); //! @@ -66,83 +55,90 @@ //! .await //! .expect("cannot receive message"); //! -//! if let Some(message) = message { -//! rsmq.delete_message("myqueue", &message.id).await; -//! } -//! } +//! if let Some(message) = message { rsmq.delete_message("myqueue", &message.id).await; } } //! //! ``` //! //! ## Realtime //! -//! When [initializing](#initialize) RSMQ you can enable the realtime PUBLISH for -//! new messages. On every new message that gets sent to RSQM via `sendMessage` a -//! Redis PUBLISH will be issued to `{rsmq.ns}:rt:{qname}`. So, you can subscribe -//! to it using redis-rs library directly. +//! When initializing RSMQ you can enable the realtime PUBLISH for new messages. On every new message that gets sent to +//! RSQM via `sendMessage` a Redis PUBLISH will be issued to `{rsmq.ns}:rt:{qname}`. So, you can subscribe to it using +//! redis-rs library directly. //! //! ### How to use the realtime option //! -//! Besides the PUBLISH when a new message is sent to RSMQ nothing else will happen. -//! Your app could use the Redis SUBSCRIBE command to be notified of new messages -//! and issue a `receiveMessage` then. However make sure not to listen with multiple -//! workers for new messages with SUBSCRIBE to prevent multiple simultaneous -//! `receiveMessage` calls. +//! Besides the PUBLISH redis command when a new message is sent to RSMQ nothing else will happen. Your app could use +//! the Redis SUBSCRIBE command to be notified of new messages and issue a `receiveMessage` then. However make sure not +//! to listen with multiple workers for new messages with SUBSCRIBE to prevent multiple simultaneous `receiveMessage` +//! calls. //! //! ## Guarantees //! -//! If you want to implement "at least one delivery" guarantee, you need to receive -//! the messages using "receive_message" and then, once the message is successfully -//! processed, delete it with "delete_message". +//! If you want to implement "at least one delivery" guarantee, you need to receive the messages using "receive_message" +//! and then, once the message is successfully processed, delete it with "delete_message". //! //! ## Connection Pool //! -//! If you want to use a connection pool, just use PooledRsmq -//! instad of Rsmq. It implements the RsmqConnection trait as the normal Rsmq. +//! If you want to use a connection pool, just use [`PooledRsmq`] instad of Rsmq. It implements the RsmqConnection trait +//! as the normal Rsmq. //! -//! If you want to accept any of both implementation, just accept the trait -//! RsmqConnection +//! If you want to accept any of both implementation, just accept the trait [`RsmqConnection`] //! //! ## Executor compatibility //! -//! Since version 0.16 [where this pull request was merged](https://github.com/mitsuhiko/redis-rs/issues/280) -//! redis-rs dependency supports tokio and async_std executors. By default it will -//! guess what you are using when creating the connection. You can check -//! [redis-rs](https://github.com/mitsuhiko/redis-rs/blob/master/Cargo.toml) `Cargo.tolm` for -//! the flags `async-std-comp` and `tokio-comp` in order to choose one or the other. If you don't select -//! any it should be able to automatically choose the correct one. +//! By default it will intruct redis-rs library to enable async-std and tokio compatibility and choose Tokio +//! if Tokio is avaialble, async-std if not. If you want to choose, you can change the `Cargo.toml` definition to +//! +//! ```toml +//! +//! rsmq_async = { version = "9", default-features = false, features = ["tokio-comp"] } +//! +//! ``` +//! +//! Where `"tokio-comp"` can also be `"async-std-comp"`. +//! +//! ## `Rsmq` vs `PooledRsmq` +//! +//! In almost all workloads you might prefer the `Rsmq` object, as it works with a multiplexed connection. +//! +//! For specific workloads, where you might be sending a lof of data (images, documents, big blobs) you might prefer to +//! use the `PooledRsmq` and configure it with `PoolOptions`. +//! +//! They both use the `redis::aio::MultiplexedConnection`, but the pooled connection can be configured to spawn several +//! of those, so one operation won't block the other. //! //! ## Response types //! //! There are 3 functions that take generic types: //! -//! - `pop_message` and `receive_message`: Where the type for the received message is -//! `RsmqMessage` where `E: TryFrom>`. So, If you have custom type, you can implement the trait -//! `TryFrom` for `YourCustomType` and use it like: `rsmq.receive_message::("myqueue", None)`. -//! Implementations are provided for `String` and `Vec`. -//! - `send_message` where the message to send needs to implement `Into + Send`. So you will -//! need to implement the trait for your type. You can check the implementations for the type RedisBytes and see how -//! we did it. Implementations are provided for `String`, `&str` and `Vec`. +//! - `pop_message` and `receive_message`: Where the type for the received message is `RsmqMessage` where `E: +//! TryFrom>`. So, If you have custom type, you can implement the trait +//! `TryFrom` for `YourCustomType` and use it like: `rsmq.receive_message:: +//! ("myqueue", None)`. Implementations are provided for `String` and `Vec`. +//! - `send_message` where the message to send needs to implement `Into + Send`. So you will need to +//! implement the trait for your type. You can check the implementations for the type RedisBytes and see how we did +//! it. Implementations are provided for `String`, `&str` and `Vec`. //! -//! All this is because strings in Rust are very convenient to use for json messages, so always returning a Vec -//! may not be the most ergonomic solution. But at the same time, we can just add some already made implementations -//! for it and you can just use it with your type or, if you are sending, let's say, images, just use the method -//! like: `rsmq.receive_message::>("myqueue", None)` and transform it later to your type. (Or just implement -//! the TryFrom for your type and the transformation will be automatic.) +//! All this is because strings in Rust are very convenient to use for json messages, so always returning a `Vec` +//! may not be the most ergonomic solution. But at the same time, we can just add some already made implementations for +//! it and you can just use it with your type or, if you are sending, let's say, images, just use the method like: +//! `rsmq.receive_message::>("myqueue", None)` and transform it later to your type. (Or just implement the +//! `TryFrom` for your type and the transformation will be automatic.) //! //! ### Example for implementing a custom type //! //! ```rust,ignore -//! impl TryFrom for String { //! -//! // We sacrifice the ability of recovering the original error for the ability of having the -//! // original data. If you know how to conserver both, let me know! -//! +//! impl TryFrom for String { +//! //! type Error = Vec; // Always set Error as Vec; //! //! fn try_from(bytes: RedisBytes) -> Result { //! String::from_utf8(bytes.0).map_err(|e| e.into_bytes()) //! } +//! //! } +//! //! ``` //! @@ -151,15 +147,13 @@ mod error; mod functions; mod multiplexed_facade; -mod normal_facade; mod pooled_facade; mod r#trait; mod types; pub use error::RsmqError; pub use error::RsmqResult; -pub use multiplexed_facade::MultiplexedRsmq; -pub use normal_facade::Rsmq; +pub use multiplexed_facade::Rsmq; pub use pooled_facade::{PoolOptions, PooledRsmq}; pub use r#trait::RsmqConnection; pub use types::RedisBytes; diff --git a/src/multiplexed_facade.rs b/src/multiplexed_facade.rs index eb25b59..59478e6 100644 --- a/src/multiplexed_facade.rs +++ b/src/multiplexed_facade.rs @@ -11,19 +11,19 @@ struct RedisConnection(redis::aio::MultiplexedConnection); impl std::fmt::Debug for RedisConnection { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "RedisAsyncConnnection") + write!(f, "MultiplexedRedisAsyncConnnection") } } #[derive(Debug, Clone)] -pub struct MultiplexedRsmq { +pub struct Rsmq { connection: RedisConnection, functions: RsmqFunctions, } -impl MultiplexedRsmq { +impl Rsmq { /// Creates a new RSMQ instance, including its connection - pub async fn new(options: RsmqOptions) -> RsmqResult { + pub async fn new(options: RsmqOptions) -> RsmqResult { let conn_info = redis::ConnectionInfo { addr: redis::ConnectionAddr::Tcp(options.host, options.port), redis: redis::RedisConnectionInfo { @@ -37,7 +37,7 @@ impl MultiplexedRsmq { let connection = client.get_multiplexed_async_connection().await?; - Ok(MultiplexedRsmq::new_with_connection( + Ok(Rsmq::new_with_connection( connection, options.realtime, Some(&options.ns), @@ -49,8 +49,8 @@ impl MultiplexedRsmq { connection: redis::aio::MultiplexedConnection, realtime: bool, ns: Option<&str>, - ) -> MultiplexedRsmq { - MultiplexedRsmq { + ) -> Rsmq { + Rsmq { connection: RedisConnection(connection), functions: RsmqFunctions { ns: ns.unwrap_or("rsmq").to_string(), @@ -62,7 +62,7 @@ impl MultiplexedRsmq { } #[async_trait::async_trait] -impl RsmqConnection for MultiplexedRsmq { +impl RsmqConnection for Rsmq { async fn change_message_visibility( &mut self, qname: &str, diff --git a/src/normal_facade.rs b/src/normal_facade.rs deleted file mode 100644 index cd3c105..0000000 --- a/src/normal_facade.rs +++ /dev/null @@ -1,149 +0,0 @@ -use crate::functions::RsmqFunctions; -use crate::r#trait::RsmqConnection; -use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; -use crate::RsmqResult; -use core::convert::TryFrom; -use core::marker::PhantomData; -use std::time::Duration; - -struct RedisConnection(redis::aio::Connection); - -impl std::fmt::Debug for RedisConnection { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "RedisAsyncConnnection") - } -} - -#[derive(Debug)] -pub struct Rsmq { - connection: RedisConnection, - functions: RsmqFunctions, -} - -impl Rsmq { - /// Creates a new RSMQ instance, including its connection - pub async fn new(options: RsmqOptions) -> RsmqResult { - let conn_info = redis::ConnectionInfo { - addr: redis::ConnectionAddr::Tcp(options.host, options.port), - redis: redis::RedisConnectionInfo { - db: options.db.into(), - username: options.username, - password: options.password, - }, - }; - - let client = redis::Client::open(conn_info)?; - - let connection = client.get_async_connection().await?; - - Ok(Rsmq::new_with_connection( - connection, - options.realtime, - Some(&options.ns), - )) - } - - /// Special method for when you already have a redis-rs connection and you don't want redis_async to create a new one. - pub fn new_with_connection( - connection: redis::aio::Connection, - realtime: bool, - ns: Option<&str>, - ) -> Rsmq { - Rsmq { - connection: RedisConnection(connection), - functions: RsmqFunctions { - ns: ns.unwrap_or("rsmq").to_string(), - realtime, - conn: PhantomData, - }, - } - } -} - -#[async_trait::async_trait] -impl RsmqConnection for Rsmq { - async fn change_message_visibility( - &mut self, - qname: &str, - message_id: &str, - hidden: Duration, - ) -> RsmqResult<()> { - self.functions - .change_message_visibility(&mut self.connection.0, qname, message_id, hidden) - .await - } - - async fn create_queue( - &mut self, - qname: &str, - hidden: Option, - delay: Option, - maxsize: Option, - ) -> RsmqResult<()> { - self.functions - .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize) - .await - } - - async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult { - self.functions - .delete_message(&mut self.connection.0, qname, id) - .await - } - async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> { - self.functions - .delete_queue(&mut self.connection.0, qname) - .await - } - async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult { - self.functions - .get_queue_attributes(&mut self.connection.0, qname) - .await - } - - async fn list_queues(&mut self) -> RsmqResult> { - self.functions.list_queues(&mut self.connection.0).await - } - - async fn pop_message>>( - &mut self, - qname: &str, - ) -> RsmqResult>> { - self.functions - .pop_message::(&mut self.connection.0, qname) - .await - } - - async fn receive_message>>( - &mut self, - qname: &str, - hidden: Option, - ) -> RsmqResult>> { - self.functions - .receive_message::(&mut self.connection.0, qname, hidden) - .await - } - - async fn send_message + Send>( - &mut self, - qname: &str, - message: E, - delay: Option, - ) -> RsmqResult { - self.functions - .send_message(&mut self.connection.0, qname, message, delay) - .await - } - - async fn set_queue_attributes( - &mut self, - qname: &str, - hidden: Option, - delay: Option, - maxsize: Option, - ) -> RsmqResult { - self.functions - .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize) - .await - } -} diff --git a/src/pooled_facade.rs b/src/pooled_facade.rs index d66cdc6..519a1bb 100644 --- a/src/pooled_facade.rs +++ b/src/pooled_facade.rs @@ -22,14 +22,17 @@ impl RedisConnectionManager { #[async_trait] impl bb8::ManageConnection for RedisConnectionManager { - type Connection = redis::aio::Connection; + type Connection = redis::aio::MultiplexedConnection; type Error = RedisError; async fn connect(&self) -> Result { - self.client.get_async_connection().await + self.client.get_multiplexed_async_connection().await } - async fn is_valid(&self, conn: &mut redis::aio::Connection) -> Result<(), Self::Error> { + async fn is_valid( + &self, + conn: &mut redis::aio::MultiplexedConnection, + ) -> Result<(), Self::Error> { redis::cmd("PING").query_async(conn).await } @@ -46,7 +49,7 @@ pub struct PoolOptions { pub struct PooledRsmq { pool: bb8::Pool, - functions: RsmqFunctions, + functions: RsmqFunctions, } impl Clone for PooledRsmq { diff --git a/src/trait.rs b/src/trait.rs index 6a36de7..737389e 100644 --- a/src/trait.rs +++ b/src/trait.rs @@ -37,8 +37,10 @@ pub trait RsmqConnection { /// /// Important to use when you are using receive_message. async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult; + /// Deletes the queue and all the messages on it async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()>; + /// Returns the queue attributes and statistics async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult; diff --git a/src/types.rs b/src/types.rs index 7b77b77..03aeac6 100644 --- a/src/types.rs +++ b/src/types.rs @@ -85,8 +85,8 @@ pub struct RsmqQueueAttributes { } /// Internal value representing the redis bytes. -/// It implements TryFrom String and Vec -/// and From String, &str, Vec and &[u8] to +/// It implements `TryFrom` `String` and `Vec` +/// and `From String`, `&str`, `Vec` and `&[u8]` to /// itself. /// /// You can add your custom TryFrom and From @@ -104,8 +104,6 @@ pub struct RsmqQueueAttributes { /// type Error = Vec; /// /// fn try_from(bytes: RedisBytes) -> Result { -/// // For the library user, they can just call into_bytes -/// // for getting the original Vec /// String::from_utf8(bytes.0).map_err(|e| e.into_bytes()) /// } /// } @@ -114,7 +112,7 @@ pub struct RsmqQueueAttributes { pub struct RedisBytes(pub(crate) Vec); impl RedisBytes { - /// Consumes the value and returns the raw bytes as Vec + /// Consumes the value and returns the raw bytes as `Vec` pub fn into_bytes(self) -> Vec { self.0 } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 3a09ea2..8c67a6a 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -116,7 +116,7 @@ impl TestContext { TestContext { server, client } } - pub async fn async_connection(&self) -> redis::RedisResult { - self.client.get_async_connection().await + pub async fn async_connection(&self) -> redis::RedisResult { + self.client.get_multiplexed_async_connection().await } }