From 8f2fb8e833e1af060e1f2d51d44c95d6c33fa45b Mon Sep 17 00:00:00 2001 From: Matt Hunzinger Date: Tue, 12 Dec 2023 22:37:15 -0500 Subject: [PATCH] Create Channel trait --- Cargo.toml | 4 ++- src/context.rs | 18 ++++++------ src/lib.rs | 15 ++++++++-- src/rt.rs | 67 ++++++++++++++++++++++++++++++++++---------- src/signal_handle.rs | 7 ++--- src/slot_handle.rs | 7 ++--- 6 files changed, 82 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ab63060..5c082157 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,10 @@ description = "Cross-platform UI framework" repository = "https://github.com/concoct-rs/concoct" [features] +std = [] rt = [] -full = ["rt"] +futures = ["std"] +full = ["rt", "futures"] [dependencies] futures = "0.3.29" diff --git a/src/context.rs b/src/context.rs index fee3cbdb..a2cc55f9 100644 --- a/src/context.rs +++ b/src/context.rs @@ -34,16 +34,15 @@ impl Context { M: 'static, { let key = self.key; - crate::Runtime::current() - .tx - .unbounded_send(crate::rt::RuntimeMessage::Handle { + crate::Runtime::current().inner.borrow_mut().channel.send( + crate::rt::RuntimeMessage::Handle { key, f: Box::new(move |any_task| { let task = any_task.as_any_mut().downcast_mut::().unwrap(); task.handle(Context::new(key), msg); }), - }) - .unwrap(); + }, + ) } pub fn listen(&self, mut f: impl FnMut(&M) + 'static) @@ -93,13 +92,12 @@ impl Context { M: 'static, { let key = self.key; - crate::Runtime::current() - .tx - .unbounded_send(crate::rt::RuntimeMessage::Signal { + crate::Runtime::current().inner.borrow_mut().channel.send( + crate::rt::RuntimeMessage::Signal { key, msg: Box::new(msg), - }) - .unwrap(); + }, + ); } pub fn signal(&self) -> crate::SignalHandle { diff --git a/src/lib.rs b/src/lib.rs index e0eee78e..a0d50057 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,17 @@ macro_rules! cfg_rt { }; } +#[allow(unused_macros)] +macro_rules! cfg_futures { + ($($i:item)*) => { + $( + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] + $i + )* + }; +} + mod context; pub use self::context::Context; @@ -20,8 +31,8 @@ cfg_rt!( mod handle; pub use self::handle::Handle; - mod rt; - pub use self::rt::{Runtime, RuntimeGuard}; + pub mod rt; + pub use self::rt::Runtime; mod slot_handle; pub use slot_handle::SlotHandle; diff --git a/src/rt.rs b/src/rt.rs index 1fd0ffb0..ae6e87fe 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -1,14 +1,14 @@ use crate::{Context, Handle, Object}; -use futures::{channel::mpsc, StreamExt}; +use futures::{channel::mpsc, Future, StreamExt}; use slotmap::{DefaultKey, SlotMap}; use std::{ any::{Any, TypeId}, cell::RefCell, collections::HashMap, - rc::Rc, + rc::Rc, pin::Pin, }; -pub(crate) enum RuntimeMessage { +pub enum RuntimeMessage { Signal { key: DefaultKey, msg: Box, @@ -22,7 +22,7 @@ pub(crate) enum RuntimeMessage { pub(crate) struct Inner { pub(crate) tasks: SlotMap>>, pub(crate) listeners: HashMap<(DefaultKey, TypeId), Vec>>>, - rx: mpsc::UnboundedReceiver, + pub(crate) channel: Box, } thread_local! { @@ -32,24 +32,30 @@ thread_local! { #[derive(Clone)] pub struct Runtime { pub(crate) inner: Rc>, - pub(crate) tx: mpsc::UnboundedSender, } -impl Default for Runtime { - fn default() -> Self { - let (tx, rx) = mpsc::unbounded(); +cfg_futures!( + impl Default for Runtime { + fn default() -> Self { + let (tx, rx) = mpsc::unbounded(); + Self::new(Box::new(Mpsc { + tx,rx + })) + } + } +); + +impl Runtime { + pub fn new(channel: Box) -> Self { Self { inner: Rc::new(RefCell::new(Inner { tasks: SlotMap::new(), listeners: HashMap::new(), - rx, + channel, })), - tx, } } -} -impl Runtime { pub fn current() -> Self { Self::try_current().unwrap() } @@ -87,7 +93,7 @@ impl Runtime { pub async fn run(&self) { let mut me = self.inner.borrow_mut(); - if let Some(msg) = me.rx.next().await { + if let Some(msg) = me.channel.next().await { drop(me); self.run_inner(msg); @@ -98,7 +104,7 @@ impl Runtime { pub fn try_run(&self) { loop { let mut me = self.inner.borrow_mut(); - if let Ok(Some(msg)) = me.rx.try_next() { + if let Some(msg) = me.channel.try_next() { drop(me); self.run_inner(msg); } else { @@ -134,7 +140,7 @@ impl Runtime { } } -pub(crate) trait AnyTask { +pub trait AnyTask { fn as_any(&self) -> &dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any; @@ -165,3 +171,34 @@ impl Drop for RuntimeGuard { CURRENT.try_with(|cell| cell.borrow_mut().take()).unwrap(); } } + +pub trait Channel { + fn send(&mut self, msg: RuntimeMessage); + + fn next(&mut self) -> Pin>+ '_>>; + + fn try_next(&mut self) -> Option; +} + +cfg_futures!( + pub struct Mpsc { + pub tx: mpsc::UnboundedSender, + pub rx: mpsc::UnboundedReceiver, + } + + impl Channel for Mpsc { + fn send(&mut self, msg: RuntimeMessage) { + self.tx.unbounded_send(msg).unwrap(); + } + + fn next(&mut self) -> Pin> + '_>> { + Box::pin(async move { + self.rx.next().await + }) + } + + fn try_next(&mut self) -> Option { + self.rx.try_next().ok().flatten() + } + } +); diff --git a/src/signal_handle.rs b/src/signal_handle.rs index 4c160e73..ff61cb6b 100644 --- a/src/signal_handle.rs +++ b/src/signal_handle.rs @@ -28,12 +28,11 @@ impl SignalHandle { { let key = self.key; Runtime::current() - .tx - .unbounded_send(crate::rt::RuntimeMessage::Signal { + .inner.borrow_mut().channel + .send(crate::rt::RuntimeMessage::Signal { key, msg: Box::new(msg), - }) - .unwrap(); + }); } pub fn listen(&self, mut f: impl FnMut(&M) + 'static) diff --git a/src/slot_handle.rs b/src/slot_handle.rs index e3ec348f..2a92d7f0 100644 --- a/src/slot_handle.rs +++ b/src/slot_handle.rs @@ -26,13 +26,12 @@ impl SlotHandle { let key = self.key; let f = self.f.clone(); Runtime::current() - .tx - .unbounded_send(crate::rt::RuntimeMessage::Handle { + .inner.borrow_mut().channel + .send(crate::rt::RuntimeMessage::Handle { key, f: Box::new(move |any_task| { f.borrow_mut()(any_task, Box::new(msg)); }), - }) - .unwrap(); + }); } }