Skip to content

Commit

Permalink
Use multiprocessing.Pool.map() for parallelism
Browse files Browse the repository at this point in the history
We currently do not exit with non-zero if an uncaught exception occurs
within a multiprocessing child process.

`multiprocessing.Pool.map()` will kill all processes if any one process
throws an exception and then re-raise that exception in the parent
process.

Fixes #122
  • Loading branch information
SeanBryan51 committed Oct 16, 2023
1 parent 058f465 commit 068e0bb
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 55 deletions.
31 changes: 6 additions & 25 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""A module containing functions and data structures for running comparison tasks."""

import multiprocessing
import queue
import operator
import sys
from pathlib import Path
from subprocess import CalledProcessError

Expand Down Expand Up @@ -48,6 +49,7 @@ def run(self, verbose=False) -> None:
f"Failure: files {file_a.name} {file_b.name} differ. "
f"Results of diff have been written to {output_file}"
)
sys.stdout.flush()


def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> None:
Expand All @@ -62,27 +64,6 @@ def run_comparisons_in_parallel(
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in comparison_tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_comparison(task_queue: multiprocessing.Queue, verbose=False) -> None:
"""Runs bitwise comparison tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, comparison_tasks, chunksize=1)

Check warning on line 69 in benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

benchcab/comparison.py#L67-L69

Added lines #L67 - L69 were not covered by tests
59 changes: 29 additions & 30 deletions benchcab/fluxsite.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""A module containing functions and data structures for running fluxsite tasks."""

import multiprocessing
import queue
import operator
import shutil
import sys
from pathlib import Path
from subprocess import CalledProcessError
from typing import Any, Dict, TypeVar
Expand Down Expand Up @@ -134,7 +135,9 @@ def setup_task(self, verbose=False):

mkdir(
internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name(),
verbose=verbose, parents=True, exist_ok=True
verbose=verbose,
parents=True,
exist_ok=True,
)

self.clean_task(verbose=verbose)
Expand Down Expand Up @@ -204,7 +207,9 @@ def clean_task(self, verbose=False):
if verbose:
print(" Cleaning task")

task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

cable_exe = task_dir / internal.CABLE_EXE
if cable_exe.exists():
Expand All @@ -223,12 +228,16 @@ def clean_task(self, verbose=False):
cable_soil_nml.unlink()

output_file = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
if output_file.exists():
output_file.unlink()

log_file = self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
log_file = (
self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
)
if log_file.exists():
log_file.unlink()

Expand All @@ -241,7 +250,9 @@ def fetch_files(self, verbose=False):
- copies contents of 'namelists' directory to 'runs/fluxsite/tasks/<task_name>' directory.
- copies cable executable from source to 'runs/fluxsite/tasks/<task_name>' directory.
"""
task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

if verbose:
print(
Expand Down Expand Up @@ -282,7 +293,12 @@ def run(self, verbose=False):
self.run_cable(verbose=verbose)
self.add_provenance_info(verbose=verbose)
except CableError:
return
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
# checks on whatever output files were produced without having any
# sort of task dependence between CABLE tasks and comparison tasks.
pass
sys.stdout.flush()

Check warning on line 301 in benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

benchcab/fluxsite.py#L300-L301

Added lines #L300 - L301 were not covered by tests

def run_cable(self, verbose=False):
"""Run the CABLE executable for the given task.
Expand Down Expand Up @@ -311,7 +327,9 @@ def add_provenance_info(self, verbose=False):
the namelist file used to run cable.
"""
nc_output_path = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
nml = f90nml.read(
self.root_dir
Expand Down Expand Up @@ -369,28 +387,9 @@ def run_tasks_in_parallel(
tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_run(task_queue: multiprocessing.Queue, verbose=False):
"""Runs tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, tasks, chunksize=1)

Check warning on line 392 in benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

benchcab/fluxsite.py#L390-L392

Added lines #L390 - L392 were not covered by tests


def get_fluxsite_comparisons(
Expand Down

0 comments on commit 068e0bb

Please sign in to comment.