diff --git a/benchcab/comparison.py b/benchcab/comparison.py index 0c181f3b..8676d9b8 100644 --- a/benchcab/comparison.py +++ b/benchcab/comparison.py @@ -1,7 +1,7 @@ """A module containing functions and data structures for running comparison tasks.""" import multiprocessing -import queue +import operator from pathlib import Path from subprocess import CalledProcessError @@ -62,27 +62,6 @@ def run_comparisons_in_parallel( verbose=False, ) -> None: """Runs bitwise comparison tasks in parallel across multiple processes.""" - task_queue: multiprocessing.Queue = multiprocessing.Queue() - for task in comparison_tasks: - task_queue.put(task) - - processes = [] - for _ in range(n_processes): - proc = multiprocessing.Process( - target=worker_comparison, args=[task_queue, verbose] - ) - proc.start() - processes.append(proc) - - for proc in processes: - proc.join() - - -def worker_comparison(task_queue: multiprocessing.Queue, verbose=False) -> None: - """Runs bitwise comparison tasks in `task_queue` until the queue is emptied.""" - while True: - try: - task = task_queue.get_nowait() - except queue.Empty: - return - task.run(verbose=verbose) + run_task = operator.methodcaller("run", verbose=verbose) + with multiprocessing.Pool(n_processes) as pool: + pool.map(run_task, comparison_tasks, chunksize=1) diff --git a/benchcab/fluxsite.py b/benchcab/fluxsite.py index a96f6552..46dd7f1a 100644 --- a/benchcab/fluxsite.py +++ b/benchcab/fluxsite.py @@ -1,7 +1,7 @@ """A module containing functions and data structures for running fluxsite tasks.""" import multiprocessing -import queue +import operator import shutil from pathlib import Path from subprocess import CalledProcessError @@ -134,7 +134,9 @@ def setup_task(self, verbose=False): mkdir( internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name(), - verbose=verbose, parents=True, exist_ok=True + verbose=verbose, + parents=True, + exist_ok=True, ) self.clean_task(verbose=verbose) @@ -204,7 +206,9 @@ def clean_task(self, verbose=False): 
if verbose: print(" Cleaning task") - task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + task_dir = ( + self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + ) cable_exe = task_dir / internal.CABLE_EXE if cable_exe.exists(): @@ -223,12 +227,16 @@ def clean_task(self, verbose=False): cable_soil_nml.unlink() output_file = ( - self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename() + self.root_dir + / internal.FLUXSITE_DIRS["OUTPUT"] + / self.get_output_filename() ) if output_file.exists(): output_file.unlink() - log_file = self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename() + log_file = ( + self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename() + ) if log_file.exists(): log_file.unlink() @@ -241,7 +249,9 @@ def fetch_files(self, verbose=False): - copies contents of 'namelists' directory to 'runs/fluxsite/tasks/' directory. - copies cable executable from source to 'runs/fluxsite/tasks/' directory. """ - task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + task_dir = ( + self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() + ) if verbose: print( @@ -282,6 +292,10 @@ def run(self, verbose=False): self.run_cable(verbose=verbose) self.add_provenance_info(verbose=verbose) except CableError: + # Note: here we suppress CABLE-specific errors so that `benchcab` + # exits successfully. This then allows us to run bitwise comparison + # checks on whatever output files were produced without having any + # sort of task dependence between CABLE tasks and comparison tasks. return def run_cable(self, verbose=False): @@ -311,7 +325,9 @@ def add_provenance_info(self, verbose=False): the namelist file used to run cable. 
""" nc_output_path = ( - self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename() + self.root_dir + / internal.FLUXSITE_DIRS["OUTPUT"] + / self.get_output_filename() ) nml = f90nml.read( self.root_dir @@ -369,28 +385,9 @@ def run_tasks_in_parallel( tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False ): """Runs tasks in `tasks` in parallel across multiple processes.""" - task_queue: multiprocessing.Queue = multiprocessing.Queue() - for task in tasks: - task_queue.put(task) - - processes = [] - for _ in range(n_processes): - proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose]) - proc.start() - processes.append(proc) - - for proc in processes: - proc.join() - - -def worker_run(task_queue: multiprocessing.Queue, verbose=False): - """Runs tasks in `task_queue` until the queue is emptied.""" - while True: - try: - task = task_queue.get_nowait() - except queue.Empty: - return - task.run(verbose=verbose) + run_task = operator.methodcaller("run", verbose=verbose) + with multiprocessing.Pool(n_processes) as pool: + pool.map(run_task, tasks, chunksize=1) def get_fluxsite_comparisons(