Add parse and dispatch strategy
The current method of executing subcommands can be improved using a
"parse and dispatch" strategy, whereby no chain of conditionals is
needed to work out which subcommand to run.

This change refactors the Benchcab class so that its "public" methods
can be dispatched directly from the argparse.Namespace object returned
by the command-line parser.

Fixes #196
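
For context, the pattern works roughly like this: the CLI parser attaches a handler to each subparser via set_defaults, and the entry point simply calls whatever handler ends up on the parsed Namespace. The sketch below is illustrative only; the class, option names and wiring are assumptions for this note, not copied from benchcab's actual CLI code.

# Minimal, hypothetical sketch of "parse and dispatch" with argparse.
# None of the names below are taken from benchcab; they only illustrate the pattern.
import argparse


class App:
    def checkout(self, config_path: str, verbose: bool):
        print(f"checkout: config={config_path}, verbose={verbose}")

    def build(self, config_path: str, verbose: bool):
        print(f"build: config={config_path}, verbose={verbose}")


def generate_parser(app: App) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="app")
    subparsers = parser.add_subparsers(dest="subcommand", required=True)
    for name, handler in [("checkout", app.checkout), ("build", app.build)]:
        subparser = subparsers.add_parser(name)
        subparser.add_argument("-c", "--config", dest="config_path", default="config.yaml")
        subparser.add_argument("-v", "--verbose", action="store_true")
        # Attach the handler to the Namespace so no conditional is needed later.
        subparser.set_defaults(func=handler)
    return parser


def main(argv=None):
    app = App()
    args = vars(generate_parser(app).parse_args(argv))
    args.pop("subcommand")   # not needed by the handlers
    func = args.pop("func")  # dispatch: call the handler with the remaining options
    func(**args)


if __name__ == "__main__":
    main(["checkout", "--verbose"])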
SeanBryan51 committed Nov 2, 2023
1 parent 50dd9b5 commit e9fcaec
Showing 5 changed files with 182 additions and 144 deletions.
208 changes: 94 additions & 114 deletions benchcab/benchcab.py
@@ -1,15 +1,15 @@
"""Contains the main program entry point for `benchcab`."""

import argparse
import functools
import grp
import os
import shutil
import sys
from pathlib import Path
from subprocess import CalledProcessError
from typing import Optional

from benchcab import internal
from benchcab.cli import generate_parser
from benchcab.comparison import run_comparisons, run_comparisons_in_parallel
from benchcab.config import read_config
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
@@ -37,27 +37,19 @@ class Benchcab:

def __init__(
self,
argv: list[str],
benchcab_exe_path: Optional[Path],
config: Optional[dict] = None,
validate_env: bool = True,
) -> None:
self.args = generate_parser().parse_args(argv[1:] if argv[1:] else ["-h"])
self.config = config if config else read_config(Path(self.args.config))
self.repos = [
CableRepository(**config, repo_id=id)
for id, config in enumerate(self.config["realisations"])
]
self.tasks: list[Task] = [] # initialise fluxsite tasks lazily
self.benchcab_exe_path = benchcab_exe_path
self.validate_env = validate_env

if validate_env:
self._validate_environment(
project=self.config["project"], modules=self.config["modules"]
)
self.tasks: list[Task] = [] # initialise fluxsite tasks lazily

def _validate_environment(self, project: str, modules: list):
"""Performs checks on current user environment."""
if not self.validate_env:
return

if "gadi.nci" not in internal.NODENAME:
print("Error: benchcab is currently implemented only on Gadi")
sys.exit(1)
@@ -102,21 +94,40 @@ def _validate_environment(self, project: str, modules: list):
)
sys.exit(1)

def _initialise_tasks(self) -> list[Task]:
@functools.cache
def _get_config(self, config_path) -> dict:
return read_config(config_path)

@functools.cache
def _get_repos(self, config: dict) -> list[CableRepository]:
return [
CableRepository(**repo_config, repo_id=id)
for id, repo_config in enumerate(config["realisations"])
]

def _initialise_tasks(self, config: dict) -> list[Task]:
"""A helper method that initialises and returns the `tasks` attribute."""
repos = [
CableRepository(**config, repo_id=id)
for id, config in enumerate(config["realisations"])
]
self.tasks = get_fluxsite_tasks(
repos=self.repos,
science_configurations=self.config.get(
repos=repos,
science_configurations=config.get(
"science_configurations", internal.DEFAULT_SCIENCE_CONFIGURATIONS
),
fluxsite_forcing_file_names=get_met_forcing_file_names(
self.config["experiment"]
config["experiment"]
),
)
return self.tasks

def fluxsite_submit_job(self) -> None:
def fluxsite_submit_job(
self, config_path: str, verbose: bool, skip: list[str]
) -> None:
"""Submits the PBS job script step in the fluxsite test workflow."""
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

if self.benchcab_exe_path is None:
msg = "Path to benchcab executable is undefined."
raise RuntimeError(msg)
@@ -128,21 +139,21 @@ def fluxsite_submit_job(self) -> None:
)
with job_script_path.open("w", encoding="utf-8") as file:
contents = render_job_script(
project=self.config["project"],
config_path=self.args.config,
modules=self.config["modules"],
verbose=self.args.verbose,
skip_bitwise_cmp="fluxsite-bitwise-cmp" in self.args.skip,
project=config["project"],
config_path=config_path,
modules=config["modules"],
verbose=verbose,
skip_bitwise_cmp="fluxsite-bitwise-cmp" in skip,
benchcab_path=str(self.benchcab_exe_path),
pbs_config=self.config.get("fluxsite", {}).get("pbs"),
pbs_config=config.get("fluxsite", {}).get("pbs"),
)
file.write(contents)

try:
proc = self.subprocess_handler.run_cmd(
f"qsub {job_script_path}",
capture_output=True,
verbose=self.args.verbose,
verbose=verbose,
)
except CalledProcessError as exc:
print("Error when submitting job to NCI queue")
Expand All @@ -159,22 +170,25 @@ def fluxsite_submit_job(self) -> None:
f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc"
)

def checkout(self):
def checkout(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab checkout`."""
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

mkdir(internal.SRC_DIR, exist_ok=True, verbose=True)

print("Checking out repositories...")
rev_number_log = ""
for repo in self.repos:
repo.checkout(verbose=self.args.verbose)
for repo in self._get_repos(config):
repo.checkout(verbose=verbose)
rev_number_log += (
f"{repo.name} last changed revision: "
f"{repo.svn_info_show_item('last-changed-revision')}\n"
)

# TODO(Sean) we should archive revision numbers for CABLE-AUX
cable_aux_repo = CableRepository(path=internal.CABLE_AUX_RELATIVE_SVN_PATH)
cable_aux_repo.checkout(verbose=self.args.verbose)
cable_aux_repo.checkout(verbose=verbose)

rev_number_log_path = self.root_dir / next_path(
self.root_dir, "rev_number-*.log"
@@ -187,142 +201,108 @@ def checkout(self):

print("")

def build(self):
def build(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab build`."""
for repo in self.repos:
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

for repo in self._get_repos():
if repo.build_script:
print(
"Compiling CABLE using custom build script for "
f"realisation {repo.name}..."
)
repo.custom_build(
modules=self.config["modules"], verbose=self.args.verbose
)
repo.custom_build(modules=config["modules"], verbose=verbose)
else:
build_mode = "with MPI" if internal.MPI else "serially"
print(f"Compiling CABLE {build_mode} for realisation {repo.name}...")
repo.pre_build(verbose=self.args.verbose)
repo.run_build(
modules=self.config["modules"], verbose=self.args.verbose
)
repo.post_build(verbose=self.args.verbose)
repo.pre_build(verbose=verbose)
repo.run_build(modules=config["modules"], verbose=verbose)
repo.post_build(verbose=verbose)
print(f"Successfully compiled CABLE for realisation {repo.name}")
print("")

def fluxsite_setup_work_directory(self):
def fluxsite_setup_work_directory(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab fluxsite-setup-work-dir`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

tasks = self.tasks if self.tasks else self._initialise_tasks(config)
print("Setting up run directory tree for fluxsite tests...")
setup_fluxsite_directory_tree(verbose=self.args.verbose)
setup_fluxsite_directory_tree(verbose=verbose)
print("Setting up tasks...")
for task in tasks:
task.setup_task(verbose=self.args.verbose)
task.setup_task(verbose=verbose)
print("Successfully setup fluxsite tasks")
print("")

def fluxsite_run_tasks(self):
def fluxsite_run_tasks(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab fluxsite-run-tasks`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

tasks = self.tasks if self.tasks else self._initialise_tasks(config)
print("Running fluxsite tasks...")
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
multiprocess = config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
ncpus = self.config.get("pbs", {}).get(
ncpus = config.get("pbs", {}).get(
"ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"]
)
run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=self.args.verbose)
run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=verbose)
else:
run_tasks(tasks, verbose=self.args.verbose)
run_tasks(tasks, verbose=verbose)
print("Successfully ran fluxsite tasks")
print("")

def fluxsite_bitwise_cmp(self):
def fluxsite_bitwise_cmp(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab fluxsite-bitwise-cmp`."""
config = self._get_config(config_path)
self._validate_environment(project=config["project"], modules=config["modules"])

if not self.modules_handler.module_is_loaded("nccmp/1.8.5.0"):
self.modules_handler.module_load(
"nccmp/1.8.5.0"
) # use `nccmp -df` for bitwise comparisons

tasks = self.tasks if self.tasks else self._initialise_tasks()
tasks = self.tasks if self.tasks else self._initialise_tasks(config)
comparisons = get_fluxsite_comparisons(tasks)

print("Running comparison tasks...")
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
multiprocess = config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
try:
ncpus = self.config["fluxsite"]["pbs"]["ncpus"]
ncpus = config["fluxsite"]["pbs"]["ncpus"]
except KeyError:
ncpus = internal.FLUXSITE_DEFAULT_PBS["ncpus"]
run_comparisons_in_parallel(
comparisons, n_processes=ncpus, verbose=self.args.verbose
)
run_comparisons_in_parallel(comparisons, n_processes=ncpus, verbose=verbose)
else:
run_comparisons(comparisons, verbose=self.args.verbose)
run_comparisons(comparisons, verbose=verbose)
print("Successfully ran comparison tasks")

def fluxsite(self):
def fluxsite(
self, config_path: str, no_submit: bool, verbose: bool, skip: list[str]
):
"""Endpoint for `benchcab fluxsite`."""
self.checkout()
self.build()
self.fluxsite_setup_work_directory()
if self.args.no_submit:
self.fluxsite_run_tasks()
if "fluxsite-bitwise-cmp" not in self.args.skip:
self.fluxsite_bitwise_cmp()
self.checkout(config_path, verbose)
self.build(config_path, verbose)
self.fluxsite_setup_work_directory(config_path, verbose)
if no_submit:
self.fluxsite_run_tasks(config_path, verbose)
if "fluxsite-bitwise-cmp" not in skip:
self.fluxsite_bitwise_cmp(config_path, verbose)
else:
self.fluxsite_submit_job()
self.fluxsite_submit_job(config_path, verbose, skip)

def spatial(self):
def spatial(self, config_path: str, verbose: bool):
"""Endpoint for `benchcab spatial`."""

def run(self):
def run(self, config_path: str, no_submit: bool, verbose: bool, skip: list[str]):
"""Endpoint for `benchcab run`."""
self.fluxsite()
self.spatial()

def main(self):
"""Main function for `benchcab`."""
if self.args.subcommand == "run":
self.run()

if self.args.subcommand == "checkout":
self.checkout()

if self.args.subcommand == "build":
self.build()

if self.args.subcommand == "fluxsite":
self.fluxsite()

if self.args.subcommand == "fluxsite-setup-work-dir":
self.fluxsite_setup_work_directory()

if self.args.subcommand == "fluxsite-submit-job":
self.fluxsite_submit_job()

if self.args.subcommand == "fluxsite-run-tasks":
self.fluxsite_run_tasks()

if self.args.subcommand == "fluxsite-bitwise-cmp":
self.fluxsite_bitwise_cmp()

if self.args.subcommand == "spatial":
self.spatial()


def main():
"""Main program entry point for `benchcab`.
This is required for setup.py entry_points
"""
app = Benchcab(argv=sys.argv, benchcab_exe_path=shutil.which(sys.argv[0]))
app.main()


if __name__ == "__main__":
main()
self.fluxsite(config_path, no_submit, verbose, skip)
self.spatial(config_path, verbose)
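
One detail from the diff above worth spelling out: _get_config is wrapped in functools.cache, so even though every endpoint now calls self._get_config(config_path), the configuration file is parsed at most once per distinct path. A standalone illustration of that behaviour follows; the read_config stub here is a hypothetical stand-in, not benchcab's implementation.

# Hypothetical illustration of the functools.cache memoisation used above.
import functools

calls = 0


def read_config(path: str) -> dict:
    """Stand-in for a config reader; counts how often it actually runs."""
    global calls
    calls += 1
    return {"path": path}


@functools.cache
def get_config(config_path: str) -> dict:
    return read_config(config_path)


get_config("config.yaml")
get_config("config.yaml")  # second call is served from the cache
assert calls == 1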
