Skip to content

Commit

Permalink
Refactor code and unit tests
Browse files Browse the repository at this point in the history
This change refactors the code to be more object-oriented so that we can
better support mocking via dependency injection rather than resorting to
the `unittest.mock.patch` function. This allows us to write unit tests
that are simpler and that preserve the API layer (as opposed to using
`unittest.mock.patch`, which breaks the API layer).

Fixes #102
  • Loading branch information
SeanBryan51 committed Jul 12, 2023
1 parent 865da7a commit 73e04f0
Show file tree
Hide file tree
Showing 28 changed files with 1,474 additions and 1,663 deletions.
19 changes: 4 additions & 15 deletions benchcab/bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ def check_config(config: dict):

if any(key not in config for key in internal.CONFIG_REQUIRED_KEYS):
raise ValueError(
"The config file does not list all required entries. "
"Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS)
"Keys are missing from the config file: "
+ ", ".join(
key for key in internal.CONFIG_REQUIRED_KEYS if key not in config
)
)

if not isinstance(config["project"], str):
Expand Down Expand Up @@ -104,17 +106,4 @@ def read_config(config_path: str) -> dict:

check_config(config)

for branch in config["realisations"]:
# Add "name" key if not provided and set to base name of "path" key
branch.setdefault("name", Path(branch["path"]).name)
# Add "revision" key if not provided and set to default value -1, i.e. HEAD of branch
branch.setdefault("revision", -1)
# Add "patch" key if not provided and set to default value {}
branch.setdefault("patch", {})
# Add "build_script" key if not provided and set to default value ""
branch.setdefault("build_script", "")

# Add "science_configurations" if not provided and set to default value
config.setdefault("science_configurations", internal.DEFAULT_SCIENCE_CONFIGURATIONS)

return config
224 changes: 152 additions & 72 deletions benchcab/benchcab.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,195 @@
"""Contains the main program entry point for `benchcab`."""

import sys

from benchcab.job_script import submit_job
import os
import grp
from pathlib import Path
from typing import Optional
from subprocess import CalledProcessError

from benchcab import internal
from benchcab.internal import get_met_sites
from benchcab.bench_config import read_config
from benchcab.benchtree import setup_fluxnet_directory_tree, setup_src_dir
from benchcab.build_cable import default_build, custom_build
from benchcab.get_cable import (
checkout_cable,
checkout_cable_auxiliary,
svn_info_show_item,
next_path,
)
from benchcab.internal import (
validate_environment,
get_met_sites,
CWD,
MULTIPROCESS,
SITE_LOG_DIR,
SITE_TASKS_DIR,
SITE_OUTPUT_DIR,
)
from benchcab.repository import CableRepository
from benchcab.task import (
get_fluxnet_tasks,
get_fluxnet_comparisons,
run_tasks,
run_tasks_in_parallel,
run_comparisons,
run_comparisons_in_parallel,
Task,
)
from benchcab.comparison import run_comparisons, run_comparisons_in_parallel
from benchcab.cli import generate_parser
from benchcab.environment_modules import module_load, module_is_loaded
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface
from benchcab.utils.pbs import render_job_script
from benchcab.utils.logging import next_path


class Benchcab:
"""A class that represents the `benchcab` application."""

def __init__(self) -> None:
self.args = generate_parser().parse_args(
sys.argv[1:] if sys.argv[1:] else ["-h"]
)
self.config = read_config(self.args.config)
root_dir: Path = internal.CWD
subprocess_handler: SubprocessWrapperInterface = SubprocessWrapper()
modules_handler: EnvironmentModulesInterface = EnvironmentModules()

