diff --git a/Cargo.toml b/Cargo.toml index 6598dac..d92ffb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rsmq_async" -version = "11.0.0" +version = "11.1.0" authors = [ "David Bonet " ] @@ -20,12 +20,14 @@ bb8 = "^0.8" thiserror = "^1" redis = { version = "^0.25", default-features = false, features = ["acl", "keep-alive", "script"] } async-trait = "^0.1" +tokio = { version = "^1", optional = true} [dev-dependencies] net2 = "^0.2" tokio = { version = "^1", features = ["rt-multi-thread"]} [features] -default = ["tokio-comp"] +default = ["tokio-comp", "sync"] +sync = ["tokio"] tokio-comp = ["redis/tokio-comp"] async-std-comp = ["redis/async-std-comp"] diff --git a/README.md b/README.md index c821169..1864efe 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,11 @@ and issue a `receiveMessage` then. However make sure not to listen with multiple workers for new messages with SUBSCRIBE to prevent multiple simultaneous `receiveMessage` calls. +## Sync option + +If you enable the `sync` feature, you can import a `RsmqSync` object with sync +versions of the methods. + ## Guarantees If you want to implement "at least one delivery" guarantee, you need to receive diff --git a/src/error.rs b/src/error.rs index 7f1dc15..f5f0847 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,4 +41,21 @@ pub enum RsmqError { CannotParseMaxsize, #[error("The message received from Redis cannot be decoded into the expected type. Try to use Vec instead.")] CannotDecodeMessage(Vec), + #[error("Cannot start tokio runtime for sync facade")] + TokioStart(Different), +} + +#[derive(Debug)] +pub struct Different(pub T); + +impl PartialEq for Different { + fn eq(&self, _other: &Self) -> bool { + false + } +} + +impl From for Different { + fn from(value: T) -> Self { + Different(value) + } } diff --git a/src/lib.rs b/src/lib.rs index 5ad83f9..480bd8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,6 +148,8 @@ mod error; mod functions; mod multiplexed_facade; mod pooled_facade; +#[cfg(feature = "sync")] +mod sync_facade; mod r#trait; mod types; @@ -156,6 +158,8 @@ pub use error::RsmqResult; pub use multiplexed_facade::Rsmq; pub use pooled_facade::{PoolOptions, PooledRsmq, RedisConnectionManager}; pub use r#trait::RsmqConnection; +#[cfg(feature = "sync")] +pub use sync_facade::RsmqSync; pub use types::RedisBytes; pub use types::RsmqMessage; pub use types::RsmqOptions; diff --git a/src/sync_facade.rs b/src/sync_facade.rs new file mode 100644 index 0000000..19c1cff --- /dev/null +++ b/src/sync_facade.rs @@ -0,0 +1,167 @@ +use tokio::runtime::Runtime; + +use crate::functions::RsmqFunctions; +use crate::r#trait::RsmqConnection; +use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; +use crate::{RsmqError, RsmqResult}; +use core::convert::TryFrom; +use core::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone)] +struct RedisConnection(redis::aio::MultiplexedConnection); + +impl std::fmt::Debug for RedisConnection { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "MultiplexedRedisAsyncConnnection") + } +} + +#[derive(Debug, Clone)] +pub struct RsmqSync { + connection: RedisConnection, + functions: RsmqFunctions, + runner: Arc, +} + +impl RsmqSync { + /// Creates a new RSMQ instance, including its connection + pub async fn new(options: RsmqOptions) -> RsmqResult { + let runner = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| RsmqError::TokioStart(e.into()))?; + + let conn_info = redis::ConnectionInfo { + addr: redis::ConnectionAddr::Tcp(options.host, options.port), + redis: redis::RedisConnectionInfo { + db: options.db.into(), + username: options.username, + password: options.password, + }, + }; + + let client = redis::Client::open(conn_info)?; + + let connection = + runner.block_on(async move { client.get_multiplexed_async_connection().await })?; + + Ok(RsmqSync { + connection: RedisConnection(connection), + functions: RsmqFunctions { + ns: options.ns, + realtime: options.realtime, + conn: PhantomData, + }, + runner: Arc::new(runner), + }) + } +} + +#[async_trait::async_trait] +impl RsmqConnection for RsmqSync { + async fn change_message_visibility( + &mut self, + qname: &str, + message_id: &str, + hidden: Duration, + ) -> RsmqResult<()> { + self.runner.block_on(async { + self.functions + .change_message_visibility(&mut self.connection.0, qname, message_id, hidden) + .await + }) + } + + async fn create_queue( + &mut self, + qname: &str, + hidden: Option, + delay: Option, + maxsize: Option, + ) -> RsmqResult<()> { + self.runner.block_on(async { + self.functions + .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize) + .await + }) + } + + async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult { + self.runner.block_on(async { + self.functions + .delete_message(&mut self.connection.0, qname, id) + .await + }) + } + async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> { + self.runner.block_on(async { + self.functions + .delete_queue(&mut self.connection.0, qname) + .await + }) + } + async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult { + self.runner.block_on(async { + self.functions + .get_queue_attributes(&mut self.connection.0, qname) + .await + }) + } + + async fn list_queues(&mut self) -> RsmqResult> { + self.runner + .block_on(async { self.functions.list_queues(&mut self.connection.0).await }) + } + + async fn pop_message>>( + &mut self, + qname: &str, + ) -> RsmqResult>> { + self.runner.block_on(async { + self.functions + .pop_message::(&mut self.connection.0, qname) + .await + }) + } + + async fn receive_message>>( + &mut self, + qname: &str, + hidden: Option, + ) -> RsmqResult>> { + self.runner.block_on(async { + self.functions + .receive_message::(&mut self.connection.0, qname, hidden) + .await + }) + } + + async fn send_message + Send>( + &mut self, + qname: &str, + message: E, + delay: Option, + ) -> RsmqResult { + self.runner.block_on(async { + self.functions + .send_message(&mut self.connection.0, qname, message, delay) + .await + }) + } + + async fn set_queue_attributes( + &mut self, + qname: &str, + hidden: Option, + delay: Option, + maxsize: Option, + ) -> RsmqResult { + self.runner.block_on(async { + self.functions + .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize) + .await + }) + } +}