Skip to content

Commit

Permalink
fix(zone): waitpid should be limited when no child processes exist (f…
Browse files Browse the repository at this point in the history
…ixes #304) (#305)
  • Loading branch information
azenla authored Aug 6, 2024
1 parent 08b30c2 commit 99091df
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
9 changes: 6 additions & 3 deletions crates/zone/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@ use krata::idm::{
};
use log::debug;
use nix::unistd::Pid;
use tokio::sync::broadcast::Receiver;
use tokio::{select, sync::broadcast};

pub struct ZoneBackground {
idm: IdmInternalClient,
child: Pid,
_cgroup: Cgroup,
wait: ChildWait,
child_receiver: Receiver<ChildEvent>,
}

impl ZoneBackground {
pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
let (wait, child_receiver) = ChildWait::new()?;
Ok(ZoneBackground {
idm,
child,
_cgroup: cgroup,
wait: ChildWait::new()?,
wait,
child_receiver,
})
}

pub async fn run(&mut self) -> Result<()> {
let mut event_subscription = self.idm.subscribe().await?;
let mut requests_subscription = self.idm.requests().await?;
let mut request_streams_subscription = self.idm.request_streams().await?;
let mut wait_subscription = self.wait.subscribe().await?;
loop {
select! {
x = event_subscription.recv() => match x {
Expand Down Expand Up @@ -86,7 +89,7 @@ impl ZoneBackground {
}
},

event = wait_subscription.recv() => match event {
event = self.child_receiver.recv() => match event {
Ok(event) => self.child_event(event).await?,
Err(_) => {
break;
Expand Down
36 changes: 23 additions & 13 deletions crates/zone/src/childwait.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use anyhow::Result;
use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
use log::warn;
use nix::unistd::Pid;
use std::thread::sleep;
use std::time::Duration;
use std::{
ptr::addr_of_mut,
sync::{
Expand All @@ -6,11 +12,6 @@ use std::{
},
thread::{self, JoinHandle},
};

use anyhow::Result;
use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
use log::warn;
use nix::unistd::Pid;
use tokio::sync::broadcast::{channel, Receiver, Sender};

const CHILD_WAIT_QUEUE_LEN: usize = 10;
Expand All @@ -29,8 +30,8 @@ pub struct ChildWait {
}

impl ChildWait {
pub fn new() -> Result<ChildWait> {
let (sender, _) = channel(CHILD_WAIT_QUEUE_LEN);
pub fn new() -> Result<(ChildWait, Receiver<ChildEvent>)> {
let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN);
let signal = Arc::new(AtomicBool::new(false));
let mut processor = ChildWaitTask {
sender: sender.clone(),
Expand All @@ -41,11 +42,14 @@ impl ChildWait {
warn!("failed to process child updates: {}", error);
}
});
Ok(ChildWait {
sender,
signal,
_task: Arc::new(task),
})
Ok((
ChildWait {
sender,
signal,
_task: Arc::new(task),
},
receiver,
))
}

pub async fn subscribe(&self) -> Result<Receiver<ChildEvent>> {
Expand All @@ -63,7 +67,13 @@ impl ChildWaitTask {
loop {
let mut status: c_int = 0;
let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) };

// pid being -1 indicates an error occurred, wait 100 microseconds to avoid
// overloading the channel. Right now we don't consider any other errors
// but that is fine for now, as waitpid shouldn't ever stop anyway.
if pid == -1 {
sleep(Duration::from_micros(100));
continue;
}
if WIFEXITED(status) {
let event = ChildEvent {
pid: Pid::from_raw(pid),
Expand Down

0 comments on commit 99091df

Please sign in to comment.