Skip to content

Commit

Permalink
Merge pull request #16579 from mvdbeek/resource_param_fix
Browse files Browse the repository at this point in the history
[23.1] Ignore errors with user-set job resources
  • Loading branch information
mvdbeek authored Aug 23, 2023
2 parents 14d96f9 + bd839e9 commit 66ec43c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 8 deletions.
39 changes: 31 additions & 8 deletions lib/galaxy/jobs/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, job_wrapper, url_to_destination, job_config):
module_name = job_config.dynamic_params["rules_module"]
self.rules_module = importlib.import_module(module_name)

def __invoke_expand_function(self, expand_function, destination):
def __invoke_expand_function(self, expand_function, destination, resource_params_from_job_state=True):
function_arg_names = getfullargspec(expand_function).args
app = self.job_wrapper.app
possible_args = {
Expand Down Expand Up @@ -110,7 +110,9 @@ def __invoke_expand_function(self, expand_function, destination):
elif require_db and arg in db_param_mapping:
actual_args[arg] = db_param_mapping[arg]
elif arg == "resource_params":
actual_args["resource_params"] = self.job_wrapper.get_resource_parameters(job)
actual_args["resource_params"] = (
self.job_wrapper.get_resource_parameters(job) if resource_params_from_job_state else {}
)
elif arg == "workflow_invocation_uuid":
param_values = job.raw_param_dict()
workflow_invocation_uuid = param_values.get("__workflow_invocation_uuid__", None)
Expand Down Expand Up @@ -203,7 +205,23 @@ def __handle_dynamic_job_destination(self, destination):
return self.__handle_rule(expand_function, destination)

def __handle_rule(self, rule_function, destination):
job_destination = self.__invoke_expand_function(rule_function, destination)
try:
job_destination = self.__invoke_expand_function(rule_function, destination)
except Exception as e:
# Rules have varying quality and don't raise a consistent set of standard exceptions.
# so ... if we get an error here let's try again without resource params encoded
# in the job state. They're evil anyway.
try:
job_destination = self.__invoke_expand_function(
rule_function, destination, resource_params_from_job_state=False
)
except Exception:
# raise original exception, dropping resource param from job state didn't help.
raise e
else:
log.warning(
f"Ignored user-specified invalid resource parameter request because it failed with {str(e)}"
)
if not isinstance(job_destination, galaxy.jobs.JobDestination):
job_destination_rep = str(job_destination) # Should be either id or url
if "://" in job_destination_rep:
Expand All @@ -220,11 +238,16 @@ def __determine_job_destination(self, params, raw_job_destination=None):
if raw_job_destination is None:
raw_job_destination = self.job_wrapper.tool.get_job_destination(params)
if raw_job_destination.runner == DYNAMIC_RUNNER_NAME:
job_destination = self.__handle_dynamic_job_destination(raw_job_destination)
log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
# Recursively handle chained dynamic destinations
if job_destination.runner == DYNAMIC_RUNNER_NAME:
return self.__determine_job_destination(params, raw_job_destination=job_destination)
try:
job_destination = self.__handle_dynamic_job_destination(raw_job_destination)
log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
# Recursively handle chained dynamic destinations
if job_destination.runner == DYNAMIC_RUNNER_NAME:
return self.__determine_job_destination(params, raw_job_destination=job_destination)
except AssertionError:
if params and "job_resource" in params:
params = params.copy()
del params["job_resource"]
else:
job_destination = raw_job_destination
log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
Expand Down
7 changes: 7 additions & 0 deletions test/integration/dummy_job_resource_parameters.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<parameters>
<param label="Compute Resource" name="multi_compute_resource" type="select"
help="Need help selecting a compute resource? Options and limits are explained in detail &lt;a href='https://galaxyproject.org/main/' target='_blank'&gt;in the Galaxy Hub&lt;/a&gt;">
<option value="does not matter">Galaxy cluster</option>
<option value="or this">Jetstream 2</option>
</param>
</parameters>
22 changes: 22 additions & 0 deletions test/integration/job_resource_error_recovery_job_conf.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# job config that fails submission if job resource parameters are requested
runners:
local:
load: galaxy.jobs.runners.local:LocalJobRunner
workers: 1
dynamic:
rules_module: integration.job_resource_rules

execution:
default: initial_destination
environments:
initial_destination:
runner: dynamic
type: python
function: local_or_exception
local:
runner: local

tools:
- class: local
environment: local
resources: upload
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def local_or_exception(resource_params):
"""Build environment that fails if resource_params are passed."""
if resource_params:
raise Exception("boo!")
return "local"
43 changes: 43 additions & 0 deletions test/integration/test_job_resource_error_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os

from galaxy_test.base.populators import (
DatasetPopulator,
WorkflowPopulator,
)
from galaxy_test.driver import integration_util

SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
JOB_RESOURCES_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "dummy_job_resource_parameters.xml")
JOB_CONFIG = os.path.join(SCRIPT_DIRECTORY, "job_resource_error_recovery_job_conf.yml")


class TestJobRecoveryBeforeHandledIntegration(integration_util.IntegrationTestCase):
dataset_populator: DatasetPopulator
framework_tool_and_types = True

def setUp(self) -> None:
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config) -> None:
super().handle_galaxy_config_kwds(config)
config["job_config_file"] = JOB_CONFIG
config["job_resource_params_file"] = JOB_RESOURCES_CONFIG_FILE

def test_recovers_from_job_resource_errors(self):
with self.dataset_populator.test_history() as history_id:
self.workflow_populator.run_workflow(
"""
class: GalaxyWorkflow
steps:
simple_step:
tool_id: create_2
tool_state:
__job_resource:
__job_resource__select: 'yes'
cores: '32'
""",
history_id=history_id,
)

0 comments on commit 66ec43c

Please sign in to comment.