Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent panics in worker threads from causing deadlocks #7218

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 49 additions & 63 deletions crates/compiler/load_internal/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use roc_solve_problem::TypeError;
use roc_target::Target;
use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable};
use roc_types::types::{Alias, Types};
use roc_worker::{ChannelProblem, WorkerMsg};
use roc_worker::ChannelProblem;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::io;
Expand Down Expand Up @@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender<Msg<'a>>;
/// Add a task to the queue, and notify all the listeners.
fn enqueue_task<'a>(
injector: &Injector<BuildTask<'a>>,
listeners: &[Sender<WorkerMsg>],
listeners: &[Sender<()>],
task: BuildTask<'a>,
) -> Result<(), LoadingProblem<'a>> {
injector.push(task);

for listener in listeners {
listener
.send(WorkerMsg::TaskAdded)
.send(())
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?;
}

Expand Down Expand Up @@ -1569,17 +1569,17 @@ pub fn load_single_threaded<'a>(
// We'll add tasks to this, and then worker threads will take tasks from it.
let injector = Injector::new();

let (worker_msg_tx, worker_msg_rx) = bounded(1024);
let worker_listener = worker_msg_tx;
let worker_listeners = arena.alloc([worker_listener]);
let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
let worker_waker = worker_wakup_tx;
let worker_wakers = [worker_waker];

let worker = Worker::new_fifo();
let stealer = worker.stealer();
let stealers = &[stealer];

// now we just manually interleave stepping the state "thread" and the worker "thread"
loop {
match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) {
match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(done)) => return Ok(done),
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
Expand All @@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>(

// then check if the worker can step
let control_flow =
roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| {
roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| {
run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target)
});

Expand All @@ -1606,7 +1606,7 @@ pub fn load_single_threaded<'a>(
fn state_thread_step<'a>(
arena: &'a Bump,
state: State<'a>,
worker_listeners: &'a [Sender<WorkerMsg>],
worker_wakers: &[Sender<()>],
injector: &Injector<BuildTask<'a>>,
msg_tx: &crossbeam::channel::Sender<Msg<'a>>,
msg_rx: &crossbeam::channel::Receiver<Msg<'a>>,
Expand Down Expand Up @@ -1712,14 +1712,8 @@ fn state_thread_step<'a>(
let render = state.render;
let palette = state.palette;

let res_state = update(
state,
msg,
msg_tx.clone(),
injector,
worker_listeners,
arena,
);
let res_state =
update(state, msg, msg_tx.clone(), injector, worker_wakers, arena);

match res_state {
Ok(new_state) => Ok(ControlFlow::Continue(new_state)),
Expand Down Expand Up @@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>(

{
let thread_result = thread::scope(|thread_scope| {
let mut worker_listeners =
bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
// Careful! It's important that the worker wakers aren't allocated in the arena,
// since they need to be correctly dropped if we have a panic in this thread::scope code.
// Making sure they're owned means they'll be dropped correctly on either normal exit
// of this thread::scope block or on panicking. When they're dropped, the worker threads
// will correctly exit their message processing loops.
// If these were allocated in the arena, we might panic without shutting down the worker threads,
// causing the thread::scope block to hang while it waits for the worker threads to exit.
let mut worker_wakers = Vec::with_capacity(num_workers);
Comment on lines +1990 to +1997
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really helpful explanation 😄


for worker_arena in it {
let msg_tx = msg_tx.clone();
let worker = worker_queues.pop().unwrap();

let (worker_msg_tx, worker_msg_rx) = bounded(1024);
worker_listeners.push(worker_msg_tx);
let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
worker_wakers.push(worker_wakup_tx);

// We only want to move a *reference* to the main task queue's
// injector in the thread, not the injector itself
Expand All @@ -2015,16 +2015,22 @@ fn load_multi_threaded<'a>(
.stack_size(EXPANDED_STACK_SIZE)
.spawn(move |_| {
// will process messages until we run out
roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| {
run_task(
task,
worker_arena,
src_dir,
msg_tx.clone(),
roc_cache_dir,
target,
)
})
roc_worker::worker_task(
worker,
injector,
stealers,
worker_wakup_rx,
|task| {
run_task(
task,
worker_arena,
src_dir,
msg_tx.clone(),
roc_cache_dir,
target,
)
},
)
});

res_join_handle.unwrap_or_else(|_| {
Expand All @@ -2039,40 +2045,20 @@ fn load_multi_threaded<'a>(

// Grab a reference to these Senders outside the loop, so we can share
// it across each iteration of the loop.
let worker_listeners = worker_listeners.into_bump_slice();
let msg_tx = msg_tx.clone();

macro_rules! shut_down_worker_threads {
() => {
for listener in worker_listeners {
// We intentionally don't propagate this Result, because even if
// shutting down a worker failed (which can happen if a a panic
// occurred on that thread), we want to continue shutting down
// the others regardless.
if listener.send(WorkerMsg::Shutdown).is_err() {
log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked.");
}
}
};
}

// The root module will have already queued up messages to process,
// and processing those messages will in turn queue up more messages.
loop {
match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx)
{
match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(load_result)) => {
shut_down_worker_threads!();

return Ok(load_result);
}
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
continue;
}
Err(e) => {
shut_down_worker_threads!();

return Err(e);
}
}
Expand Down Expand Up @@ -2111,13 +2097,13 @@ fn start_tasks<'a>(
state: &mut State<'a>,
work: MutSet<(ModuleId, Phase)>,
injector: &Injector<BuildTask<'a>>,
worker_listeners: &'a [Sender<WorkerMsg>],
worker_wakers: &[Sender<()>],
) -> Result<(), LoadingProblem<'a>> {
for (module_id, phase) in work {
let tasks = start_phase(module_id, phase, arena, state);

for task in tasks {
enqueue_task(injector, worker_listeners, task)?
enqueue_task(injector, worker_wakers, task)?
}
}

Expand Down Expand Up @@ -2179,7 +2165,7 @@ fn update<'a>(
msg: Msg<'a>,
msg_tx: MsgSender<'a>,
injector: &Injector<BuildTask<'a>>,
worker_listeners: &'a [Sender<WorkerMsg>],
worker_wakers: &[Sender<()>],
arena: &'a Bump,
) -> Result<State<'a>, LoadingProblem<'a>> {
use self::Msg::*;
Expand Down Expand Up @@ -2305,7 +2291,7 @@ fn update<'a>(
work.extend(state.dependencies.notify(home, Phase::LoadHeader));
work.insert((home, Phase::Parse));

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
Expand Down Expand Up @@ -2382,7 +2368,7 @@ fn update<'a>(
}
};

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

state
.module_cache
Expand All @@ -2393,7 +2379,7 @@ fn update<'a>(

let work = state.dependencies.notify(module_id, Phase::Parse);

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
Expand Down Expand Up @@ -2445,7 +2431,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::CanonicalizeAndConstrain);

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
Expand Down Expand Up @@ -2652,7 +2638,7 @@ fn update<'a>(
work
};

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
}

Ok(state)
Expand Down Expand Up @@ -2700,7 +2686,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::FindSpecializations);

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
Expand Down Expand Up @@ -2990,13 +2976,13 @@ fn update<'a>(

let work = state.dependencies.reload_make_specialization_pass();

start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}

NextStep::MakingInPhase => {
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
Expand Down
Loading