Skip to content

Commit

Permalink
Fix event_bus::Sender and event_bus::Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Oct 18, 2023
1 parent fc37536 commit 19a5a71
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 49 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.26.1] - 2023-10-18
* Rolled back a change where `event_bus::asynch::Sender` and `event_bus::asynch::Receiver` did no longer implement `ErrorType` and returned a `Result`; since these traits are rarely used, and 0.26.0 was just released, no new major version was released, but instead 0.26.0 was yanked

## [0.26.0] - 2023-10-17
* MSRV raised to 1.71
* Breaking change: All traits converted to AFIT, except `Unblocker`, which needs to compile with stable Rust
Expand Down
27 changes: 12 additions & 15 deletions src/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,19 @@ where
pub mod asynch {
pub use super::{ErrorType, Spin};

pub trait Sender {
pub trait Sender: ErrorType {
type Data: Send;
type Result: Send;

async fn send(&self, value: Self::Data) -> Self::Result;
async fn send(&self, value: Self::Data) -> Result<(), Self::Error>;
}

impl<S> Sender for &mut S
where
S: Sender,
{
type Data = S::Data;
type Result = S::Result;

async fn send(&self, value: Self::Data) -> Self::Result {
async fn send(&self, value: Self::Data) -> Result<(), Self::Error> {
(**self).send(value).await
}
}
Expand All @@ -142,26 +140,25 @@ pub mod asynch {
S: Sender,
{
type Data = S::Data;
type Result = S::Result;

async fn send(&self, value: Self::Data) -> Self::Result {
async fn send(&self, value: Self::Data) -> Result<(), Self::Error> {
(*self).send(value).await
}
}

pub trait Receiver {
type Result: Send;
pub trait Receiver: ErrorType {
type Data: Send;

async fn recv(&self) -> Self::Result;
async fn recv(&self) -> Result<Self::Data, Self::Error>;
}

impl<R> Receiver for &mut R
where
R: Receiver,
{
type Result = R::Result;
type Data = R::Data;

async fn recv(&self) -> Self::Result {
async fn recv(&self) -> Result<Self::Data, Self::Error> {
(**self).recv().await
}
}
Expand All @@ -170,15 +167,15 @@ pub mod asynch {
where
R: Receiver,
{
type Result = R::Result;
type Data = R::Data;

async fn recv(&self) -> Self::Result {
async fn recv(&self) -> Result<Self::Data, Self::Error> {
(*self).recv().await
}
}

pub trait EventBus<P>: ErrorType {
type Subscription<'a>: Receiver<Result = P>
type Subscription<'a>: Receiver<Data = P, Error = Self::Error>
where
Self: 'a;

Expand Down
92 changes: 58 additions & 34 deletions src/utils/asyncify/event_bus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt::Debug;
use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
Expand All @@ -7,6 +8,7 @@ use core::time::Duration;
extern crate alloc;
use alloc::sync::Arc;

use crate::event_bus::ErrorType;
use crate::utils::asyncify::Unblocker;
use crate::utils::mutex::{Condvar, Mutex, RawCondvar};

Expand All @@ -32,22 +34,23 @@ impl<U, P, PB> AsyncPostbox<U, P, PB> {
}

impl<P, PB> AsyncPostbox<(), P, PB> {
pub fn send_blocking(&self, value: P, duration: Option<core::time::Duration>)
pub fn send_blocking(
&self,
value: P,
duration: Option<core::time::Duration>,
) -> Result<(), PB::Error>
where
PB: crate::event_bus::Postbox<P>,
{
self.blocking_postbox
.post(&value, duration)
.map(|_| ())
.unwrap()
self.blocking_postbox.post(&value, duration).map(|_| ())
}
}

impl<U, P, PB> AsyncPostbox<U, P, PB>
where
U: Unblocker,
{
pub async fn send(&self, value: P)
pub async fn send(&self, value: P) -> Result<(), PB::Error>
where
P: Send + 'static,
PB: crate::event_bus::Postbox<P> + Sync,
Expand All @@ -62,7 +65,6 @@ where
.map(|_| ())
})
.await
.unwrap()
}
}

Expand All @@ -87,34 +89,35 @@ pub struct SubscriptionState<P, S> {
}

#[allow(clippy::type_complexity)]
pub struct AsyncSubscription<CV, P, S>(
pub struct AsyncSubscription<CV, P, S, E>(
Arc<(Mutex<CV::RawMutex, SubscriptionState<P, S>>, Condvar<CV>)>,
PhantomData<fn() -> E>,
)
where
CV: RawCondvar,
P: Send,
S: Send;

impl<CV, P, S> AsyncSubscription<CV, P, S>
impl<CV, P, S, E> AsyncSubscription<CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
S: Send,
P: Clone + Send,
{
pub async fn recv(&self) -> P {
pub async fn recv(&self) -> Result<P, E> {
NextFuture(self).await
}
}

struct NextFuture<'a, CV, P, S>(&'a AsyncSubscription<CV, P, S>)
struct NextFuture<'a, CV, P, S, E>(&'a AsyncSubscription<CV, P, S, E>)
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
P: Clone + Send,
S: Send;

impl<'a, CV, P, S> Drop for NextFuture<'a, CV, P, S>
impl<'a, CV, P, S, E> Drop for NextFuture<'a, CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
Expand All @@ -127,14 +130,14 @@ where
}
}

impl<'a, CV, P, S> Future for NextFuture<'a, CV, P, S>
impl<'a, CV, P, S, E> Future for NextFuture<'a, CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
P: Clone + Send,
S: Send,
{
type Output = P;
type Output = Result<P, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.0 .0 .0.lock();
Expand All @@ -144,7 +147,7 @@ where
if let Some(value) = value {
self.0 .0 .1.notify_all();

Poll::Ready(value)
Poll::Ready(Ok(value))
} else {
state.waker = Some(cx.waker().clone());

Expand Down Expand Up @@ -176,7 +179,9 @@ where
CV: RawCondvar + Send + Sync + 'static,
CV::RawMutex: Send + Sync + 'static,
{
pub fn subscribe<P>(&self) -> Result<AsyncSubscription<CV, P, E::Subscription<'_>>, E::Error>
pub fn subscribe<P>(
&self,
) -> Result<AsyncSubscription<CV, P, E::Subscription<'_>, E::Error>, E::Error>
where
P: Clone + Send + 'static,
E: crate::event_bus::EventBus<P>,
Expand Down Expand Up @@ -217,7 +222,7 @@ where

state.0.lock().subscription = Some(subscription);

Ok(AsyncSubscription(state))
Ok(AsyncSubscription(state, PhantomData))
}
}

Expand Down Expand Up @@ -274,9 +279,35 @@ impl<CV, E> AsyncWrapper<E> for AsyncEventBus<(), CV, E> {
}
}

impl<U, P, PB> ErrorType for AsyncPostbox<U, P, PB>
where
PB: ErrorType,
{
type Error = PB::Error;
}

impl<U, CV, E> ErrorType for AsyncEventBus<U, CV, E>
where
E: ErrorType,
{
type Error = E::Error;
}

impl<CV, P, S, E> ErrorType for AsyncSubscription<CV, P, S, E>
where
CV: RawCondvar,
P: Send,
S: Send,
E: Debug,
{
type Error = E;
}

#[cfg(feature = "nightly")]
mod async_traits_impl {
use crate::event_bus::asynch::{ErrorType, EventBus, PostboxProvider, Receiver, Sender};
use core::fmt::Debug;

use crate::event_bus::asynch::{EventBus, PostboxProvider, Receiver, Sender};
use crate::utils::asyncify::Unblocker;
use crate::utils::mutex::RawCondvar;

Expand All @@ -287,16 +318,16 @@ mod async_traits_impl {
U: Unblocker,
P: Clone + Send + 'static,
PB: crate::event_bus::Postbox<P> + Clone + Send + Sync,
PB::Error: Send,
{
type Data = P;
type Result = ();

async fn send(&self, value: Self::Data) {
async fn send(&self, value: Self::Data) -> Result<(), Self::Error> {
let value = value;
let blocking_postbox = self.blocking_postbox.clone();

self.unblocker
.unblock(move || blocking_postbox.post(&value, None).map(|_| ()).unwrap())
.unblock(move || blocking_postbox.post(&value, None).map(|_| ()))
.await
}
}
Expand All @@ -307,34 +338,27 @@ mod async_traits_impl {
PB: crate::event_bus::Postbox<P>,
{
type Data = P;
type Result = ();

async fn send(&self, value: Self::Data) {
async fn send(&self, value: Self::Data) -> Result<(), Self::Error> {
AsyncPostbox::send_blocking(self, value, Some(core::time::Duration::MAX))
}
}

impl<CV, P, S> Receiver for AsyncSubscription<CV, P, S>
impl<CV, P, S, E> Receiver for AsyncSubscription<CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
S: Send,
P: Clone + Send,
E: Debug,
{
type Result = P;
type Data = P;

async fn recv(&self) -> Self::Result {
async fn recv(&self) -> Result<Self::Data, Self::Error> {
AsyncSubscription::recv(self).await
}
}

impl<U, CV, E> ErrorType for AsyncEventBus<U, CV, E>
where
E: ErrorType,
{
type Error = E::Error;
}

impl<U, CV, P, E> EventBus<P> for AsyncEventBus<U, CV, E>
where
CV: RawCondvar + Send + Sync + 'static,
Expand All @@ -343,7 +367,7 @@ mod async_traits_impl {
E: crate::event_bus::EventBus<P>,
for<'a> E::Subscription<'a>: Send,
{
type Subscription<'a> = AsyncSubscription<CV, P, E::Subscription<'a>> where Self: 'a;
type Subscription<'a> = AsyncSubscription<CV, P, E::Subscription<'a>, E::Error> where Self: 'a;

async fn subscribe(&self) -> Result<Self::Subscription<'_>, Self::Error> {
AsyncEventBus::subscribe(self)
Expand Down

0 comments on commit 19a5a71

Please sign in to comment.