300 add connection to meorg client to successful run #312

Status: Open · wants to merge 7 commits into main · showing changes from 5 commits
2 changes: 2 additions & 0 deletions .conda/benchcab-dev.yaml
@@ -16,6 +16,8 @@ dependencies:
- cerberus>=1.3.5
- gitpython
- jinja2
- hpcpy>=0.3.0
- meorg_client
Collaborator Author: New deps. We'll need to get these added to the env.
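For anyone updating an existing environment rather than rebuilding it, something like this should pull the new deps in (the env name is whatever you created locally; illustrative):

```bash
# Sync the two new dependencies into an existing dev environment
conda env update -n benchcab-dev -f .conda/benchcab-dev.yaml
```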

# CI
- pytest-cov
# Dev Dependencies
27 changes: 19 additions & 8 deletions src/benchcab/benchcab.py
@@ -22,8 +22,9 @@
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
from benchcab.internal import get_met_forcing_file_names
from benchcab.model import Model
from benchcab.utils import is_verbose
from benchcab.utils import is_verbose, task_summary
from benchcab.utils.fs import mkdir, next_path
import benchcab.utils.meorg as bm
from benchcab.utils.pbs import render_job_script
from benchcab.utils.repo import create_repo
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface
@@ -234,13 +235,25 @@ def fluxsite_submit_job(self, config_path: str, skip: list[str]) -> None:
            logger.error(exc.output)
            raise

        logger.info(f"PBS job submitted: {proc.stdout.strip()}")
        # Get the job ID
        job_id = proc.stdout.strip()
Collaborator Author: Need to capture the job id to hand to hpcpy.
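For context: qsub prints the new job's id on stdout, which is all we capture here. An illustrative session (job id and script name are made up):

```bash
$ qsub benchcab_fluxsite.sh
12345678.gadi-pbs    # <- proc.stdout.strip() returns this string
```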


logger.info(f"PBS job submitted: {job_id}")
logger.info("CABLE log file for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt")
logger.info("The CABLE standard output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['TASKS']}/<task_name>/out.txt")
logger.info("The NetCDF output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc")

# Upload to meorg by default
bm.do_meorg(
Collaborator Author: Blind function to submit the upload job. The graph dependency is handled by PBS, so benchcab can conveniently forget about this once submitted.
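In raw PBS terms the dependency amounts to something like this (a sketch only; `do_meorg` below drives it through hpcpy rather than shelling out, and the job id is illustrative):

```bash
# Upload job starts only after the fluxsite job exits successfully
qsub -W depend=afterok:12345678.gadi-pbs meorg_upload.sh
```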

            config,
            upload_dir=internal.FLUXSITE_DIRS['OUTPUT'],
            benchcab_bin=str(self.benchcab_exe_path),
            benchcab_job_id=job_id
        )

    def gen_codecov(self, config_path: str):
        """Endpoint for `benchcab codecov`."""
@@ -347,8 +360,7 @@ def fluxsite_run_tasks(self, config_path: str):
        else:
            fluxsite.run_tasks(tasks)

        tasks_failed = [task for task in tasks if not task.is_done()]
        n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
        n_tasks, n_success, n_failed, all_complete = task_summary(tasks)
Collaborator Author: task_summary was a helper function developed for this, before we started using hpcpy to handle the downstream submission. Still useful.

logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite_bitwise_cmp(self, config_path: str):
@@ -374,10 +386,9 @@ def fluxsite_bitwise_cmp(self, config_path: str):
            ncpus = config["fluxsite"]["pbs"]["ncpus"]
            run_comparisons_in_parallel(comparisons, n_processes=ncpus)
        else:
            run_comparisons(comparisons)

        tasks_failed = [task for task in comparisons if not task.is_done()]
        n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
            run_comparisons(comparisons)

        n_tasks, n_success, n_failed, all_complete = task_summary(comparisons)
        logger.info(f"{n_failed} failed, {n_success} passed")

    def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
3 changes: 3 additions & 0 deletions src/benchcab/data/config-schema.yml
@@ -107,6 +107,9 @@ fluxsite:
          schema:
            type: "string"
            required: false
    meorg_model_output_id:
Collaborator Author: New schema key: the optional meorg_model_output_id.
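A minimal config sketch using the new key (the id here is just the one from the integration test below):

```yaml
fluxsite:
  experiment: AU-Tum
  meorg_model_output_id: Sss7qupAHEZ8ovbCv  # optional; omit to skip the upload
```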

type: "string"
required: false

spatial:
type: "dict"
43 changes: 43 additions & 0 deletions src/benchcab/data/meorg_jobscript.j2
@@ -0,0 +1,43 @@
#!/bin/bash
Collaborator Author: New job script for submitting the upload job. Nothing special.
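For anyone wanting to eyeball the rendered script outside of a run, a standalone jinja2 sketch (hpcpy does the equivalent internally when render=True; all values below are illustrative):

```python
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("src/benchcab/data"))
script = env.get_template("meorg_jobscript.j2").render(
    num_threads=1,
    mem="8G",
    walltime="01:00:00",
    project="ab12",                      # hypothetical NCI project code
    storage_str="gdata/ks32+gdata/hh5",  # assumed pre-joined storage string
    modules=["python3/3.10.4"],          # hypothetical module list
    data_dir="/scratch/ab12/benchcab/outputs",
    model_output_id="Sss7qupAHEZ8ovbCv",
    cache_delay=300,
    meorg_bin="/path/to/env/bin/meorg",
    purge_outputs=True,
)
print(script)  # inspect the generated PBS script
```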

#PBS -l wd
#PBS -l ncpus={{num_threads}}
#PBS -l mem={{mem}}
#PBS -l walltime={{walltime}}
#PBS -q copyq
#PBS -P {{project}}
#PBS -j oe
#PBS -m e
#PBS -l storage={{storage_str}}

module purge
{% for module in modules -%}
module load {{module}}
{% endfor %}
set -ev

# Set some things
DATA_DIR={{data_dir}}
NUM_THREADS={{num_threads}}
MODEL_OUTPUT_ID={{model_output_id}}
CACHE_DELAY={{cache_delay}}
MEORG_BIN={{meorg_bin}}

{% if purge_outputs %}
# Purge existing model outputs
echo "Purging existing outputs from $MODEL_OUTPUT_ID"
$MEORG_BIN file detach_all $MODEL_OUTPUT_ID
{% endif %}

# Upload the data
echo "Uploading data to $MODEL_OUTPUT_ID"
$MEORG_BIN file upload $DATA_DIR/*.nc -n $NUM_THREADS --attach_to $MODEL_OUTPUT_ID

# Wait for the cache to transfer to the object store.
echo "Waiting for object store transfer ($CACHE_DELAY sec)"
sleep $CACHE_DELAY

# Trigger the analysis
echo "Triggering analysis on $MODEL_OUTPUT_ID"
$MEORG_BIN analysis start $MODEL_OUTPUT_ID

echo "DONE"
52 changes: 52 additions & 0 deletions src/benchcab/data/test/integration_meorg.sh
@@ -0,0 +1,52 @@
#!/bin/bash
Collaborator Author: Integration script variant with the optional model_output_id key added.


set -ex

CABLE_REPO="[email protected]:CABLE-LSM/CABLE.git"
CABLE_DIR=/scratch/$PROJECT/$USER/benchcab/CABLE

TEST_DIR=/scratch/$PROJECT/$USER/benchcab/integration
EXAMPLE_REPO="[email protected]:CABLE-LSM/bench_example.git"

# Remove CABLE and test work space, then recreate
rm -rf $CABLE_DIR
mkdir -p $CABLE_DIR

rm -rf $TEST_DIR
mkdir -p $TEST_DIR

# Clone local checkout for CABLE
git clone $CABLE_REPO $CABLE_DIR
cd $CABLE_DIR

# Clone the example repo
git clone $EXAMPLE_REPO $TEST_DIR
cd $TEST_DIR
git reset --hard 9bfba54ee8bf23141d95b1abe4b7207b0f3498e2

cat > config.yaml << EOL
project: $PROJECT

realisations:
  - repo:
      local:
        path: $CABLE_DIR
  - repo:
      git:
        branch: main
modules: [
  intel-compiler/2021.1.1,
  netcdf/4.7.4,
  openmpi/4.1.0
]

fluxsite:
  experiment: AU-Tum
  pbs:
    storage:
      - scratch/$PROJECT
      - gdata/$PROJECT
  meorg_model_output_id: Sss7qupAHEZ8ovbCv
EOL

benchcab run -v
9 changes: 9 additions & 0 deletions src/benchcab/internal.py
@@ -274,3 +274,12 @@ def get_met_forcing_file_names(experiment: str) -> list[str]:
    ]

    return file_names

# Configuration for the client upload
MEORG_CLIENT = dict(
Collaborator Author: New internals keys for the client; note the copyq only allows a single thread, boo.

    num_threads=1,  # copyq only permits a single thread
    cache_delay=60 * 5,  # 5 mins between upload and analysis triggering
    mem="8G",
    walltime="01:00:00",
    storage=["gdata/ks32", "gdata/hh5", "gdata/wd9", "gdata/rp23"]
)
22 changes: 21 additions & 1 deletion src/benchcab/utils/__init__.py
@@ -11,7 +11,7 @@
import sys
from importlib import resources
from pathlib import Path
from typing import Union
from typing import Union, Iterable

import yaml
from jinja2 import BaseLoader, Environment
@@ -148,3 +148,23 @@ def get_logger(name="benchcab", level="debug"):
def is_verbose():
    """Return True if verbose output is enabled, False otherwise."""
    return get_logger().getEffectiveLevel() == logging.DEBUG


def task_summary(tasks: Iterable) -> tuple:
Collaborator Author: That helper function I mentioned above.
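Quick usage sketch (the stub class is hypothetical; anything exposing is_done() works):

```python
class StubTask:
    """Hypothetical stand-in for a fluxsite/comparison task."""

    def __init__(self, done: bool):
        self.done = done

    def is_done(self) -> bool:
        return self.done


tasks = [StubTask(True), StubTask(True), StubTask(False)]
n_tasks, n_success, n_failed, all_complete = task_summary(tasks)
# -> (3, 2, 1, False)
```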

"""Return a summary of task completions.
Parameters
----------
tasks : Iterable
Iterable of tasks with an .is_done() method available.
Returns
-------
tuple
num_tasks, num_complete, num_failed, all_complete
"""
num_tasks = len(tasks)
num_complete = len([task for task in tasks if task.is_done()])
num_failed = num_tasks - num_complete

return num_tasks, num_complete, num_failed, num_complete == num_tasks
96 changes: 96 additions & 0 deletions src/benchcab/utils/meorg.py
@@ -0,0 +1,96 @@
"""Utility methods for interacting with the ME.org client."""
Collaborator Author: New utilities to handle interaction with the client.
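If the client is not initialised, the fallback the warnings below point at is the manual CLI path, roughly (ids and paths illustrative, commands taken from the logged messages):

```bash
meorg initialise    # one-off client setup in the same environment
meorg file upload /path/to/outputs/*.nc -n 1 --attach_to Sss7qupAHEZ8ovbCv
meorg analysis start Sss7qupAHEZ8ovbCv
```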

from benchcab.internal import MEORG_CLIENT
from meorg_client.client import Client as MeorgClient
from hpcpy import get_client
import benchcab.utils as bu
import os
from glob import glob

def do_meorg(config: dict, upload_dir: str, benchcab_bin: str, benchcab_job_id: str):
    """Perform the upload of model outputs to modelevaluation.org.

    Parameters
    ----------
    config : dict
        The master config dictionary
    upload_dir : str
        Absolute path to the data dir for upload
    benchcab_bin : str
        Path to the benchcab bin, from which to infer the client bin
    benchcab_job_id : str
        PBS job ID of the fluxsite run that the upload job depends on

    Returns
    -------
    bool
        True if successful, False otherwise
    """
    logger = bu.get_logger()

    model_output_id = config.get("fluxsite").get("meorg_model_output_id", False)
    num_threads = MEORG_CLIENT["num_threads"]

    # Check if a model output id has been assigned
    if model_output_id is False:
        logger.info("No model_output_id found in fluxsite configuration.")
        logger.info("NOT uploading to modelevaluation.org")
        return False

    # Allow the user to specify an absolute path to the meorg bin in config
    meorg_bin = config.get("meorg_bin", False)

    # Otherwise infer the path from the benchcab installation
    if meorg_bin is False:
        logger.debug(f"Inferring meorg bin from {benchcab_bin}")
        bin_segments = benchcab_bin.split("/")
        bin_segments[-1] = "meorg"
        meorg_bin = "/".join(bin_segments)

    logger.debug(f"meorg_bin = {meorg_bin}")

    # Now, check if that actually exists
    if not os.path.isfile(meorg_bin):
        logger.error(f"No meorg_client executable found at {meorg_bin}")
        logger.error("NOT uploading to modelevaluation.org")
        return False

    # Also only run if the client is initialised
    if not MeorgClient().is_initialised():
        logger.warning("A model_output_id has been supplied, but the meorg_client is not initialised.")
        logger.warning("To initialise, run `meorg initialise` in the installation environment.")
        logger.warning("Once initialised, the outputs from this run can be uploaded with the following command:")
        logger.warning(f"meorg file upload {upload_dir}/*.nc -n {num_threads} --attach_to {model_output_id}")
        logger.warning("Then the analysis can be triggered with:")
        logger.warning(f"meorg analysis start {model_output_id}")
        return False

    # Finally, attempt the upload!
    logger.info("Uploading outputs to modelevaluation.org")

    # Submit the outputs
    client = get_client()
    meorg_jobid = client.submit(
        bu.get_installed_root() / "data" / "meorg_jobscript.j2",
        render=True,
        dry_run=False,
        depends_on=benchcab_job_id,
        # Interpolate into the job script
        model_output_id=model_output_id,
        data_dir=upload_dir,
        cache_delay=MEORG_CLIENT["cache_delay"],
        mem=MEORG_CLIENT["mem"],
        num_threads=MEORG_CLIENT["num_threads"],
        walltime=MEORG_CLIENT["walltime"],
        storage=MEORG_CLIENT["storage"],
        project=config["project"],
        modules=config["modules"],
        purge_outputs=True,
        meorg_bin=meorg_bin,
    )

    logger.info(f"Upload job submitted: {meorg_jobid}")
    return True
47 changes: 25 additions & 22 deletions tests/test_state.py
@@ -3,37 +3,40 @@

import pytest
from benchcab.utils.state import State, StateAttributeError
from tempfile import TemporaryDirectory
Collaborator Author: Switched the state tests to use temporary directories; off-topic, but it should make things in CI a bit cleaner.


@pytest.fixture()
def state():
    """Return a State object."""
    return State(state_dir=Path("my_state"))


def test_state_is_set(state):
def test_state_is_set():
    """Success case: test state is set."""
    state.set("foo")
    assert state.is_set("foo")
    with TemporaryDirectory() as tmp_dir:
        state = State(state_dir=Path(tmp_dir))
        state.set("foo")
        assert state.is_set("foo")


def test_state_reset(state):
def test_state_reset():
    """Success case: test state is reset."""
    state.set("foo")
    state.reset()
    assert not state.is_set("foo")
    with TemporaryDirectory() as tmp_dir:
        state = State(state_dir=Path(tmp_dir))
        state.set("foo")
        state.reset()
        assert not state.is_set("foo")


def test_state_get(state):
def test_state_get():
    """Success case: test get() returns the most recent state attribute."""
    state.set("foo")
    # This is done so that time stamps can be resolved between state attributes
    time.sleep(0.01)
    state.set("bar")
    assert state.get() == "bar"
    with TemporaryDirectory() as tmp_dir:
        state = State(state_dir=Path(tmp_dir))
        state.set("foo")
        # This is done so that time stamps can be resolved between state attributes
        time.sleep(1)
        state.set("bar")
        assert state.get() == "bar"


def test_state_get_raises_exception(state):
def test_state_get_raises_exception():
    """Failure case: test get() raises an exception when no attributes are set."""
    with pytest.raises(StateAttributeError):
        state.get()
    with TemporaryDirectory() as tmp_dir:
        state = State(state_dir=Path(tmp_dir))
        with pytest.raises(StateAttributeError):
            state.get()