From 068e0bba6530e495515606fd5ecbd11c747d1aab Mon Sep 17 00:00:00 2001 From: Sean Bryan Date: Mon, 16 Oct 2023 10:04:44 +1100 Subject: [PATCH] Use `multiprocessing.Pool.map()` for parallelism We currently do not exit with non-zero if an uncaught exception occurs within a multiprocessing child process. `multiprocessing.Pool.map()` will kill all processes if any one process throws an exception and then re-raise that exception in the parent process. Fixes #122 --- benchcab/comparison.py | 31 +++++----------------- benchcab/fluxsite.py | 59 +++++++++++++++++++++--------------------- 2 files changed, 35 insertions(+), 55 deletions(-) diff --git a/benchcab/comparison.py b/benchcab/comparison.py index 0c181f3b..d4824019 100644 --- a/benchcab/comparison.py +++ b/benchcab/comparison.py @@ -1,7 +1,8 @@ """A module containing functions and data structures for running comparison tasks.""" import multiprocessing -import queue +import operator +import sys from pathlib import Path from subprocess import CalledProcessError @@ -48,6 +49,7 @@ def run(self, verbose=False) -> None: f"Failure: files {file_a.name} {file_b.name} differ. " f"Results of diff have been written to {output_file}" ) + sys.stdout.flush() def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> None: @@ -62,27 +64,6 @@ def run_comparisons_in_parallel( verbose=False, ) -> None: """Runs bitwise comparison tasks in parallel across multiple processes.""" - task_queue: multiprocessing.Queue = multiprocessing.Queue() - for task in comparison_tasks: - task_queue.put(task) - - processes = [] - for _ in range(n_processes): - proc = multiprocessing.Process( - target=worker_comparison, args=[task_queue, verbose] - ) - proc.start() - processes.append(proc) - - for proc in processes: - proc.join() - - -def worker_comparison(task_queue: multiprocessing.Queue, verbose=False) -> None: - """Runs bitwise comparison tasks in `task_queue` until the queue is emptied.""" - while True: - try: - task = task_queue.get_nowait() - except queue.Empty: - return - task.run(verbose=verbose) + run_task = operator.methodcaller("run", verbose=verbose) + with multiprocessing.Pool(n_processes) as pool: + pool.map(run_task, comparison_tasks, chunksize=1) diff --git a/benchcab/fluxsite.py b/benchcab/fluxsite.py index a96f6552..6bca611e 100644 --- a/benchcab/fluxsite.py +++ b/benchcab/fluxsite.py @@ -1,8 +1,9 @@ """A module containing functions and data structures for running fluxsite tasks.""" import multiprocessing -import queue +import operator import shutil +import sys from pathlib import Path from subprocess import CalledProcessError from typing import Any, Dict, TypeVar @@ -134,7 +135,9 @@ def setup_task(self, verbose=False): mkdir( internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name(), - verbose=verbose, parents=True, exist_ok=True + verbose=verbose, + parents=True, + exist_ok=True, ) self.clean_task(verbose=verbose) @@ -204,7 +207,9 @@ def clean_task(self, verbose=False): if verbose: print(" Cleaning task") - task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + task_dir = ( + self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + ) cable_exe = task_dir / internal.CABLE_EXE if cable_exe.exists(): @@ -223,12 +228,16 @@ def clean_task(self, verbose=False): cable_soil_nml.unlink() output_file = ( - self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename() + self.root_dir + / internal.FLUXSITE_DIRS["OUTPUT"] + / self.get_output_filename() ) if output_file.exists(): output_file.unlink() - log_file = self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename() + log_file = ( + self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename() + ) if log_file.exists(): log_file.unlink() @@ -241,7 +250,9 @@ def fetch_files(self, verbose=False): - copies contents of 'namelists' directory to 'runs/fluxsite/tasks/' directory. - copies cable executable from source to 'runs/fluxsite/tasks/' directory. """ - task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + task_dir = ( + self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + ) if verbose: print( @@ -282,7 +293,12 @@ def run(self, verbose=False): self.run_cable(verbose=verbose) self.add_provenance_info(verbose=verbose) except CableError: - return + # Note: here we suppress CABLE specific errors so that `benchcab` + # exits successfully. This then allows us to run bitwise comparisons + # checks on whatever output files were produced without having any + # sort of task dependence between CABLE tasks and comparison tasks. + pass + sys.stdout.flush() def run_cable(self, verbose=False): """Run the CABLE executable for the given task. @@ -311,7 +327,9 @@ def add_provenance_info(self, verbose=False): the namelist file used to run cable. """ nc_output_path = ( - self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename() + self.root_dir + / internal.FLUXSITE_DIRS["OUTPUT"] + / self.get_output_filename() ) nml = f90nml.read( self.root_dir @@ -369,28 +387,9 @@ def run_tasks_in_parallel( tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False ): """Runs tasks in `tasks` in parallel across multiple processes.""" - task_queue: multiprocessing.Queue = multiprocessing.Queue() - for task in tasks: - task_queue.put(task) - - processes = [] - for _ in range(n_processes): - proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose]) - proc.start() - processes.append(proc) - - for proc in processes: - proc.join() - - -def worker_run(task_queue: multiprocessing.Queue, verbose=False): - """Runs tasks in `task_queue` until the queue is emptied.""" - while True: - try: - task = task_queue.get_nowait() - except queue.Empty: - return - task.run(verbose=verbose) + run_task = operator.methodcaller("run", verbose=verbose) + with multiprocessing.Pool(n_processes) as pool: + pool.map(run_task, tasks, chunksize=1) def get_fluxsite_comparisons(