Add a dispatcher that can be sent between threads #234

Merged 3 commits on Jan 3, 2024

4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -73,4 +73,6 @@ jobs:
# Miri currently reports leaks in some tests so we disable that check
# here (might be due to ptr-int-ptr in crossbeam-epoch so might be
# resolved in future versions of that crate).
-run: MIRIFLAGS="-Zmiri-ignore-leaks" cargo miri test
+#
+# crossbeam-epoch doesn't pass with stacked borrows https://github.com/crossbeam-rs/crossbeam/issues/545
+run: MIRIFLAGS="-Zmiri-ignore-leaks -Zmiri-tree-borrows" cargo miri test
72 changes: 33 additions & 39 deletions src/dispatch/dispatcher.rs
@@ -1,6 +1,10 @@
use smallvec::SmallVec;

-use crate::{dispatch::stage::Stage, system::RunNow, world::World};
+use crate::{
+    dispatch::{stage::Stage, SendDispatcher},
+    system::RunNow,
+    world::World,
+};

/// This wrapper is used to share a replaceable ThreadPool with other
/// dispatchers. Useful with batch dispatchers.
@@ -10,19 +14,15 @@ pub type ThreadPoolWrapper = Option<::std::sync::Arc<::rayon::ThreadPool>>;
/// The dispatcher struct, allowing
/// systems to be executed in parallel.
pub struct Dispatcher<'a, 'b> {
-    stages: Vec<Stage<'a>>,
+    inner: SendDispatcher<'a>,
thread_local: ThreadLocal<'b>,
-    #[cfg(feature = "parallel")]
-    thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}

impl<'a, 'b> Dispatcher<'a, 'b> {
/// Sets up all the systems which means they are gonna add default values
/// for the resources they need.
pub fn setup(&mut self, world: &mut World) {
-        for stage in &mut self.stages {
-            stage.setup(world);
-        }
+        self.inner.setup(world);

for sys in &mut self.thread_local {
sys.setup(world);
@@ -34,9 +34,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// / or resources from the `World` which are associated with external
/// resources.
pub fn dispose(self, world: &mut World) {
-        for stage in self.stages {
-            stage.dispose(world);
-        }
+        self.inner.dispose(world);

for sys in self.thread_local {
sys.dispose(world);
@@ -56,12 +54,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch(&mut self, world: &World) {
-        #[cfg(feature = "parallel")]
-        self.dispatch_par(world);
-
-        #[cfg(not(feature = "parallel"))]
-        self.dispatch_seq(world);
-
+        self.inner.dispatch(world);
self.dispatch_thread_local(world);
}

@@ -77,18 +70,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// is currently borrowed. If that's the case, it panics.
#[cfg(feature = "parallel")]
pub fn dispatch_par(&mut self, world: &World) {
-        let stages = &mut self.stages;
-
-        self.thread_pool
-            .read()
-            .unwrap()
-            .as_ref()
-            .unwrap()
-            .install(move || {
-                for stage in stages {
-                    stage.execute(world);
-                }
-            });
+        self.inner.dispatch_par(world);
}

/// Dispatches the systems (except thread local systems) sequentially.
@@ -99,9 +81,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch_seq(&mut self, world: &World) {
-        for stage in &mut self.stages {
-            stage.execute_seq(world);
-        }
+        self.inner.dispatch_seq(world);
}

/// Dispatch only thread local systems sequentially.
@@ -114,16 +94,28 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
}
}

+    /// Converts this to a [`SendDispatcher`].
+    ///
+    /// Fails and returns the original dispatcher if it contains thread local systems.
+    pub fn try_into_sendable(self) -> Result<SendDispatcher<'a>, Self> {
+        let Dispatcher {
+            inner: _,
+            thread_local,
+        } = &self;
+
+        if thread_local.is_empty() {
+            Ok(self.inner)
+        } else {
+            Err(self)
+        }
+    }
+
/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
#[cfg(feature = "parallel")]
pub fn max_threads(&self) -> usize {
-        self.stages
-            .iter()
-            .map(Stage::max_threads)
-            .max()
-            .unwrap_or(0)
+        self.inner.max_threads()
}
}

@@ -154,9 +146,11 @@ pub fn new_dispatcher<'a, 'b>(
thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
) -> Dispatcher<'a, 'b> {
Dispatcher {
-        stages,
+        inner: SendDispatcher {
+            stages,
+            thread_pool,
+        },
thread_local,
-        thread_pool,
}
}

@@ -166,7 +160,7 @@ pub fn new_dispatcher<'a, 'b>(
thread_local: ThreadLocal<'b>,
) -> Dispatcher<'a, 'b> {
Dispatcher {
-        stages,
+        inner: SendDispatcher { stages },
thread_local,
}
}
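The `try_into_sendable` method added above is the entry point for turning a `Dispatcher` into a `SendDispatcher`. Below is a minimal usage sketch, not part of this diff: the crate name `shred`, the `DispatcherBuilder`/`System` APIs, and `World::empty()` are assumptions based on the surrounding codebase.

```rust
// Sketch only: crate name and builder/system APIs are assumed, not shown in this diff.
use shred::{DispatcherBuilder, System, World};

struct CountUp;

impl<'a> System<'a> for CountUp {
    // No resources are accessed; this system only demonstrates dispatch.
    type SystemData = ();

    fn run(&mut self, _data: Self::SystemData) {
        println!("running on {:?}", std::thread::current().id());
    }
}

fn main() {
    let world = World::empty();

    // No thread-local systems are registered, so the conversion succeeds.
    let dispatcher = DispatcherBuilder::new()
        .with(CountUp, "count_up", &[])
        .build();

    let Ok(mut sendable) = dispatcher.try_into_sendable() else {
        panic!("dispatcher unexpectedly holds thread-local systems");
    };

    // `SendDispatcher` is `Send`, so it can be moved to another thread
    // together with the world it operates on.
    std::thread::spawn(move || {
        sendable.dispatch(&world);
    })
    .join()
    .unwrap();
}
```

Note that the thread pool sits behind a shared `Arc<RwLock<ThreadPoolWrapper>>`, so moving the dispatcher to another thread does not duplicate the underlying rayon pool.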
2 changes: 2 additions & 0 deletions src/dispatch/mod.rs
@@ -9,6 +9,7 @@ pub use self::{
},
builder::DispatcherBuilder,
dispatcher::Dispatcher,
+    send_dispatcher::SendDispatcher,
};

#[cfg(feature = "parallel")]
@@ -18,5 +19,6 @@ mod builder;
mod dispatcher;
#[cfg(feature = "parallel")]
mod par_seq;
+mod send_dispatcher;
mod stage;
mod util;
128 changes: 128 additions & 0 deletions src/dispatch/send_dispatcher.rs
@@ -0,0 +1,128 @@
#[cfg(feature = "parallel")]
use crate::dispatch::dispatcher::ThreadPoolWrapper;
use crate::{dispatch::stage::Stage, system::RunNow, world::World};

/// `Send`able version of [`Dispatcher`](crate::dispatch::Dispatcher).
///
/// Can't hold thread local systems.
///
/// Create using [`Dispatcher::try_into_sendable`](crate::dispatch::Dispatcher::try_into_sendable).
pub struct SendDispatcher<'a> {
pub(super) stages: Vec<Stage<'a>>,
#[cfg(feature = "parallel")]
pub(super) thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}

impl<'a> SendDispatcher<'a> {
/// Sets up all the systems which means they are gonna add default values
/// for the resources they need.
pub fn setup(&mut self, world: &mut World) {
for stage in &mut self.stages {
stage.setup(world);
}
}

/// Calls the `dispose` method of all systems and allows them to release
/// external resources. It is common this method removes components and
/// / or resources from the `World` which are associated with external
/// resources.
pub fn dispose(self, world: &mut World) {
for stage in self.stages {
stage.dispose(world);
}
}

/// Dispatch all the systems with the given resources and context.
///
/// This function automatically redirects to
///
/// * [SendDispatcher::dispatch_par] in case it is supported
/// * [SendDispatcher::dispatch_seq] otherwise
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch(&mut self, world: &World) {
#[cfg(feature = "parallel")]
self.dispatch_par(world);

#[cfg(not(feature = "parallel"))]
self.dispatch_seq(world);
}

/// Dispatches the systems (except thread local systems)
/// in parallel given the resources to operate on.
///
/// This operation blocks the
/// executing thread.
///
/// Only available with "parallel" feature enabled.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
#[cfg(feature = "parallel")]
pub fn dispatch_par(&mut self, world: &World) {
let stages = &mut self.stages;

self.thread_pool
.read()
.unwrap()
.as_ref()
.unwrap()
.install(move || {
for stage in stages {
stage.execute(world);
}
});
}

/// Dispatches the systems (except thread local systems) sequentially.
///
/// This is useful if parallel overhead is
/// too big or the platform does not support multithreading.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch_seq(&mut self, world: &World) {
for stage in &mut self.stages {
stage.execute_seq(world);
}
}

/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
#[cfg(feature = "parallel")]
pub fn max_threads(&self) -> usize {
self.stages
.iter()
.map(Stage::max_threads)
.max()
.unwrap_or(0)
}
}

impl<'a, 'b> RunNow<'a> for SendDispatcher<'b> {
fn run_now(&mut self, world: &World) {
self.dispatch(world);
}

fn setup(&mut self, world: &mut World) {
self.setup(world);
}

fn dispose(self: Box<Self>, world: &mut World) {
(*self).dispose(world);
}
}

#[cfg(test)]
mod tests {
#[test]
fn send_dispatcher_is_send() {
fn is_send<T: Send>() {}
is_send::<super::SendDispatcher>();
}
}
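For the failure path, a companion sketch under the same assumptions as the previous example: a dispatcher holding a thread-local system cannot be made sendable, and `try_into_sendable` hands the original dispatcher back unchanged via `Err`.

```rust
// Sketch only: crate name and builder API are assumed, as in the previous example.
use shred::{DispatcherBuilder, System, World};

struct LocalOnly;

impl<'a> System<'a> for LocalOnly {
    type SystemData = ();

    fn run(&mut self, _data: Self::SystemData) {}
}

fn main() {
    let mut world = World::empty();

    // `with_thread_local` registers a system that must stay on the building thread.
    let dispatcher = DispatcherBuilder::new()
        .with_thread_local(LocalOnly)
        .build();

    match dispatcher.try_into_sendable() {
        Ok(_) => unreachable!("a thread-local system was registered"),
        // The original dispatcher is returned, so it can still be used normally.
        Err(mut original) => {
            original.setup(&mut world);
            original.dispatch(&world);
        }
    }
}
```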
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -110,7 +110,7 @@ pub use crate::dispatch::{Par, ParSeq, RunWithPool, Seq};
pub use crate::{
dispatch::{
BatchAccessor, BatchController, BatchUncheckedWorld, Dispatcher, DispatcherBuilder,
-        MultiDispatchController, MultiDispatcher,
+        MultiDispatchController, MultiDispatcher, SendDispatcher,
},
meta::{CastFrom, MetaIter, MetaIterMut, MetaTable},
system::{
2 changes: 1 addition & 1 deletion src/world/res_downcast/mod.rs
@@ -1,4 +1,4 @@
-//! Code is based on https://github.com/chris-morgan/mopa
+//! Code is based on <https://github.com/chris-morgan/mopa>
//! with the macro inlined for `Resource`. License files can be found in the
//! directory of this source file, see COPYRIGHT, LICENSE-APACHE and
//! LICENSE-MIT.