Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiprocessing.Pool.map() for parallelism #182

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""A module containing functions and data structures for running comparison tasks."""

import multiprocessing
import queue
import operator
import sys
from pathlib import Path
from subprocess import CalledProcessError

Expand Down Expand Up @@ -48,6 +49,7 @@
f"Failure: files {file_a.name} {file_b.name} differ. "
f"Results of diff have been written to {output_file}"
)
sys.stdout.flush()


def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> None:
Expand All @@ -62,27 +64,6 @@
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in comparison_tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_comparison(task_queue: multiprocessing.Queue, verbose=False) -> None:
"""Runs bitwise comparison tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, comparison_tasks, chunksize=1)

Check warning on line 69 in benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

benchcab/comparison.py#L67-L69

Added lines #L67 - L69 were not covered by tests
59 changes: 29 additions & 30 deletions benchcab/fluxsite.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""A module containing functions and data structures for running fluxsite tasks."""

import multiprocessing
import queue
import operator
import shutil
import sys
from pathlib import Path
from subprocess import CalledProcessError
from typing import Any, Dict, TypeVar
Expand Down Expand Up @@ -134,7 +135,9 @@

mkdir(
internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name(),
verbose=verbose, parents=True, exist_ok=True
verbose=verbose,
parents=True,
exist_ok=True,
)

self.clean_task(verbose=verbose)
Expand Down Expand Up @@ -204,7 +207,9 @@
if verbose:
print(" Cleaning task")

task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

cable_exe = task_dir / internal.CABLE_EXE
if cable_exe.exists():
Expand All @@ -223,12 +228,16 @@
cable_soil_nml.unlink()

output_file = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
if output_file.exists():
output_file.unlink()

log_file = self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
log_file = (
self.root_dir / internal.FLUXSITE_DIRS["LOG"] / self.get_log_filename()
)
if log_file.exists():
log_file.unlink()

Expand All @@ -241,7 +250,9 @@
- copies contents of 'namelists' directory to 'runs/fluxsite/tasks/<task_name>' directory.
- copies cable executable from source to 'runs/fluxsite/tasks/<task_name>' directory.
"""
task_dir = self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
task_dir = (
self.root_dir / internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
)

if verbose:
print(
Expand Down Expand Up @@ -282,7 +293,12 @@
self.run_cable(verbose=verbose)
self.add_provenance_info(verbose=verbose)
except CableError:
return
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
# checks on whatever output files were produced without having any
# sort of task dependence between CABLE tasks and comparison tasks.
pass
sys.stdout.flush()

Check warning on line 301 in benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

benchcab/fluxsite.py#L300-L301

Added lines #L300 - L301 were not covered by tests

def run_cable(self, verbose=False):
"""Run the CABLE executable for the given task.
Expand Down Expand Up @@ -311,7 +327,9 @@
the namelist file used to run cable.
"""
nc_output_path = (
self.root_dir / internal.FLUXSITE_DIRS["OUTPUT"] / self.get_output_filename()
self.root_dir
/ internal.FLUXSITE_DIRS["OUTPUT"]
/ self.get_output_filename()
)
nml = f90nml.read(
self.root_dir
Expand Down Expand Up @@ -369,28 +387,9 @@
tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""
task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(n_processes):
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)

for proc in processes:
proc.join()


def worker_run(task_queue: multiprocessing.Queue, verbose=False):
"""Runs tasks in `task_queue` until the queue is emptied."""
while True:
try:
task = task_queue.get_nowait()
except queue.Empty:
return
task.run(verbose=verbose)
run_task = operator.methodcaller("run", verbose=verbose)
with multiprocessing.Pool(n_processes) as pool:
pool.map(run_task, tasks, chunksize=1)

Check warning on line 392 in benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

benchcab/fluxsite.py#L390-L392

Added lines #L390 - L392 were not covered by tests


def get_fluxsite_comparisons(
Expand Down