From d6b7ee2039e508423dfa6fd255a406c16130fbd9 Mon Sep 17 00:00:00 2001 From: Joshua Warner Date: Wed, 13 Nov 2024 19:22:52 -0800 Subject: [PATCH] Prevent panics in worker threads from causing deadlocks Occasionally, we'll run into panics during compilation which prior to this diff would cause the compiler to hang indefinitely after printing the initial panic message. Note that this simplified/renamed worker 'wakeup' system seems to leave around a bit of a code smell - that this perhaps indicates we should rework/refactor in order to avoid the necessity of having separate wakeup and message stealing systems... but one problem at a time! --- crates/compiler/load_internal/src/file.rs | 112 ++++++++++------------ crates/compiler/worker/src/worker.rs | 97 +++++++------------ 2 files changed, 84 insertions(+), 125 deletions(-) diff --git a/crates/compiler/load_internal/src/file.rs b/crates/compiler/load_internal/src/file.rs index 870d1769139..585ff0a3c6c 100644 --- a/crates/compiler/load_internal/src/file.rs +++ b/crates/compiler/load_internal/src/file.rs @@ -68,7 +68,7 @@ use roc_solve_problem::TypeError; use roc_target::Target; use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable}; use roc_types::types::{Alias, Types}; -use roc_worker::{ChannelProblem, WorkerMsg}; +use roc_worker::ChannelProblem; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::io; @@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender>; /// Add a task to the queue, and notify all the listeners. fn enqueue_task<'a>( injector: &Injector>, - listeners: &[Sender], + listeners: &[Sender<()>], task: BuildTask<'a>, ) -> Result<(), LoadingProblem<'a>> { injector.push(task); for listener in listeners { listener - .send(WorkerMsg::TaskAdded) + .send(()) .map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?; } @@ -1569,9 +1569,9 @@ pub fn load_single_threaded<'a>( // We'll add tasks to this, and then worker threads will take tasks from it. let injector = Injector::new(); - let (worker_msg_tx, worker_msg_rx) = bounded(1024); - let worker_listener = worker_msg_tx; - let worker_listeners = arena.alloc([worker_listener]); + let (worker_wakup_tx, worker_wakup_rx) = bounded(1024); + let worker_waker = worker_wakup_tx; + let worker_wakers = [worker_waker]; let worker = Worker::new_fifo(); let stealer = worker.stealer(); @@ -1579,7 +1579,7 @@ pub fn load_single_threaded<'a>( // now we just manually interleave stepping the state "thread" and the worker "thread" loop { - match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) { + match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) { Ok(ControlFlow::Break(done)) => return Ok(done), Ok(ControlFlow::Continue(new_state)) => { state = new_state; @@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>( // then check if the worker can step let control_flow = - roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| { + roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| { run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target) }); @@ -1606,7 +1606,7 @@ pub fn load_single_threaded<'a>( fn state_thread_step<'a>( arena: &'a Bump, state: State<'a>, - worker_listeners: &'a [Sender], + worker_wakers: &[Sender<()>], injector: &Injector>, msg_tx: &crossbeam::channel::Sender>, msg_rx: &crossbeam::channel::Receiver>, @@ -1712,14 +1712,8 @@ fn state_thread_step<'a>( let render = state.render; let palette = state.palette; - let res_state = update( - state, - msg, - msg_tx.clone(), - injector, - worker_listeners, - arena, - ); + let res_state = + update(state, msg, msg_tx.clone(), injector, worker_wakers, arena); match res_state { Ok(new_state) => Ok(ControlFlow::Continue(new_state)), @@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>( { let thread_result = thread::scope(|thread_scope| { - let mut worker_listeners = - bumpalo::collections::Vec::with_capacity_in(num_workers, arena); + // Careful! It's important that worker listeners aren't allocated in the arena, + // since they need to be correctly dropped if we have a panic in this thread::scope code. + // Making sure they're owned means they'll be dropped correctly on either normal exit + // of this thread::scope block or on panicking. When they're dropped, the worker threads + // will correctly exit their message processing loops. + // If these were allocated in the arena, we might panic without shutting down the worker threads, + // causing the thread::scope block to hang while it waits for the worker threads to exit. + let mut worker_wakers = Vec::with_capacity(num_workers); for worker_arena in it { let msg_tx = msg_tx.clone(); let worker = worker_queues.pop().unwrap(); - let (worker_msg_tx, worker_msg_rx) = bounded(1024); - worker_listeners.push(worker_msg_tx); + let (worker_wakup_tx, worker_wakup_rx) = bounded(1024); + worker_wakers.push(worker_wakup_tx); // We only want to move a *reference* to the main task queue's // injector in the thread, not the injector itself @@ -2015,16 +2015,22 @@ fn load_multi_threaded<'a>( .stack_size(EXPANDED_STACK_SIZE) .spawn(move |_| { // will process messages until we run out - roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| { - run_task( - task, - worker_arena, - src_dir, - msg_tx.clone(), - roc_cache_dir, - target, - ) - }) + roc_worker::worker_task( + worker, + injector, + stealers, + worker_wakup_rx, + |task| { + run_task( + task, + worker_arena, + src_dir, + msg_tx.clone(), + roc_cache_dir, + target, + ) + }, + ) }); res_join_handle.unwrap_or_else(|_| { @@ -2039,31 +2045,13 @@ fn load_multi_threaded<'a>( // Grab a reference to these Senders outside the loop, so we can share // it across each iteration of the loop. - let worker_listeners = worker_listeners.into_bump_slice(); let msg_tx = msg_tx.clone(); - macro_rules! shut_down_worker_threads { - () => { - for listener in worker_listeners { - // We intentionally don't propagate this Result, because even if - // shutting down a worker failed (which can happen if a a panic - // occurred on that thread), we want to continue shutting down - // the others regardless. - if listener.send(WorkerMsg::Shutdown).is_err() { - log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked."); - } - } - }; - } - // The root module will have already queued up messages to process, // and processing those messages will in turn queue up more messages. loop { - match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) - { + match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) { Ok(ControlFlow::Break(load_result)) => { - shut_down_worker_threads!(); - return Ok(load_result); } Ok(ControlFlow::Continue(new_state)) => { @@ -2071,8 +2059,6 @@ fn load_multi_threaded<'a>( continue; } Err(e) => { - shut_down_worker_threads!(); - return Err(e); } } @@ -2111,13 +2097,13 @@ fn start_tasks<'a>( state: &mut State<'a>, work: MutSet<(ModuleId, Phase)>, injector: &Injector>, - worker_listeners: &'a [Sender], + worker_wakers: &[Sender<()>], ) -> Result<(), LoadingProblem<'a>> { for (module_id, phase) in work { let tasks = start_phase(module_id, phase, arena, state); for task in tasks { - enqueue_task(injector, worker_listeners, task)? + enqueue_task(injector, worker_wakers, task)? } } @@ -2179,7 +2165,7 @@ fn update<'a>( msg: Msg<'a>, msg_tx: MsgSender<'a>, injector: &Injector>, - worker_listeners: &'a [Sender], + worker_wakers: &[Sender<()>], arena: &'a Bump, ) -> Result, LoadingProblem<'a>> { use self::Msg::*; @@ -2305,7 +2291,7 @@ fn update<'a>( work.extend(state.dependencies.notify(home, Phase::LoadHeader)); work.insert((home, Phase::Parse)); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2382,7 +2368,7 @@ fn update<'a>( } }; - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; state .module_cache @@ -2393,7 +2379,7 @@ fn update<'a>( let work = state.dependencies.notify(module_id, Phase::Parse); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2445,7 +2431,7 @@ fn update<'a>( .dependencies .notify(module_id, Phase::CanonicalizeAndConstrain); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2652,7 +2638,7 @@ fn update<'a>( work }; - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; } Ok(state) @@ -2700,7 +2686,7 @@ fn update<'a>( .dependencies .notify(module_id, Phase::FindSpecializations); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2990,13 +2976,13 @@ fn update<'a>( let work = state.dependencies.reload_make_specialization_pass(); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } NextStep::MakingInPhase => { - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } diff --git a/crates/compiler/worker/src/worker.rs b/crates/compiler/worker/src/worker.rs index 71cb267be9f..8c9d2bcd601 100644 --- a/crates/compiler/worker/src/worker.rs +++ b/crates/compiler/worker/src/worker.rs @@ -7,12 +7,6 @@ use roc_module::symbol::ModuleId; use roc_work::Phase; use std::ops::ControlFlow; -#[derive(Debug)] -pub enum WorkerMsg { - Shutdown, - TaskAdded, -} - #[derive(Debug)] pub enum ChannelProblem { FailedToSendRootMsg, @@ -29,41 +23,31 @@ pub fn worker_task_step( worker: &Worker, injector: &Injector, stealers: &[Stealer], - worker_msg_rx: &Receiver, + worker_wakeup_rx: &Receiver<()>, run_task: impl Fn(Task) -> Result<(), ChannelProblem>, ) -> Result, ChannelProblem> { - match worker_msg_rx.try_recv() { - Ok(msg) => { - match msg { - WorkerMsg::Shutdown => { - // We've finished all our work. It's time to - // shut down the thread, so when the main thread - // blocks on joining with all the worker threads, - // it can finally exit too! - Ok(ControlFlow::Break(())) - } - WorkerMsg::TaskAdded => { - // Find a task - either from this thread's queue, - // or from the main queue, or from another worker's - // queue - and run it. - // - // There might be no tasks to work on! That could - // happen if another thread is working on a task - // which will later result in more tasks being - // added. In that case, do nothing, and keep waiting - // until we receive a Shutdown message. - if let Some(task) = find_task(worker, injector, stealers) { - run_task(task)?; - } - - Ok(ControlFlow::Continue(())) - } + match worker_wakeup_rx.try_recv() { + Ok(()) => { + // Find a task - either from this thread's queue, + // or from the main queue, or from another worker's + // queue - and run it. + // + // There might be no tasks to work on! That could + // happen if another thread is working on a task + // which will later result in more tasks being + // added. In that case, do nothing, and keep waiting + // until we receive a Shutdown message. + if let Some(task) = find_task(worker, injector, stealers) { + run_task(task)?; } + + Ok(ControlFlow::Continue(())) } Err(err) => match err { crossbeam::channel::TryRecvError::Empty => Ok(ControlFlow::Continue(())), crossbeam::channel::TryRecvError::Disconnected => { - Err(ChannelProblem::ChannelDisconnected) + // The channel sender has been dropped, which means we want to shut down + Ok(ControlFlow::Break(())) } }, } @@ -73,33 +57,22 @@ pub fn worker_task( worker: Worker, injector: &Injector, stealers: &[Stealer], - worker_msg_rx: crossbeam::channel::Receiver, + worker_wakeup_rx: crossbeam::channel::Receiver<()>, run_task: impl Fn(Task) -> Result<(), ChannelProblem>, ) -> Result<(), ChannelProblem> { // Keep listening until we receive a Shutdown msg - for msg in worker_msg_rx.iter() { - match msg { - WorkerMsg::Shutdown => { - // We've finished all our work. It's time to - // shut down the thread, so when the main thread - // blocks on joining with all the worker threads, - // it can finally exit too! - return Ok(()); - } - WorkerMsg::TaskAdded => { - // Find a task - either from this thread's queue, - // or from the main queue, or from another worker's - // queue - and run it. - // - // There might be no tasks to work on! That could - // happen if another thread is working on a task - // which will later result in more tasks being - // added. In that case, do nothing, and keep waiting - // until we receive a Shutdown message. - if let Some(task) = find_task(&worker, injector, stealers) { - run_task(task)?; - } - } + for () in worker_wakeup_rx.iter() { + // Find a task - either from this thread's queue, + // or from the main queue, or from another worker's + // queue - and run it. + // + // There might be no tasks to work on! That could + // happen if another thread is working on a task + // which will later result in more tasks being + // added. In that case, do nothing, and keep waiting + // until we receive a Shutdown message. + if let Some(task) = find_task(&worker, injector, stealers) { + run_task(task)?; } } @@ -110,17 +83,17 @@ pub fn start_tasks>( state: &mut State, work: MutSet<(ModuleId, Phase)>, injector: &Injector, - worker_listeners: &[Sender], + worker_wakers: &[Sender<()>], mut start_phase: impl FnMut(ModuleId, Phase, &mut State) -> Tasks, -) -> Result<(), SendError> { +) -> Result<(), SendError<()>> { for (module_id, phase) in work { let tasks = start_phase(module_id, phase, state); for task in tasks { injector.push(task); - for listener in worker_listeners { - listener.send(WorkerMsg::TaskAdded)?; + for listener in worker_wakers { + listener.send(())?; } } }