Skip to content

Commit

Permalink
Create Channel trait
Browse files Browse the repository at this point in the history
  • Loading branch information
matthunz committed Dec 13, 2023
1 parent 884cca8 commit 8f2fb8e
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 36 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ description = "Cross-platform UI framework"
repository = "https://github.com/concoct-rs/concoct"

[features]
std = []
rt = []
full = ["rt"]
futures = ["std"]
full = ["rt", "futures"]

[dependencies]
futures = "0.3.29"
Expand Down
18 changes: 8 additions & 10 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ impl<T> Context<T> {
M: 'static,
{
let key = self.key;
crate::Runtime::current()
.tx
.unbounded_send(crate::rt::RuntimeMessage::Handle {
crate::Runtime::current().inner.borrow_mut().channel.send(
crate::rt::RuntimeMessage::Handle {
key,
f: Box::new(move |any_task| {
let task = any_task.as_any_mut().downcast_mut::<T>().unwrap();
task.handle(Context::new(key), msg);
}),
})
.unwrap();
},
)
}

pub fn listen<M>(&self, mut f: impl FnMut(&M) + 'static)
Expand Down Expand Up @@ -93,13 +92,12 @@ impl<T> Context<T> {
M: 'static,
{
let key = self.key;
crate::Runtime::current()
.tx
.unbounded_send(crate::rt::RuntimeMessage::Signal {
crate::Runtime::current().inner.borrow_mut().channel.send(
crate::rt::RuntimeMessage::Signal {
key,
msg: Box::new(msg),
})
.unwrap();
},
);
}

pub fn signal<M>(&self) -> crate::SignalHandle<M> {
Expand Down
15 changes: 13 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ macro_rules! cfg_rt {
};
}

#[allow(unused_macros)]
macro_rules! cfg_futures {
($($i:item)*) => {
$(
#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
$i
)*
};
}

mod context;
pub use self::context::Context;

Expand All @@ -20,8 +31,8 @@ cfg_rt!(
mod handle;
pub use self::handle::Handle;

mod rt;
pub use self::rt::{Runtime, RuntimeGuard};
pub mod rt;
pub use self::rt::Runtime;

mod slot_handle;
pub use slot_handle::SlotHandle;
Expand Down
67 changes: 52 additions & 15 deletions src/rt.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{Context, Handle, Object};
use futures::{channel::mpsc, StreamExt};
use futures::{channel::mpsc, Future, StreamExt};
use slotmap::{DefaultKey, SlotMap};
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
rc::Rc,
rc::Rc, pin::Pin,
};

pub(crate) enum RuntimeMessage {
pub enum RuntimeMessage {
Signal {
key: DefaultKey,
msg: Box<dyn Any>,
Expand All @@ -22,7 +22,7 @@ pub(crate) enum RuntimeMessage {
pub(crate) struct Inner {
pub(crate) tasks: SlotMap<DefaultKey, Rc<RefCell<dyn AnyTask>>>,
pub(crate) listeners: HashMap<(DefaultKey, TypeId), Vec<Rc<RefCell<dyn FnMut(&dyn Any)>>>>,
rx: mpsc::UnboundedReceiver<RuntimeMessage>,
pub(crate) channel: Box<dyn Channel>,
}

thread_local! {
Expand All @@ -32,24 +32,30 @@ thread_local! {
#[derive(Clone)]
pub struct Runtime {
pub(crate) inner: Rc<RefCell<Inner>>,
pub(crate) tx: mpsc::UnboundedSender<RuntimeMessage>,
}

impl Default for Runtime {
fn default() -> Self {
let (tx, rx) = mpsc::unbounded();
cfg_futures!(
impl Default for Runtime {
fn default() -> Self {
let (tx, rx) = mpsc::unbounded();
Self::new(Box::new(Mpsc {
tx,rx
}))
}
}
);

impl Runtime {
pub fn new(channel: Box<dyn Channel>) -> Self {
Self {
inner: Rc::new(RefCell::new(Inner {
tasks: SlotMap::new(),
listeners: HashMap::new(),
rx,
channel,
})),
tx,
}
}
}

impl Runtime {
pub fn current() -> Self {
Self::try_current().unwrap()
}
Expand Down Expand Up @@ -87,7 +93,7 @@ impl Runtime {

pub async fn run(&self) {
let mut me = self.inner.borrow_mut();
if let Some(msg) = me.rx.next().await {
if let Some(msg) = me.channel.next().await {
drop(me);
self.run_inner(msg);

Expand All @@ -98,7 +104,7 @@ impl Runtime {
pub fn try_run(&self) {
loop {
let mut me = self.inner.borrow_mut();
if let Ok(Some(msg)) = me.rx.try_next() {
if let Some(msg) = me.channel.try_next() {
drop(me);
self.run_inner(msg);
} else {
Expand Down Expand Up @@ -134,7 +140,7 @@ impl Runtime {
}
}

pub(crate) trait AnyTask {
pub trait AnyTask {
fn as_any(&self) -> &dyn Any;

fn as_any_mut(&mut self) -> &mut dyn Any;
Expand Down Expand Up @@ -165,3 +171,34 @@ impl Drop for RuntimeGuard {
CURRENT.try_with(|cell| cell.borrow_mut().take()).unwrap();
}
}

pub trait Channel {
fn send(&mut self, msg: RuntimeMessage);

fn next(&mut self) -> Pin<Box<dyn Future<Output = Option<RuntimeMessage>>+ '_>>;

fn try_next(&mut self) -> Option<RuntimeMessage>;
}

cfg_futures!(
pub struct Mpsc {
pub tx: mpsc::UnboundedSender<RuntimeMessage>,
pub rx: mpsc::UnboundedReceiver<RuntimeMessage>,
}

impl Channel for Mpsc {
fn send(&mut self, msg: RuntimeMessage) {
self.tx.unbounded_send(msg).unwrap();
}

fn next(&mut self) -> Pin<Box<dyn Future<Output = Option<RuntimeMessage>> + '_>> {
Box::pin(async move {
self.rx.next().await
})
}

fn try_next(&mut self) -> Option<RuntimeMessage> {
self.rx.try_next().ok().flatten()
}
}
);
7 changes: 3 additions & 4 deletions src/signal_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ impl<M> SignalHandle<M> {
{
let key = self.key;
Runtime::current()
.tx
.unbounded_send(crate::rt::RuntimeMessage::Signal {
.inner.borrow_mut().channel
.send(crate::rt::RuntimeMessage::Signal {
key,
msg: Box::new(msg),
})
.unwrap();
});
}

pub fn listen(&self, mut f: impl FnMut(&M) + 'static)
Expand Down
7 changes: 3 additions & 4 deletions src/slot_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ impl<M> SlotHandle<M> {
let key = self.key;
let f = self.f.clone();
Runtime::current()
.tx
.unbounded_send(crate::rt::RuntimeMessage::Handle {
.inner.borrow_mut().channel
.send(crate::rt::RuntimeMessage::Handle {
key,
f: Box::new(move |any_task| {
f.borrow_mut()(any_task, Box::new(msg));
}),
})
.unwrap();
});
}
}

0 comments on commit 8f2fb8e

Please sign in to comment.