Skip to content

Commit

Permalink
add sync facade
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed May 30, 2024
1 parent 0f47685 commit 0083217
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 2 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsmq_async"
version = "11.0.0"
version = "11.1.0"
authors = [
"David Bonet <[email protected]>"
]
Expand All @@ -20,12 +20,14 @@ bb8 = "^0.8"
thiserror = "^1"
redis = { version = "^0.25", default-features = false, features = ["acl", "keep-alive", "script"] }
async-trait = "^0.1"
tokio = { version = "^1", optional = true}

[dev-dependencies]
net2 = "^0.2"
tokio = { version = "^1", features = ["rt-multi-thread"]}

[features]
default = ["tokio-comp"]
default = ["tokio-comp", "sync"]
sync = ["tokio"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ and issue a `receiveMessage` then. However make sure not to listen with multiple
workers for new messages with SUBSCRIBE to prevent multiple simultaneous
`receiveMessage` calls.

## Sync option

If you enable the `sync` feature, you can import a `RsmqSync` object with sync
versions of the methods.

## Guarantees

If you want to implement "at least one delivery" guarantee, you need to receive
Expand Down
17 changes: 17 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,21 @@ pub enum RsmqError {
CannotParseMaxsize,
#[error("The message received from Redis cannot be decoded into the expected type. Try to use Vec<u8> instead.")]
CannotDecodeMessage(Vec<u8>),
#[error("Cannot start tokio runtime for sync facade")]
TokioStart(Different<std::io::Error>),
}

#[derive(Debug)]
pub struct Different<T>(pub T);

impl<T> PartialEq for Different<T> {
fn eq(&self, _other: &Self) -> bool {
false
}
}

impl<T> From<T> for Different<T> {
fn from(value: T) -> Self {
Different(value)
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ mod error;
mod functions;
mod multiplexed_facade;
mod pooled_facade;
#[cfg(feature = "sync")]
mod sync_facade;
mod r#trait;
mod types;

Expand All @@ -156,6 +158,8 @@ pub use error::RsmqResult;
pub use multiplexed_facade::Rsmq;
pub use pooled_facade::{PoolOptions, PooledRsmq, RedisConnectionManager};
pub use r#trait::RsmqConnection;
#[cfg(feature = "sync")]
pub use sync_facade::RsmqSync;
pub use types::RedisBytes;
pub use types::RsmqMessage;
pub use types::RsmqOptions;
Expand Down
167 changes: 167 additions & 0 deletions src/sync_facade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use tokio::runtime::Runtime;

use crate::functions::RsmqFunctions;
use crate::r#trait::RsmqConnection;
use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::{RsmqError, RsmqResult};
use core::convert::TryFrom;
use core::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
struct RedisConnection(redis::aio::MultiplexedConnection);

impl std::fmt::Debug for RedisConnection {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MultiplexedRedisAsyncConnnection")
}
}

#[derive(Debug, Clone)]
pub struct RsmqSync {
connection: RedisConnection,
functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
runner: Arc<Runtime>,
}

impl RsmqSync {
/// Creates a new RSMQ instance, including its connection
pub async fn new(options: RsmqOptions) -> RsmqResult<RsmqSync> {
let runner = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| RsmqError::TokioStart(e.into()))?;

let conn_info = redis::ConnectionInfo {
addr: redis::ConnectionAddr::Tcp(options.host, options.port),
redis: redis::RedisConnectionInfo {
db: options.db.into(),
username: options.username,
password: options.password,
},
};

let client = redis::Client::open(conn_info)?;

let connection =
runner.block_on(async move { client.get_multiplexed_async_connection().await })?;

Ok(RsmqSync {
connection: RedisConnection(connection),
functions: RsmqFunctions {
ns: options.ns,
realtime: options.realtime,
conn: PhantomData,
},
runner: Arc::new(runner),
})
}
}

#[async_trait::async_trait]
impl RsmqConnection for RsmqSync {
async fn change_message_visibility(
&mut self,
qname: &str,
message_id: &str,
hidden: Duration,
) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, hidden)
.await
})
}

async fn create_queue(
&mut self,
qname: &str,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
})
}

async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
self.runner.block_on(async {
self.functions
.delete_message(&mut self.connection.0, qname, id)
.await
})
}
async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.delete_queue(&mut self.connection.0, qname)
.await
})
}
async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
self.runner.block_on(async {
self.functions
.get_queue_attributes(&mut self.connection.0, qname)
.await
})
}

async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
self.runner
.block_on(async { self.functions.list_queues(&mut self.connection.0).await })
}

async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.runner.block_on(async {
self.functions
.pop_message::<E>(&mut self.connection.0, qname)
.await
})
}

async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.runner.block_on(async {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, hidden)
.await
})
}

async fn send_message<E: Into<RedisBytes> + Send>(
&mut self,
qname: &str,
message: E,
delay: Option<Duration>,
) -> RsmqResult<String> {
self.runner.block_on(async {
self.functions
.send_message(&mut self.connection.0, qname, message, delay)
.await
})
}

async fn set_queue_attributes(
&mut self,
qname: &str,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.runner.block_on(async {
self.functions
.set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
})
}
}

0 comments on commit 0083217

Please sign in to comment.