From 99091df3cf2ae25a05b2d62d97dcb1cb9072125d Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Mon, 5 Aug 2024 18:48:30 -0700 Subject: [PATCH] fix(zone): waitpid should be limited when no child processes exist (fixes #304) (#305) --- crates/zone/src/background.rs | 9 ++++++--- crates/zone/src/childwait.rs | 36 ++++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/crates/zone/src/background.rs b/crates/zone/src/background.rs index 40ba8d91..ea103bdc 100644 --- a/crates/zone/src/background.rs +++ b/crates/zone/src/background.rs @@ -16,6 +16,7 @@ use krata::idm::{ }; use log::debug; use nix::unistd::Pid; +use tokio::sync::broadcast::Receiver; use tokio::{select, sync::broadcast}; pub struct ZoneBackground { @@ -23,15 +24,18 @@ pub struct ZoneBackground { child: Pid, _cgroup: Cgroup, wait: ChildWait, + child_receiver: Receiver, } impl ZoneBackground { pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result { + let (wait, child_receiver) = ChildWait::new()?; Ok(ZoneBackground { idm, child, _cgroup: cgroup, - wait: ChildWait::new()?, + wait, + child_receiver, }) } @@ -39,7 +43,6 @@ impl ZoneBackground { let mut event_subscription = self.idm.subscribe().await?; let mut requests_subscription = self.idm.requests().await?; let mut request_streams_subscription = self.idm.request_streams().await?; - let mut wait_subscription = self.wait.subscribe().await?; loop { select! { x = event_subscription.recv() => match x { @@ -86,7 +89,7 @@ impl ZoneBackground { } }, - event = wait_subscription.recv() => match event { + event = self.child_receiver.recv() => match event { Ok(event) => self.child_event(event).await?, Err(_) => { break; diff --git a/crates/zone/src/childwait.rs b/crates/zone/src/childwait.rs index 309aabab..6efe82f9 100644 --- a/crates/zone/src/childwait.rs +++ b/crates/zone/src/childwait.rs @@ -1,3 +1,9 @@ +use anyhow::Result; +use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED}; +use log::warn; +use nix::unistd::Pid; +use std::thread::sleep; +use std::time::Duration; use std::{ ptr::addr_of_mut, sync::{ @@ -6,11 +12,6 @@ use std::{ }, thread::{self, JoinHandle}, }; - -use anyhow::Result; -use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED}; -use log::warn; -use nix::unistd::Pid; use tokio::sync::broadcast::{channel, Receiver, Sender}; const CHILD_WAIT_QUEUE_LEN: usize = 10; @@ -29,8 +30,8 @@ pub struct ChildWait { } impl ChildWait { - pub fn new() -> Result { - let (sender, _) = channel(CHILD_WAIT_QUEUE_LEN); + pub fn new() -> Result<(ChildWait, Receiver)> { + let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN); let signal = Arc::new(AtomicBool::new(false)); let mut processor = ChildWaitTask { sender: sender.clone(), @@ -41,11 +42,14 @@ impl ChildWait { warn!("failed to process child updates: {}", error); } }); - Ok(ChildWait { - sender, - signal, - _task: Arc::new(task), - }) + Ok(( + ChildWait { + sender, + signal, + _task: Arc::new(task), + }, + receiver, + )) } pub async fn subscribe(&self) -> Result> { @@ -63,7 +67,13 @@ impl ChildWaitTask { loop { let mut status: c_int = 0; let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) }; - + // pid being -1 indicates an error occurred, wait 100 microseconds to avoid + // overloading the channel. Right now we don't consider any other errors + // but that is fine for now, as waitpid shouldn't ever stop anyway. + if pid == -1 { + sleep(Duration::from_micros(100)); + continue; + } if WIFEXITED(status) { let event = ChildEvent { pid: Pid::from_raw(pid),