-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
300 add connection to meorg client to successful run #312
base: main
Are you sure you want to change the base?
300 add connection to meorg client to successful run #312
Conversation
@@ -16,6 +16,8 @@ dependencies: | |||
- cerberus>=1.3.5 | |||
- gitpython | |||
- jinja2 | |||
- hpcpy>=0.3.0 | |||
- meorg_client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New deps. We'll need to get these added to the env.
@@ -234,13 +235,25 @@ def fluxsite_submit_job(self, config_path: str, skip: list[str]) -> None: | |||
logger.error(exc.output) | |||
raise | |||
|
|||
logger.info(f"PBS job submitted: {proc.stdout.strip()}") | |||
# Get the job ID | |||
job_id = proc.stdout.strip() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to capture the job id to hand to hpcpy
logger.info("CABLE log file for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt") | ||
logger.info("The CABLE standard output for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['TASKS']}/<task_name>/out.txt") | ||
logger.info("The NetCDF output for each task is written to:") | ||
logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc") | ||
|
||
# Upload to meorg by default | ||
bm.do_meorg( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blind function to submit the upload job. Graph dependency is handled by PBS so benchcab can conveniently forget about this once submitted.
@@ -347,8 +360,7 @@ def fluxsite_run_tasks(self, config_path: str): | |||
else: | |||
fluxsite.run_tasks(tasks) | |||
|
|||
tasks_failed = [task for task in tasks if not task.is_done()] | |||
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed) | |||
n_tasks, n_success, n_failed, all_complete = task_summary(tasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
task_summary
was a helper function developed for this, before we started using hpcpy to handle the downstream submission. Still useful.
@@ -107,6 +107,9 @@ fluxsite: | |||
schema: | |||
type: "string" | |||
required: false | |||
meorg_model_output_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New schema keys, optional model_output_id
@@ -0,0 +1,43 @@ | |||
#!/bin/bash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New job script for submitting the upload job. Nothing special.
@@ -0,0 +1,52 @@ | |||
#!/bin/bash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integration script variant with the optional model_output_id
key added.
@@ -274,3 +274,12 @@ def get_met_forcing_file_names(experiment: str) -> list[str]: | |||
] | |||
|
|||
return file_names | |||
|
|||
# Configuration for the client upload | |||
MEORG_CLIENT = dict( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New internals keys for client, note the copyq
only allows a single thread, boo.
@@ -148,3 +148,23 @@ def get_logger(name="benchcab", level="debug"): | |||
def is_verbose(): | |||
"""Return True if verbose output is enabled, False otherwise.""" | |||
return get_logger().getEffectiveLevel() == logging.DEBUG | |||
|
|||
|
|||
def task_summary(tasks: Iterable) -> tuple: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that helper function I mentioned above.
@@ -0,0 +1,96 @@ | |||
"""Utility methods for interacting with the ME.org client.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New utilities to handle interaction with the client.
@@ -3,37 +3,40 @@ | |||
|
|||
import pytest | |||
from benchcab.utils.state import State, StateAttributeError | |||
from tempfile import TemporaryDirectory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched the state tests to in-memory, off-topic but it should make things in CI a bit cleaner.
Code for the integration work.