Skip to content

Commit

Permalink
Add a way to run without the async-process thread
Browse files Browse the repository at this point in the history
I know I said that I wouldn't add any more features, but I
think this is important enough.

Right now, a thread called "async-process" is responsible for listening
for SIGCHLD and reaping zombie processes. This listens for the SIGCHLD
signal in Unix and uses a channel connected to the waitable handle on
Windows. While this works, we can do better. Through async-signal, the
signal was already asynchronous on Unix; we were already just using
async_io::block_on to wait on the signal. After swapping out the channel
used on Windows with async-channel, the process reaping function "reap"
can be reimplemented as a fully asynchronous future.

From here we must make sure this future is being polled at all times. To
facilitate this, a function named "driver()" is added to the public API.
This future acquires a lock on the reaper structure and calls the
"reap()" future indefinitely. Multiple drivers can be created at once;
they will just wait forever on this lock. This future is intended to be
spawned onto an executor and left to run forever, making sure all child
processes are signalled whenever necessary. If no tasks are running the
driver future, the "async-process" thread is spawned and runs the
"reap()" future itself.

I've added the following controls to make sure that this system is
robust:

- If a "driver" task is dropped, another "driver" task will acquire the
  lock and keep the reaper active.
- Before being dropped, the task checks to see if it is the last driver.
  If it is, it will spawn the "async-process" thread to be the driver.
- When a Child is being created, it checks if there are any active
  drivers. If there are none, it spawns the "async-process" thread
  itself.
- One concern is that the driver future wil try to spawn the
  "async-process" thread as the application exits and the task is being
  dropped, which will be unnecessary and lead to slower shutdowns. To
  prevent this, the future checks to see if there are any extant `Child`
  instances (a new refcount is added to Reaper to facilitate this). If
  there are none, and if there are no zombie processes, it does not
  spawn the additional thread.
- Someone can still `mem::forget()` the driver thread. This does not
  lead to undefined behavior and just leads to processes being left
  dangling. At this point they're asking for wacky behavior.

This strategy might also be viable for `async-io`, if we want to try to
avoid needing to spawn the additional thread there as well.

Closes #7
cc smol-rs/async-io#40

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Sep 23, 2023
1 parent 52a693e commit 381e7f5
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 30 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async-signal = "0.2.0"
rustix = { version = "0.38", default-features = false, features = ["std", "fs"] }

[target.'cfg(windows)'.dependencies]
async-channel = "1.9.0"
blocking = "1.0.0"

[target.'cfg(windows)'.dependencies.windows-sys]
Expand All @@ -37,4 +38,5 @@ features = [
]

[dev-dependencies]
async-executor = "1.5.1"
async-io = "1.8"
205 changes: 175 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

use std::convert::Infallible;
use std::ffi::OsStr;
use std::fmt;
use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::task::{Context, Poll};
use std::thread;

Expand All @@ -74,7 +77,7 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(windows)]
use blocking::Unblock;

use async_lock::OnceCell;
use async_lock::{Mutex as AsyncMutex, OnceCell};
use event_listener::{Event, EventListener};
use futures_lite::{future, io, prelude::*};

Expand Down Expand Up @@ -102,60 +105,120 @@ struct Reaper {

/// The pipe that delivers signal notifications.
pipe: Pipe,

/// Locking this mutex indicates that we are polling the SIGCHLD event.
driver_guard: AsyncMutex<()>,

/// The number of tasks polling the SIGCHLD event.
///
/// If this is zero, the `async-process` thread must be spawned.
drivers: AtomicUsize,

/// Ensures the `async-process` thread is only ever spawned once.
async_process_thread: Once,

/// Number of live `Child` instances currently running.
///
/// This is used to prevent the reaper thread from being spawned right as the program closes,
/// when the reaper thread isn't needed. This represents the number of active processes.
child_count: AtomicUsize,
}

impl Reaper {
/// Get the singleton instance of the reaper.
fn get() -> &'static Self {
static REAPER: OnceCell<Reaper> = OnceCell::new();

REAPER.get_or_init_blocking(|| {
REAPER.get_or_init_blocking(|| Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
drivers: AtomicUsize::new(0),
async_process_thread: Once::new(),
child_count: AtomicUsize::new(0),
})
}

/// Ensure that the reaper is driven.
///
/// If there are no active `driver()` callers, this will spawn the `async-process` thread.
#[inline]
fn ensure_driven(&'static self) {
if self.drivers.load(Ordering::Acquire) == 0 {
self.start_driver_thread();
}
}

/// Start the `async-process` thread.
#[cold]
fn start_driver_thread(&'static self) {
self.async_process_thread.call_once(move || {
thread::Builder::new()
.name("async-process".to_string())
.spawn(|| REAPER.wait_blocking().reap())
.spawn(move || {
let driver = async move {
self.drivers.fetch_add(1, Ordering::SeqCst);
let guard = self.driver_guard.lock().await;
self.reap(guard).await
};

#[cfg(unix)]
async_io::block_on(driver);

#[cfg(not(unix))]
future::block_on(driver);
})
.expect("cannot spawn async-process thread");

Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
}
})
});
}