def __init__(
self,
argv: list[str],
config: Optional[dict] = None,
validate_env: bool = True,
) -> None:
self.args = generate_parser().parse_args(argv[1:] if argv[1:] else ["-h"])
self.config = config if config else read_config(self.args.config)
self.repos = [
CableRepository(**config, repo_id=id)
for id, config in enumerate(self.config["realisations"])
]
self.tasks: list[Task] = [] # initialise fluxnet tasks lazily
validate_environment(
project=self.config["project"], modules=self.config["modules"]

if validate_env:
self._validate_environment(
project=self.config["project"], modules=self.config["modules"]
)

def _validate_environment(self, project: str, modules: list):
    """Performs checks on current user environment.

    Every failed check prints an error message and terminates the
    process via `sys.exit(1)`.

    Args:
        project: Name of the project group the user must belong to.
        modules: Names of the environment modules that must be available
            according to `self.modules_handler.module_is_avail`.
    """

    if "gadi.nci" not in internal.NODENAME:
        print("Error: benchcab is currently implemented only on Gadi")
        sys.exit(1)

    namelist_dir = Path(internal.CWD / internal.NAMELIST_DIR)
    if not namelist_dir.exists():
        print(
            "Error: cannot find 'namelists' directory in current working directory"
        )
        sys.exit(1)

    required_groups = [project, "ks32", "hh5"]
    groups = set()
    for gid in os.getgroups():
        try:
            groups.add(grp.getgrgid(gid).gr_name)
        except KeyError:
            # `grp.getgrgid` raises KeyError when the gid has no entry in
            # the group database. Such a group has no name, so it can never
            # satisfy a required group; skip it instead of crashing.
            continue
    if not set(required_groups).issubset(groups):
        print(
            "Error: user does not have the required group permissions.",
            "The required groups are:",
            ", ".join(required_groups),
        )
        sys.exit(1)

    for modname in modules:
        if not self.modules_handler.module_is_avail(modname):
            print(f"Error: module ({modname}) is not available.")
            sys.exit(1)

    # Each site id of both experiments must match exactly one path in the
    # met directory.
    all_site_ids = set(
        internal.MEORG_EXPERIMENTS["five-site-test"]
        + internal.MEORG_EXPERIMENTS["forty-two-site-test"]
    )
    for site_id in all_site_ids:
        paths = list(internal.MET_DIR.glob(f"{site_id}*"))
        if not paths:
            print(
                f"Error: failed to infer met file for site id '{site_id}' in "
                f"{internal.MET_DIR}."
            )
            sys.exit(1)
        if len(paths) > 1:
            print(
                f"Error: multiple paths infered for site id: '{site_id}' in {internal.MET_DIR}."
            )
            sys.exit(1)

def _initialise_tasks(self) -> list[Task]:
"""A helper method that initialises and returns the `tasks` attribute."""
self.tasks = get_fluxnet_tasks(
realisations=self.config["realisations"],
science_configurations=self.config["science_configurations"],
repos=self.repos,
science_configurations=self.config.get(
"science_configurations", internal.DEFAULT_SCIENCE_CONFIGURATIONS
),
met_sites=get_met_sites(self.config["experiment"]),
)
return self.tasks

# TODO(Sean) this method should be the endpoint for the `fluxnet-submit-job`
# command line argument.
def fluxnet_submit_job(self) -> None:
    """Writes the PBS job script for the fluxnet workflow and submits it with `qsub`.

    The script is rendered by `render_job_script` and written to
    `internal.QSUB_FNAME` under `self.root_dir`. If the `qsub` command
    raises `CalledProcessError`, the captured output is printed and the
    exception is re-raised.
    """

    script_path = self.root_dir / internal.QSUB_FNAME
    print(
        "Creating PBS job script to run FLUXNET tasks on compute "
        f"nodes: {script_path.relative_to(self.root_dir)}"
    )
    with script_path.open("w", encoding="utf-8") as script_file:
        script_file.write(
            render_job_script(
                project=self.config["project"],
                config_path=self.args.config,
                modules=self.config["modules"],
                storage_flags=[],  # TODO(Sean) add storage flags option to config
                verbose=self.args.verbose,
                skip_bitwise_cmp="fluxnet-bitwise-cmp" in self.args.skip,
            )
        )

    qsub_command = f"qsub {script_path}"
    try:
        proc = self.subprocess_handler.run_cmd(
            qsub_command,
            capture_output=True,
            verbose=self.args.verbose,
        )
    except CalledProcessError as err:
        print("Error when submitting job to NCI queue")
        print(err.output)
        raise

    # Report the captured stdout of the qsub command, then where each
    # task's log, standard output and NetCDF output are written.
    qsub_output = proc.stdout.strip()
    summary = (
        f"PBS job submitted: {qsub_output}\n"
        "The CABLE log file for each task is written to "
        f"{internal.SITE_LOG_DIR}/<task_name>_log.txt\n"
        "The CABLE standard output for each task is written to "
        f"{internal.SITE_TASKS_DIR}/<task_name>/out.txt\n"
        "The NetCDF output for each task is written to "
        f"{internal.SITE_OUTPUT_DIR}/<task_name>_out.nc"
    )
    print(summary)

def checkout(self):
"""Endpoint for `benchcab checkout`."""

setup_src_dir()

print("Checking out repositories...")
rev_number_log = ""
for branch in self.config["realisations"]:
path_to_repo = checkout_cable(branch, verbose=self.args.verbose)
for repo in self.repos:
repo.checkout(verbose=self.args.verbose)
rev_number_log += (
f"{branch['name']} last changed revision: "
f"{svn_info_show_item(path_to_repo, 'last-changed-revision')}\n"
f"{repo.name} last changed revision: "
f"{repo.svn_info_show_item('last-changed-revision')}\n"
)

# TODO(Sean) we should archive revision numbers for CABLE-AUX
checkout_cable_auxiliary(self.args.verbose)
cable_aux_repo = CableRepository(path=internal.CABLE_AUX_RELATIVE_SVN_PATH)
cable_aux_repo.checkout(verbose=self.args.verbose)

rev_number_log_path = CWD / next_path("rev_number-*.log")
print(f"Writing revision number info to {rev_number_log_path.relative_to(CWD)}")
rev_number_log_path = self.root_dir / next_path(
self.root_dir, "rev_number-*.log"
)
print(
f"Writing revision number info to {rev_number_log_path.relative_to(self.root_dir)}"
)
with open(rev_number_log_path, "w", encoding="utf-8") as file:
file.write(rev_number_log)

print("")

def build(self):
"""Endpoint for `benchcab build`."""
for branch in self.config["realisations"]:
if branch["build_script"]:
custom_build(
branch["build_script"], branch["name"], verbose=self.args.verbose
)
for repo in self.repos:
if repo.build_script:
repo.custom_build(verbose=self.args.verbose)
else:
default_build(
branch["name"],
self.config["modules"],
verbose=self.args.verbose,
)
print(f"Successfully compiled CABLE for realisation {branch['name']}")
repo.build(modules=self.config["modules"], verbose=self.args.verbose)
print(f"Successfully compiled CABLE for realisation {repo.name}")
print("")

def fluxnet_setup_work_directory(self):
Expand All @@ -111,7 +207,7 @@ def fluxnet_run_tasks(self):
"""Endpoint for `benchcab fluxnet-run-tasks`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
print("Running FLUXNET tasks...")
if MULTIPROCESS:
if internal.MULTIPROCESS:
run_tasks_in_parallel(tasks, verbose=self.args.verbose)
else:
run_tasks(tasks, verbose=self.args.verbose)
Expand All @@ -121,14 +217,16 @@ def fluxnet_run_tasks(self):
def fluxnet_bitwise_cmp(self):
"""Endpoint for `benchcab fluxnet-bitwise-cmp`."""

if not module_is_loaded("nccmp"):
module_load("nccmp") # use `nccmp -df` for bitwise comparisons
if not self.modules_handler.module_is_loaded("nccmp"):
self.modules_handler.module_load(
"nccmp"
) # use `nccmp -df` for bitwise comparisons

tasks = self.tasks if self.tasks else self._initialise_tasks()
comparisons = get_fluxnet_comparisons(tasks)

print("Running comparison tasks...")
if MULTIPROCESS:
if internal.MULTIPROCESS:
run_comparisons_in_parallel(comparisons, verbose=self.args.verbose)
else:
run_comparisons(comparisons, verbose=self.args.verbose)
Expand All @@ -144,25 +242,7 @@ def fluxnet(self):
if "fluxnet-bitwise-cmp" not in self.args.skip:
self.fluxnet_bitwise_cmp()
else:
submit_job(
project=self.config["project"],
config_path=self.args.config,
modules=self.config["modules"],
verbose=self.args.verbose,
skip_bitwise_cmp="fluxnet-bitwise-cmp" in self.args.skip,
)
print(
"The CABLE log file for each task is written to "
f"{SITE_LOG_DIR}/<task_name>_log.txt"
)
print(
"The CABLE standard output for each task is written to "
f"{SITE_TASKS_DIR}/<task_name>/out.txt"
)
print(
"The NetCDF output for each task is written to "
f"{SITE_OUTPUT_DIR}/<task_name>_out.nc"
)
self.fluxnet_submit_job()

def spatial(self):
"""Endpoint for `benchcab spatial`."""
Expand Down Expand Up @@ -206,7 +286,7 @@ def main():
This is required for setup.py entry_points
"""

app = Benchcab()
app = Benchcab(argv=sys.argv)
app.main()


Expand Down
Loading

0 comments on commit 73e04f0

Please sign in to comment.