-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
300 add connection to meorg client to successful run #312
base: main
Are you sure you want to change the base?
Changes from 5 commits
568ac95
7ee1b7b
2612284
f6fadbc
b77a15a
2534bc4
0508cdb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,9 @@ | |
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface | ||
from benchcab.internal import get_met_forcing_file_names | ||
from benchcab.model import Model | ||
from benchcab.utils import is_verbose | ||
from benchcab.utils import is_verbose, task_summary | ||
from benchcab.utils.fs import mkdir, next_path | ||
import benchcab.utils.meorg as bm | ||
from benchcab.utils.pbs import render_job_script | ||
from benchcab.utils.repo import create_repo | ||
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface | ||
|
@@ -234,13 +235,25 @@ def fluxsite_submit_job(self, config_path: str, skip: list[str]) -> None: | |
logger.error(exc.output) | ||
raise | ||
|
||
logger.info(f"PBS job submitted: {proc.stdout.strip()}") | ||
# Get the job ID | ||
job_id = proc.stdout.strip() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to capture the job id to hand to hpcpy |
||
|
||
logger.info(f"PBS job submitted: {job_id}") | ||
logger.info("CABLE log file for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt") | ||
logger.info("The CABLE standard output for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['TASKS']}/<task_name>/out.txt") | ||
logger.info("The NetCDF output for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc") | ||
|
||
# Upload to meorg by default | ||
bm.do_meorg( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blind function to submit the upload job. Graph dependency is handled by PBS so benchcab can conveniently forget about this once submitted. |
||
config, | ||
upload_dir=internal.FLUXSITE_DIRS['OUTPUT'], | ||
benchcab_bin=str(self.benchcab_exe_path), | ||
benchcab_job_id=job_id | ||
) | ||
|
||
|
||
def gen_codecov(self, config_path: str): | ||
"""Endpoint for `benchcab codecov`.""" | ||
|
@@ -347,8 +360,7 @@ def fluxsite_run_tasks(self, config_path: str): | |
else: | ||
fluxsite.run_tasks(tasks) | ||
|
||
tasks_failed = [task for task in tasks if not task.is_done()] | ||
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed) | ||
n_tasks, n_success, n_failed, all_complete = task_summary(tasks) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
logger.info(f"{n_failed} failed, {n_success} passed") | ||
|
||
def fluxsite_bitwise_cmp(self, config_path: str): | ||
|
@@ -374,10 +386,9 @@ def fluxsite_bitwise_cmp(self, config_path: str): | |
ncpus = config["fluxsite"]["pbs"]["ncpus"] | ||
run_comparisons_in_parallel(comparisons, n_processes=ncpus) | ||
else: | ||
run_comparisons(comparisons) | ||
|
||
tasks_failed = [task for task in comparisons if not task.is_done()] | ||
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed) | ||
run_comparisons(comparisons) | ||
|
||
n_tasks, n_success, n_failed, all_complete = task_summary(comparisons) | ||
logger.info(f"{n_failed} failed, {n_success} passed") | ||
|
||
def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,6 +107,9 @@ fluxsite: | |
schema: | ||
type: "string" | ||
required: false | ||
meorg_model_output_id: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New schema keys, optional |
||
type: "string" | ||
required: false | ||
|
||
spatial: | ||
type: "dict" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
#!/bin/bash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New job script for submitting the upload job. Nothing special. |
||
#PBS -l wd | ||
#PBS -l ncpus={{num_threads}} | ||
#PBS -l mem={{mem}} | ||
#PBS -l walltime={{walltime}} | ||
#PBS -q copyq | ||
#PBS -P {{project}} | ||
#PBS -j oe | ||
#PBS -m e | ||
#PBS -l storage={{storage_str}} | ||
|
||
module purge | ||
{% for module in modules -%} | ||
module load {{module}} | ||
{% endfor %} | ||
set -ev | ||
|
||
# Set some things | ||
DATA_DIR={{data_dir}} | ||
NUM_THREADS={{num_threads}} | ||
MODEL_OUTPUT_ID={{model_output_id}} | ||
CACHE_DELAY={{cache_delay}} | ||
MEORG_BIN={{meorg_bin}} | ||
|
||
{% if purge_outputs %} | ||
# Purge existing model outputs | ||
echo "Purging existing outputs from $MODEL_OUTPUT_ID" | ||
$MEORG_BIN file detach_all $MODEL_OUTPUT_ID | ||
{% endif %} | ||
|
||
# Upload the data | ||
echo "Uploading data to $MODEL_OUTPUT_ID" | ||
$MEORG_BIN file upload $DATA_DIR/*.nc -n $NUM_THREADS --attach_to $MODEL_OUTPUT_ID | ||
|
||
# Wait for the cache to transfer to the object store. | ||
echo "Waiting for object store transfer ($CACHE_DELAY sec)" | ||
sleep $CACHE_DELAY | ||
|
||
# Trigger the analysis | ||
echo "Triggering analysis on $MODEL_OUTPUT_ID" | ||
$MEORG_BIN analysis start $MODEL_OUTPUT_ID | ||
|
||
echo "DONE" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
#!/bin/bash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Integration script variant with the optional |
||
|
||
set -ex | ||
|
||
CABLE_REPO="git@github.com:CABLE-LSM/CABLE.git" | ||
CABLE_DIR=/scratch/$PROJECT/$USER/benchcab/CABLE | ||
|
||
TEST_DIR=/scratch/$PROJECT/$USER/benchcab/integration | ||
EXAMPLE_REPO="git@github.com:CABLE-LSM/bench_example.git" | ||
|
||
# Remove CABLE and test work space, then recreate | ||
rm -rf $CABLE_DIR | ||
mkdir -p $CABLE_DIR | ||
|
||
rm -rf $TEST_DIR | ||
mkdir -p $TEST_DIR | ||
|
||
# Clone local checkout for CABLE | ||
git clone $CABLE_REPO $CABLE_DIR | ||
cd $CABLE_DIR | ||
|
||
# Clone the example repo | ||
git clone $EXAMPLE_REPO $TEST_DIR | ||
cd $TEST_DIR | ||
git reset --hard 9bfba54ee8bf23141d95b1abe4b7207b0f3498e2 | ||
|
||
cat > config.yaml << EOL | ||
project: $PROJECT | ||
|
||
realisations: | ||
- repo: | ||
local: | ||
path: $CABLE_DIR | ||
- repo: | ||
git: | ||
branch: main | ||
modules: [ | ||
intel-compiler/2021.1.1, | ||
netcdf/4.7.4, | ||
openmpi/4.1.0 | ||
] | ||
|
||
fluxsite: | ||
experiment: AU-Tum | ||
pbs: | ||
storage: | ||
- scratch/$PROJECT | ||
- gdata/$PROJECT | ||
meorg_model_output_id: Sss7qupAHEZ8ovbCv | ||
EOL | ||
|
||
benchcab run -v |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -274,3 +274,12 @@ def get_met_forcing_file_names(experiment: str) -> list[str]: | |
] | ||
|
||
return file_names | ||
|
||
# Configuration for the client upload | ||
MEORG_CLIENT = dict( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New internals keys for client, note the |
||
num_threads=1, # Number of parallel upload threads | ||
cache_delay=60*5, # 5mins between upload and analysis triggering | ||
mem="8G", | ||
walltime="01:00:00", | ||
storage=["gdata/ks32", "gdata/hh5", "gdata/wd9", "gdata/rp23"] | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,7 @@ | |
import sys | ||
from importlib import resources | ||
from pathlib import Path | ||
from typing import Union | ||
from typing import Union, Iterable | ||
|
||
import yaml | ||
from jinja2 import BaseLoader, Environment | ||
|
@@ -148,3 +148,23 @@ def get_logger(name="benchcab", level="debug"): | |
def is_verbose(): | ||
"""Return True if verbose output is enabled, False otherwise.""" | ||
return get_logger().getEffectiveLevel() == logging.DEBUG | ||
|
||
|
||
def task_summary(tasks: Iterable) -> tuple:
    """Return a summary of task completions.

    Parameters
    ----------
    tasks : Iterable
        Iterable of tasks with an .is_done() method available. Any iterable
        is accepted, including one-shot iterables such as generators.

    Returns
    -------
    tuple
        num_tasks, num_complete, num_failed, all_complete

    """
    # Evaluate every task exactly once. Collecting the flags up front means we
    # never call len() on the input (not all iterables support it) and never
    # iterate it a second time (generators would already be exhausted).
    done_flags = [bool(task.is_done()) for task in tasks]

    num_tasks = len(done_flags)
    num_complete = sum(done_flags)
    num_failed = num_tasks - num_complete

    return num_tasks, num_complete, num_failed, num_complete == num_tasks
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
"""Utility methods for interacting with the ME.org client.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New utilities to handle interaction with the client. |
||
from benchcab.internal import MEORG_CLIENT | ||
from meorg_client.client import Client as MeorgClient | ||
from hpcpy import get_client | ||
import benchcab.utils as bu | ||
import os | ||
from glob import glob | ||
|
||
def do_meorg(
    config: dict, upload_dir: str, benchcab_bin: str, benchcab_job_id: str
) -> bool:
    """Submit a job that uploads model outputs to modelevaluation.org.

    The upload job is submitted with a dependency on ``benchcab_job_id``.
    No job is submitted when no model output ID is configured, when the
    meorg executable cannot be found, or when the meorg client has not been
    initialised.

    Parameters
    ----------
    config : dict
        The master config dictionary
    upload_dir : str
        Absolute path to the data dir for upload
    benchcab_bin : str
        Path to the benchcab bin, from which to infer the client bin
    benchcab_job_id : str
        ID of the benchcab job that the upload job depends on

    Returns
    -------
    bool
        True if the upload job was submitted, False otherwise

    """
    logger = bu.get_logger()

    # Tolerate a missing/empty "fluxsite" section rather than failing on None.
    fluxsite_config = config.get("fluxsite") or {}
    model_output_id = fluxsite_config.get("meorg_model_output_id")
    num_threads = MEORG_CLIENT["num_threads"]

    # Check if a model output id has been assigned
    if not model_output_id:
        logger.info("No model_output_id found in fluxsite configuration.")
        logger.info("NOT uploading to modelevaluation.org")
        return False

    # Allow the user to specify an absolute path to the meorg bin in config
    meorg_bin = config.get("meorg_bin")

    # Otherwise infer the path from the benchcab installation: the client
    # executable is expected to sit next to the benchcab executable.
    if not meorg_bin:
        logger.debug(f"Inferring meorg bin from {benchcab_bin}")
        meorg_bin = os.path.join(os.path.dirname(benchcab_bin), "meorg")

    logger.debug(f"meorg_bin = {meorg_bin}")

    # Now, check if that actually exists
    if not os.path.isfile(meorg_bin):
        logger.error(f"No meorg_client executable found at {meorg_bin}")
        logger.error("NOT uploading to modelevaluation.org")
        return False

    # Also only run if the client is initialised; otherwise tell the user how
    # to perform the upload and analysis manually.
    if not MeorgClient().is_initialised():
        logger.warning(
            "A model_output_id has been supplied, but the meorg_client is not initialised."
        )
        logger.warning(
            "To initialise, run `meorg initialise` in the installation environment."
        )
        logger.warning(
            "Once initialised, the outputs from this run can be uploaded with the following command:"
        )
        logger.warning(
            f"meorg file upload {upload_dir}/*.nc -n {num_threads} --attach_to {model_output_id}"
        )
        logger.warning("Then the analysis can be triggered with:")
        logger.warning(f"meorg analysis start {model_output_id}")
        return False

    # Finally, attempt the upload!
    logger.info("Uploading outputs to modelevaluation.org")

    # Submit the outputs
    client = get_client()
    meorg_jobid = client.submit(
        bu.get_installed_root() / "data" / "meorg_jobscript.j2",
        render=True,
        dry_run=False,
        depends_on=benchcab_job_id,
        # Interpolate into the job script
        model_output_id=model_output_id,
        data_dir=upload_dir,
        cache_delay=MEORG_CLIENT["cache_delay"],
        mem=MEORG_CLIENT["mem"],
        num_threads=num_threads,
        walltime=MEORG_CLIENT["walltime"],
        storage=MEORG_CLIENT["storage"],
        # The job script template interpolates `storage_str`, so supply the
        # storage list in the "+"-separated form used by the PBS directive.
        storage_str="+".join(MEORG_CLIENT["storage"]),
        project=config["project"],
        modules=config["modules"],
        purge_outputs=True,
        meorg_bin=meorg_bin,
    )

    logger.info(f"Upload job submitted: {meorg_jobid}")
    return True
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,37 +3,40 @@ | |
|
||
import pytest | ||
from benchcab.utils.state import State, StateAttributeError | ||
from tempfile import TemporaryDirectory | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched the state tests to in-memory, off-topic but it should make things in CI a bit cleaner. |
||
|
||
|
||
@pytest.fixture() | ||
def state(): | ||
"""Return a State object.""" | ||
return State(state_dir=Path("my_state")) | ||
|
||
|
||
def test_state_is_set(state): | ||
def test_state_is_set(): | ||
"""Success case: test state is set.""" | ||
state.set("foo") | ||
assert state.is_set("foo") | ||
with TemporaryDirectory() as tmp_dir: | ||
state = State(state_dir=Path(tmp_dir)) | ||
state.set("foo") | ||
assert state.is_set("foo") | ||
|
||
|
||
def test_state_reset(state): | ||
def test_state_reset(): | ||
"""Success case: test state is reset.""" | ||
state.set("foo") | ||
state.reset() | ||
assert not state.is_set("foo") | ||
with TemporaryDirectory() as tmp_dir: | ||
state = State(state_dir=Path(tmp_dir)) | ||
state.set("foo") | ||
state.reset() | ||
assert not state.is_set("foo") | ||
|
||
|
||
def test_state_get(state): | ||
def test_state_get(): | ||
"""Success case: test get() returns the most recent state attribute.""" | ||
state.set("foo") | ||
# This is done so that time stamps can be resolved between state attributes | ||
time.sleep(0.01) | ||
state.set("bar") | ||
assert state.get() == "bar" | ||
with TemporaryDirectory() as tmp_dir: | ||
state = State(state_dir=Path(tmp_dir)) | ||
state.set("foo") | ||
# This is done so that time stamps can be resolved between state attributes | ||
time.sleep(1) | ||
state.set("bar") | ||
assert state.get() == "bar" | ||
|
||
|
||
def test_state_get_raises_exception(state): | ||
def test_state_get_raises_exception(): | ||
"""Failure case: test get() raises an exception when no attributes are set.""" | ||
with pytest.raises(StateAttributeError): | ||
state.get() | ||
with TemporaryDirectory() as tmp_dir: | ||
state = State(state_dir=Path(tmp_dir)) | ||
with pytest.raises(StateAttributeError): | ||
state.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New deps. We'll need to get these added to the env.