Skip to content

Commit

Permalink
Use multiprocessing.Pool.map() for parallelism
Browse files Browse the repository at this point in the history
We currently do not exit with a non-zero status if an uncaught exception
occurs within a multiprocessing child process.

`multiprocessing.Pool.map()` re-raises, in the parent process, an
exception raised in any worker process; leaving the pool's `with` block
then terminates the remaining worker processes.

Fixes #122
  • Loading branch information
SeanBryan51 committed Oct 15, 2023
1 parent 058f465 commit 66043f8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 54 deletions.
29 changes: 4 additions & 25 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A module containing functions and data structures for running comparison tasks."""

import multiprocessing
import queue
import operator
from pathlib import Path
from subprocess import CalledProcessError

Expand Down Expand Up @@ -62,27 +62,6 @@ def run_comparisons_in_parallel(
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in comparison_tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_comparison(task_queue: multiprocessing.Queue, verbose=False) -> None:
"""Runs bitwise comparison tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, comparison_tasks, chunksize=1)

Check warning on line 67 in benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

benchcab/comparison.py#L65-L67

Added lines #L65 - L67 were not covered by tests
55 changes: 26 additions & 29 deletions benchcab/fluxsite.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A module containing functions and data structures for running fluxsite tasks."""

import multiprocessing
import queue
import operator
import shutil
from pathlib import Path
from subprocess import CalledProcessError
Expand Down Expand Up @@ -134,7 +134,9 @@ def setup_task(self, verbose=False):

mkdir(
internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name(),
verbose=verbose, parents=True, exist_ok=True
verbose=verbose,
parents=True,
exist_ok=True,
)

self.clean_task(verbose=verbose)
Expand Down Expand Up @@ -204,7 +206,9 @@ def clean_task(self, verbose=False):
if verbose:
print(" Cleaning task")

task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

cable_exe = task_dir / internal.CABLE_EXE
if cable_exe.exists():
Expand All @@ -223,12 +227,16 @@ def clean_task(self, verbose=False):
cable_soil_nml.unlink()

output_file = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
if output_file.exists():
output_file.unlink()

log_file = self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
log_file = (
self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
)
if log_file.exists():
log_file.unlink()

Expand All @@ -241,7 +249,9 @@ def fetch_files(self, verbose=False):
- copies contents of 'namelists' directory to 'runs/fluxsite/tasks/<task_name>' directory.
- copies cable executable from source to 'runs/fluxsite/tasks/<task_name>' directory.
"""
task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

if verbose:
print(
Expand Down Expand Up @@ -282,6 +292,10 @@ def run(self, verbose=False):
self.run_cable(verbose=verbose)
self.add_provenance_info(verbose=verbose)
except CableError:
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparison
# checks on whatever output files were produced without having any
# sort of task dependence between CABLE tasks and comparison tasks.
return

def run_cable(self, verbose=False):
Expand Down Expand Up @@ -311,7 +325,9 @@ def add_provenance_info(self, verbose=False):
the namelist file used to run cable.
"""
nc_output_path = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
nml = f90nml.read(
self.root_dir
Expand Down Expand Up @@ -369,28 +385,9 @@ def run_tasks_in_parallel(
tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_run(task_queue: multiprocessing.Queue, verbose=False):
"""Runs tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, tasks, chunksize=1)

Check warning on line 390 in benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

benchcab/fluxsite.py#L388-L390

Added lines #L388 - L390 were not covered by tests


def get_fluxsite_comparisons(
Expand Down

0 comments on commit 66043f8

Please sign in to comment.