Merge pull request #7218 from joshuawarner32/clean-shutdown
Prevent panics in worker threads from causing deadlocks
joshuawarner32 authored Nov 15, 2024
2 parents 0804758 + d6b7ee2 · commit 8fd83a4
Showing 2 changed files with 84 additions and 125 deletions.
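
The fix replaces explicit shutdown messages with drop-based shutdown: worker threads block on a unit wakeup channel, and once every Sender<()> is dropped (on normal exit, or while a panic unwinds the coordinating scope) each worker's recv() returns a disconnect error and its loop ends. A minimal standalone sketch of that pattern, with illustrative names rather than the actual roc_worker API:

    use crossbeam_channel::bounded;
    use std::thread;

    fn main() {
        let (wakeup_tx, wakeup_rx) = bounded::<()>(1024);

        let worker = thread::spawn(move || {
            // Block until a wakeup arrives; recv() only fails once
            // every Sender has been dropped.
            while wakeup_rx.recv().is_ok() {
                // ... steal and run queued tasks here ...
            }
            // Disconnected: exit cleanly instead of blocking forever.
        });

        wakeup_tx.send(()).unwrap();
        // Dropping the sender (explicitly here, or implicitly during
        // panic unwinding) is the entire shutdown protocol.
        drop(wakeup_tx);
        worker.join().unwrap();
    }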
crates/compiler/load_internal/src/file.rs: 112 changes (49 additions, 63 deletions)
@@ -68,7 +68,7 @@ use roc_solve_problem::TypeError;
use roc_target::Target;
use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable};
use roc_types::types::{Alias, Types};
-use roc_worker::{ChannelProblem, WorkerMsg};
+use roc_worker::ChannelProblem;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::io;
@@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender<Msg<'a>>;
/// Add a task to the queue, and notify all the listeners.
fn enqueue_task<'a>(
injector: &Injector<BuildTask<'a>>,
-listeners: &[Sender<WorkerMsg>],
+listeners: &[Sender<()>],
task: BuildTask<'a>,
) -> Result<(), LoadingProblem<'a>> {
injector.push(task);

for listener in listeners {
listener
-.send(WorkerMsg::TaskAdded)
+.send(())
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?;
}

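For context on the hunk above: enqueue_task pushes the task onto a crossbeam_deque::Injector, the global queue all workers steal from, then sends a unit wakeup to every listener. A self-contained sketch of the Injector/Worker/Stealer API in play, assuming the crossbeam-deque crate (the i32 task type is arbitrary):

    use crossbeam_deque::{Injector, Steal, Worker};

    fn main() {
        let injector: Injector<i32> = Injector::new();
        injector.push(1);
        injector.push(2);

        // Each worker thread owns a local FIFO deque and hands out
        // Stealers so other threads can take work from it.
        let local: Worker<i32> = Worker::new_fifo();
        let _stealer = local.stealer();

        // Move a batch from the global injector into the local queue,
        // popping one task to run immediately.
        match injector.steal_batch_and_pop(&local) {
            Steal::Success(task) => println!("got task {task}"),
            Steal::Empty => println!("nothing queued"),
            Steal::Retry => println!("lost a race, try again"),
        }
    }
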
@@ -1569,17 +1569,17 @@ pub fn load_single_threaded<'a>(
// We'll add tasks to this, and then worker threads will take tasks from it.
let injector = Injector::new();

-let (worker_msg_tx, worker_msg_rx) = bounded(1024);
-let worker_listener = worker_msg_tx;
-let worker_listeners = arena.alloc([worker_listener]);
+let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
+let worker_waker = worker_wakup_tx;
+let worker_wakers = [worker_waker];

let worker = Worker::new_fifo();
let stealer = worker.stealer();
let stealers = &[stealer];

// now we just manually interleave stepping the state "thread" and the worker "thread"
loop {
-match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) {
+match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(done)) => return Ok(done),
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
@@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>(

// then check if the worker can step
let control_flow =
-roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| {
+roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| {
run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target)
});

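Single-threaded mode spawns no threads at all: the loop interleaves one state step with one worker step over the same injector. Within a worker step, finding a task presumably follows the usual crossbeam-deque discipline (local queue, then global injector, then peers' stealers); this helper is adapted from the crossbeam documentation and is not a function in this codebase:

    use crossbeam_deque::{Injector, Stealer, Worker};
    use std::iter;

    fn find_task<T>(
        local: &Worker<T>,
        global: &Injector<T>,
        stealers: &[Stealer<T>],
    ) -> Option<T> {
        // Try the local queue first.
        local.pop().or_else(|| {
            // Otherwise repeatedly try the global queue and then the
            // other workers, until a steal succeeds or all are empty.
            iter::repeat_with(|| {
                global
                    .steal_batch_and_pop(local)
                    .or_else(|| stealers.iter().map(|s| s.steal()).collect())
            })
            .find(|s| !s.is_retry())
            .and_then(|s| s.success())
        })
    }
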
@@ -1606,7 +1606,7 @@ fn state_thread_step<'a>(
fn state_thread_step<'a>(
arena: &'a Bump,
state: State<'a>,
-worker_listeners: &'a [Sender<WorkerMsg>],
+worker_wakers: &[Sender<()>],
injector: &Injector<BuildTask<'a>>,
msg_tx: &crossbeam::channel::Sender<Msg<'a>>,
msg_rx: &crossbeam::channel::Receiver<Msg<'a>>,
@@ -1712,14 +1712,8 @@ fn state_thread_step<'a>(
let render = state.render;
let palette = state.palette;

-let res_state = update(
-    state,
-    msg,
-    msg_tx.clone(),
-    injector,
-    worker_listeners,
-    arena,
-);
+let res_state =
+    update(state, msg, msg_tx.clone(), injector, worker_wakers, arena);

match res_state {
Ok(new_state) => Ok(ControlFlow::Continue(new_state)),
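
state_thread_step reports back through std::ops::ControlFlow, letting the caller's loop distinguish "done, return the result" from "keep stepping" without sentinel values. A toy version of the same shape (the step function here is hypothetical):

    use std::ops::ControlFlow;

    fn step(n: u32) -> ControlFlow<&'static str, u32> {
        if n >= 3 {
            ControlFlow::Break("done")
        } else {
            ControlFlow::Continue(n + 1)
        }
    }

    fn main() {
        let mut state = 0;
        loop {
            match step(state) {
                ControlFlow::Break(result) => {
                    println!("{result}");
                    break;
                }
                ControlFlow::Continue(next) => state = next,
            }
        }
    }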
@@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>(

{
let thread_result = thread::scope(|thread_scope| {
-let mut worker_listeners =
-    bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
+// Careful! It's important that worker listeners aren't allocated in the arena,
+// since they need to be correctly dropped if we have a panic in this thread::scope code.
+// Making sure they're owned means they'll be dropped correctly on either normal exit
+// of this thread::scope block or on panicking. When they're dropped, the worker threads
+// will correctly exit their message processing loops.
+// If these were allocated in the arena, we might panic without shutting down the worker threads,
+// causing the thread::scope block to hang while it waits for the worker threads to exit.
+let mut worker_wakers = Vec::with_capacity(num_workers);

for worker_arena in it {
let msg_tx = msg_tx.clone();
let worker = worker_queues.pop().unwrap();

-let (worker_msg_tx, worker_msg_rx) = bounded(1024);
-worker_listeners.push(worker_msg_tx);
+let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
+worker_wakers.push(worker_wakup_tx);

// We only want to move a *reference* to the main task queue's
// injector in the thread, not the injector itself
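
The comment added above is the heart of the fix: bumpalo's Bump frees its memory in bulk and never runs destructors, so a Sender parked in the arena keeps its channel connected even after a panic, leaving workers blocked in recv() and thread::scope waiting on them forever. A small sketch of the difference (channel capacities are arbitrary):

    use bumpalo::Bump;
    use crossbeam_channel::bounded;

    fn main() {
        // Owned senders: dropping them (on normal return, or during
        // panic unwinding) disconnects the channel, so recv() errors.
        let (tx, rx) = bounded::<()>(8);
        drop(vec![tx]);
        assert!(rx.recv().is_err());

        // Arena-allocated senders: the Sender is moved into the arena
        // and its Drop never runs, so the channel stays connected.
        let (tx2, rx2) = bounded::<()>(8);
        {
            let arena = Bump::new();
            let _ = arena.alloc(tx2);
        } // arena memory is freed here, but the Sender was leaked
        // The channel is empty yet still connected: a blocking recv()
        // on rx2 would hang forever.
        assert!(rx2.try_recv().is_err());
    }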
@@ -2015,16 +2015,22 @@ fn load_multi_threaded<'a>(
.stack_size(EXPANDED_STACK_SIZE)
.spawn(move |_| {
// will process messages until we run out
-roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| {
-    run_task(
-        task,
-        worker_arena,
-        src_dir,
-        msg_tx.clone(),
-        roc_cache_dir,
-        target,
-    )
-})
+roc_worker::worker_task(
+    worker,
+    injector,
+    stealers,
+    worker_wakup_rx,
+    |task| {
+        run_task(
+            task,
+            worker_arena,
+            src_dir,
+            msg_tx.clone(),
+            roc_cache_dir,
+            target,
+        )
+    },
+)
});

res_join_handle.unwrap_or_else(|_| {
@@ -2039,40 +2045,20 @@

// Grab a reference to these Senders outside the loop, so we can share
// it across each iteration of the loop.
-let worker_listeners = worker_listeners.into_bump_slice();
let msg_tx = msg_tx.clone();

-macro_rules! shut_down_worker_threads {
-    () => {
-        for listener in worker_listeners {
-            // We intentionally don't propagate this Result, because even if
-            // shutting down a worker failed (which can happen if a a panic
-            // occurred on that thread), we want to continue shutting down
-            // the others regardless.
-            if listener.send(WorkerMsg::Shutdown).is_err() {
-                log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked.");
-            }
-        }
-    };
-}

// The root module will have already queued up messages to process,
// and processing those messages will in turn queue up more messages.
loop {
-match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx)
-{
+match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(load_result)) => {
-shut_down_worker_threads!();
-
return Ok(load_result);
}
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
continue;
}
Err(e) => {
-shut_down_worker_threads!();
-
return Err(e);
}
}
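
Note that neither return path above performs an explicit shutdown step anymore: whether the scope body returns Ok, returns Err, or panics, worker_wakers is dropped, every worker's recv() reports a disconnect, and the worker threads exit so the scope can join them. A standalone demonstration of why that is panic-safe, using std::thread::scope rather than this crate's actual scoped-thread setup:

    use crossbeam_channel::bounded;
    use std::thread;

    fn run(should_panic: bool) {
        let (wakeup_tx, wakeup_rx) = bounded::<()>(1024);

        thread::scope(|s| {
            s.spawn(move || {
                // Worker: handle wakeups until every sender is gone.
                while wakeup_rx.recv().is_ok() { /* steal + run tasks */ }
                // Disconnected: return, so the scope's join succeeds.
            });

            if should_panic {
                // Unwinding drops wakeup_tx before the scope joins, so
                // the worker still wakes up and exits: no deadlock.
                panic!("boom");
            }

            // Normal path: dropping the sender has the same effect.
            drop(wakeup_tx);
        });
    }

    // run(false) returns normally; run(true) propagates the panic, but
    // only after the worker thread has exited. Neither call hangs.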
@@ -2111,13 +2097,13 @@ fn start_tasks<'a>(
state: &mut State<'a>,
work: MutSet<(ModuleId, Phase)>,
injector: &Injector<BuildTask<'a>>,
-worker_listeners: &'a [Sender<WorkerMsg>],
+worker_wakers: &[Sender<()>],
) -> Result<(), LoadingProblem<'a>> {
for (module_id, phase) in work {
let tasks = start_phase(module_id, phase, arena, state);

for task in tasks {
-enqueue_task(injector, worker_listeners, task)?
+enqueue_task(injector, worker_wakers, task)?
}
}

@@ -2179,7 +2165,7 @@ fn update<'a>(
msg: Msg<'a>,
msg_tx: MsgSender<'a>,
injector: &Injector<BuildTask<'a>>,
-worker_listeners: &'a [Sender<WorkerMsg>],
+worker_wakers: &[Sender<()>],
arena: &'a Bump,
) -> Result<State<'a>, LoadingProblem<'a>> {
use self::Msg::*;
@@ -2305,7 +2291,7 @@ fn update<'a>(
work.extend(state.dependencies.notify(home, Phase::LoadHeader));
work.insert((home, Phase::Parse));

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2382,7 +2368,7 @@ fn update<'a>(
}
};

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

state
.module_cache
@@ -2393,7 +2379,7 @@

let work = state.dependencies.notify(module_id, Phase::Parse);

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2445,7 +2431,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::CanonicalizeAndConstrain);

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2652,7 +2638,7 @@ fn update<'a>(
work
};

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;
}

Ok(state)
@@ -2700,7 +2686,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::FindSpecializations);

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2990,13 +2976,13 @@ fn update<'a>(

let work = state.dependencies.reload_make_specialization_pass();

-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}

NextStep::MakingInPhase => {
-start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
(The diff for the second changed file did not load.)