/// Reap zombie processes forever.
fn reap(&'static self) -> ! {
async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait();
self.pipe.wait().await;

// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);

// Reap zombie processes.
let mut zombies = self.zombies.lock().unwrap();
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}

if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}

// Be a good citizen; yield if there are a lot of processes.
future::yield_now().await;
}

// Put zombie processes back.
self.zombies.lock().unwrap().append(&mut zombies);
}
}

/// Register a process with this reaper.
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
self.ensure_driven();
self.pipe.register(child)
}
}

cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;

use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
Expand All @@ -167,25 +230,25 @@ cfg_if::cfg_if! {
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: mpsc::SyncSender<()>,
sender: Sender<()>,

/// The receiver channel for the SIGCHLD signal.
receiver: Mutex<mpsc::Receiver<()>>,
receiver: Receiver<()>,
}

impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = mpsc::sync_channel(1);
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver: Mutex::new(receiver),
receiver
})
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
self.receiver.lock().unwrap().recv().ok();
async fn wait(&self) {
self.receiver.recv().await.ok();
}

/// Register a process object into this pipe.
Expand Down Expand Up @@ -238,8 +301,8 @@ cfg_if::cfg_if! {
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
async_io::block_on((&self.signals).next());
async fn wait(&self) {
(&self.signals).next().await;
}

/// Register a process object into this pipe.
Expand All @@ -260,6 +323,7 @@ struct ChildGuard {
inner: Option<std::process::Child>,
reap_on_drop: bool,
kill_on_drop: bool,
reaper: &'static Reaper,
}

impl ChildGuard {
Expand All @@ -275,11 +339,14 @@ impl Drop for ChildGuard {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = Reaper::get().zombies.lock().unwrap();
let mut zombies = self.reaper.zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}

// Decrement number of children.
self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
}
}

Expand Down Expand Up @@ -330,6 +397,9 @@ impl Child {
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);

// Bump the child count.
reaper.child_count.fetch_add(1, Ordering::Relaxed);

// Register the child process in the global list.
reaper.register(&child)?;

Expand All @@ -341,6 +411,7 @@ impl Child {
inner: Some(child),
reap_on_drop: cmd.reap_on_drop,
kill_on_drop: cmd.kill_on_drop,
reaper,
})),
})
}
Expand Down Expand Up @@ -760,6 +831,80 @@ impl TryFrom<ChildStderr> for OwnedFd {
}
}

/// Runs the driver for the asynchronous processes.
///
/// This future takes control of global structures related to driving [`Child`]ren and reaping
/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
/// making sure zombie processes are successfully waited on.
///
/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
/// will be spawned. The "async-process" thread just blocks on this future and will automatically
/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
///
/// This future will never complete. It is intended to be ran on a background task in your
/// executor of choice.
///
/// # Examples
///
/// ```no_run
/// use async_executor::Executor;
/// use async_process::{driver, Command};
///
/// # futures_lite::future::block_on(async {
/// // Create an executor and run on it.
/// let ex = Executor::new();
/// ex.run(async {
/// // Run the driver future in the background.
/// ex.spawn(driver()).detach();
///
/// // Run a command.
/// Command::new("ls").output().await.ok();
/// }).await;
/// # });
/// ```
#[inline]
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}

async {
// Get the reaper.
let reaper = Reaper::get();

// Make sure the reaper knows we're driving it.
reaper.drivers.fetch_add(1, Ordering::SeqCst);

// Decrement the driver count when this future is dropped.
let _guard = CallOnDrop(|| {
let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);

// If this was the last driver, and there are still resources actively using the
// reaper, make sure that there is a thread driving the reaper.
if prev_count == 1
&& reaper.child_count.load(Ordering::SeqCst) > 0
&& !reaper
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
{
reaper.ensure_driven();
}
});

// Acquire the reaper lock and start polling the SIGCHLD event.
let guard = reaper.driver_guard.lock().await;
reaper.reap(guard).await
}
}

/// A builder for spawning processes.
///
/// # Examples
Expand Down
19 changes: 19 additions & 0 deletions tests/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ fn smoke() {
})
}

#[test]
fn smoke_driven() {
future::block_on(
async {
async_process::driver().await;
}
.or(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(["/C", "exit 0"]).spawn()
} else {
Command::new("true").spawn()
};
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.status().await.unwrap().success());
}),
)
}

#[test]
fn smoke_failure() {
assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")
Expand Down

0 comments on commit 381e7f5

Please sign in to comment.