Skip to content

Commit

Permalink
Allowed "revalidation" of cylc workflows where template variables are…
Browse files Browse the repository at this point in the history
… collected from

database of already played workflows.
- Added `--revalidate` as an option to the following scripts
  (Made async to allow testing of changes):
  - cylc validate
  - cylc view
  - cylc graph
  - cylc config
  • Loading branch information
wxtim committed Oct 12, 2022
1 parent 901fa06 commit 5935610
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ ones in. -->

### Enhancements

[#5187](https://github.com/cylc/cylc-flow/pull/5189) - Allow
`cylc validate --revalidate` to use template variables collected from
the workflow database. Also applied to `cylc graph`, `cylc view` and
`cylc config`.

[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
100 for the "default" queue.

Expand Down
20 changes: 20 additions & 0 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from typing import Any, Dict, Optional, List, Tuple

from cylc.flow import LOG
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.pathutil import is_in_a_rundir
from cylc.flow.terminal import supports_color, DIM
import cylc.flow.flags
from cylc.flow.loggingutil import (
Expand Down Expand Up @@ -289,6 +291,7 @@ def __init__(
argdoc: Optional[List[Tuple[str, str]]] = None,
comms: bool = False,
jset: bool = False,
revalidate: bool = False,
multitask: bool = False,
multiworkflow: bool = False,
auto_add: bool = True,
Expand All @@ -303,6 +306,7 @@ def __init__(
instructions. Optional list of tuples of (name, description).
comms: If True, allow the --comms-timeout option.
jset: If True, allow the Jinja2 --set option.
revalidate: If True, allow the --revalidate option.
multitask: If True, insert the multitask text into the
usage instructions.
multiworkflow: If True, insert the multiworkflow text into the
Expand All @@ -327,6 +331,7 @@ def __init__(
self.unlimited_args = False
self.comms = comms
self.jset = jset
self.revalidate = revalidate
self.color = color
# Whether to log messages that are below warning level to stdout
# instead of stderr:
Expand Down Expand Up @@ -440,6 +445,13 @@ def add_std_options(self):
),
action="store", default=None, dest="templatevars_file")

if self.revalidate:
    # Only offered by commands that opted in via ``revalidate=True``.
    self.add_std_option(
        '--revalidate',
        help="Get template variables from previous workflow run.",
        action='store_true', default=False
    )

def add_cylc_rose_options(self) -> None:
"""Add extra options for cylc-rose plugin if it is installed."""
try:
Expand Down Expand Up @@ -607,3 +619,11 @@ def __call__(self, **kwargs) -> Values:
setattr(opts, key, value)

return opts


def can_revalidate(flow_file, opts):
    """Check that ``--revalidate`` is not being used outside a run directory.

    Args:
        flow_file: Path handed to ``is_in_a_rundir`` to decide whether the
            workflow is installed.
        opts: Parsed options; ``opts.revalidate`` is only consulted when
            ``flow_file`` is not in a run directory.

    Returns:
        True whenever no error is raised.

    Raises:
        WorkflowConfigError: If ``flow_file`` is not in a run directory and
            ``opts.revalidate`` is truthy.
    """
    # Installed workflows may always be revalidated.
    if is_in_a_rundir(flow_file):
        return True
    if opts.revalidate:
        raise WorkflowConfigError(
            'Revalidation only works with installed workflows.'
        )
    return True
2 changes: 1 addition & 1 deletion cylc/flow/parsec/fileparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def process_plugins(fpath, opts):
# If you want it to work on sourcedirs you need to get the options
# to here.
plugin_result = entry_point.resolve()(
srcdir=fpath, opts=opts
fpath, opts=opts
)
except Exception as exc:
# NOTE: except Exception (purposefully vague)
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,8 @@ def get_workflow_name_from_id(workflow_id: str) -> str:
name_path = id_path

return str(name_path.relative_to(cylc_run_dir))


def is_in_a_rundir(path_):
    """Return whether ``path_`` is relative to the Cylc run directory.

    The answer is whatever ``is_relative_to`` reports for ``path_`` against
    the directory returned by ``get_cylc_run_dir()``.
    """
    cylc_run_dir = Path(get_cylc_run_dir())
    return is_relative_to(path_, cylc_run_dir)
70 changes: 70 additions & 0 deletions cylc/flow/pre_configure/get_old_tvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Retrieve template variables stored in a workflow database.
"""

from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.templatevars import eval_var
from optparse import Values
from pathlib import Path
from typing import Union


class OldTemplateVars:
    """Collect the template variables recorded in a workflow database.

    Mirrors the interface used in scheduler.py to get db info on restart.

    The database is read when the object is constructed; afterwards
    ``template_vars`` maps each stored key to the result of ``eval_var``
    applied to its stored value.
    """
    # Location of the database, relative to the run directory.
    DB = 'log/db'

    def __init__(self, run_dir):
        self.template_vars = {}
        run_path = Path(run_dir)
        self._get_db_template_vars(run_path)

    def _callback(self, _, row):
        """Record one database row in ``self.template_vars``.

        The first item of ``row`` is the key; the second item is passed
        through ``eval_var`` to obtain the value.
        """
        value = eval_var(row[1])
        self.template_vars[row[0]] = value

    def _get_db_template_vars(self, run_dir):
        """Feed every stored template variable row to ``_callback``."""
        db_path = run_dir / self.DB
        workflow_dao = CylcWorkflowDAO(str(db_path))
        workflow_dao.select_workflow_template_vars(self._callback)


# Entry point:
def main(srcdir: Union[Path, str], opts: 'Values') -> dict:
    """Get options from a previously installed run.

    These options are stored in the database.

    N.B. The srcdir for this plugin to operate on is a workflow run dir.

    Args:
        srcdir: The directory of a previously run workflow.
        opts: Options Object

    Returns:
        An empty dict when ``opts`` has no truthy ``revalidate``
        attribute. Otherwise a dict with the stored variables under
        ``'template_variables'`` and the fixed string
        ``'template variables'`` under ``'templating_detected'``.
    """
    # Nothing to do unless revalidation was explicitly requested.
    if not getattr(opts, 'revalidate', False):
        return {}

    stored_vars = OldTemplateVars(srcdir).template_vars
    return {
        'template_variables': stored_vars,
        'templating_detected': 'template variables',
    }
23 changes: 19 additions & 4 deletions cylc/flow/scripts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,20 @@
$ cylc config --initial-cycle-point=now myflow
"""

import asyncio
import os.path
from typing import List, Optional, TYPE_CHECKING

from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
from cylc.flow.id_cli import parse_id
from cylc.flow.exceptions import InputError
from cylc.flow.id_cli import parse_id_async
from cylc.flow.exceptions import InputError, WorkflowConfigError
from cylc.flow.option_parsers import (
WORKFLOW_ID_OR_PATH_ARG_DOC,
CylcOptionParser as COP,
icp_option,
)
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.pathutil import get_workflow_run_dir, is_in_a_rundir
from cylc.flow.templatevars import get_template_vars
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import WorkflowFiles
Expand All @@ -75,6 +76,7 @@ def get_option_parser() -> COP:
__doc__,
argdoc=[COP.optional(WORKFLOW_ID_OR_PATH_ARG_DOC)],
jset=True,
revalidate=True,
)

parser.add_option(
Expand Down Expand Up @@ -149,6 +151,14 @@ def main(
options: 'Values',
*ids,
) -> None:
asyncio.run(_main(parser, options, *ids))


async def _main(
parser: COP,
options: 'Values',
*ids,
) -> None:

if options.print_platform_names and options.print_platforms:
options.print_platform_names = False
Expand Down Expand Up @@ -178,12 +188,17 @@ def main(
)
return

workflow_id, _, flow_file = parse_id(
workflow_id, _, flow_file = await parse_id_async(
*ids,
src=True,
constraint='workflows',
)

if not is_in_a_rundir(flow_file) and options.revalidate:
raise WorkflowConfigError(
'Revalidation only works with installed workflows.'
)

if options.print_hierarchy:
print("\n".join(get_config_file_hierarchy(workflow_id)))
return
Expand Down
72 changes: 50 additions & 22 deletions cylc/flow/scripts/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
$ cylc graph one -o 'one.svg'
"""

import asyncio
from difflib import unified_diff
from shutil import which
from subprocess import Popen, PIPE
Expand All @@ -45,11 +46,12 @@
from cylc.flow.config import WorkflowConfig
from cylc.flow.exceptions import InputError, CylcError
from cylc.flow.id import Tokens
from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.option_parsers import (
WORKFLOW_ID_OR_PATH_ARG_DOC,
CylcOptionParser as COP,
icp_option,
can_revalidate,
)
from cylc.flow.templatevars import get_template_vars
from cylc.flow.terminal import cli_function
Expand Down Expand Up @@ -108,9 +110,10 @@ def get_nodes_and_edges(
workflow_id,
start,
stop,
flow_file,
) -> Tuple[List[Node], List[Edge]]:
"""Return graph sorted nodes and edges."""
config = get_config(workflow_id, opts)
config = get_config(workflow_id, opts, flow_file)
if opts.namespaces:
nodes, edges = _get_inheritance_nodes_and_edges(config)
else:
Expand Down Expand Up @@ -194,13 +197,8 @@ def _get_inheritance_nodes_and_edges(
return sorted(nodes), sorted(edges)


def get_config(workflow_id: str, opts: 'Values') -> WorkflowConfig:
def get_config(workflow_id: str, opts: 'Values', flow_file) -> WorkflowConfig:
"""Return a WorkflowConfig object for the provided reg / path."""
workflow_id, _, flow_file = parse_id(
workflow_id,
src=True,
constraint='workflows',
)
template_vars = get_template_vars(opts)
return WorkflowConfig(
workflow_id, flow_file, opts, template_vars=template_vars
Expand Down Expand Up @@ -334,7 +332,7 @@ def open_image(filename):
img.show()


def graph_render(opts, workflow_id, start, stop) -> int:
def graph_render(opts, workflow_id, start, stop, flow_file) -> int:
"""Render the workflow graph to the specified format.
Graph is rendered to the specified format. The Graphviz "dot" format
Expand All @@ -349,6 +347,7 @@ def graph_render(opts, workflow_id, start, stop) -> int:
workflow_id,
start,
stop,
flow_file
)

# format the graph in graphviz-dot format
Expand Down Expand Up @@ -382,28 +381,40 @@ def graph_render(opts, workflow_id, start, stop) -> int:
return 0


def graph_reference(opts, workflow_id, start, stop, write=print) -> int:
def graph_reference(
opts, workflow_id, start, stop, flow_file, write=print,
) -> int:
"""Format the workflow graph using the cylc reference format."""
# get nodes and edges
nodes, edges = get_nodes_and_edges(
opts,
workflow_id,
start,
stop,
flow_file
)
for line in format_cylc_reference(opts, nodes, edges):
write(line)

return 0


def graph_diff(opts, workflow_a, workflow_b, start, stop) -> int:
async def graph_diff(opts, workflow_a, workflow_b, start, stop, flow_file) -> int:
"""Difference the workflow graphs using the cylc reference format."""

workflow_b, _, flow_file_b = await parse_id_async(
workflow_b,
src=True,
constraint='workflows',
)

# load graphs
graph_a: List[str] = []
graph_b: List[str] = []
graph_reference(opts, workflow_a, start, stop, write=graph_a.append),
graph_reference(opts, workflow_b, start, stop, write=graph_b.append),
graph_reference(
opts, workflow_a, start, stop, flow_file, write=graph_a.append),
graph_reference(
opts, workflow_b, start, stop, flow_file_b, write=graph_b.append),

# compare graphs
diff_lines = list(
Expand All @@ -427,6 +438,7 @@ def get_option_parser() -> COP:
parser = COP(
__doc__,
jset=True,
revalidate=True,
argdoc=[
WORKFLOW_ID_OR_PATH_ARG_DOC,
COP.optional(
Expand Down Expand Up @@ -507,20 +519,36 @@ def main(
start: Optional[str] = None,
stop: Optional[str] = None
) -> None:
result = asyncio.run(_main(parser, opts, workflow_id, start, stop))
sys.exit(result)


async def _main(
parser: COP,
opts: 'Values',
workflow_id: str,
start: Optional[str] = None,
stop: Optional[str] = None
) -> int:
"""Implement ``cylc graph``."""
if opts.grouping and opts.namespaces:
raise InputError('Cannot combine --group and --namespaces.')
if opts.cycles and opts.namespaces:
raise InputError('Cannot combine --cycles and --namespaces.')

workflow_id, _, flow_file = await parse_id_async(
workflow_id,
src=True,
constraint='workflows',
)

can_revalidate(flow_file, opts)

if opts.diff:
sys.exit(
graph_diff(opts, workflow_id, opts.diff, start, stop)
)
return await graph_diff(
opts, workflow_id, opts.diff, start, stop, flow_file)
if opts.reference:
sys.exit(
graph_reference(opts, workflow_id, start, stop)
)
sys.exit(
graph_render(opts, workflow_id, start, stop)
)
return graph_reference(
opts, workflow_id, start, stop, flow_file)

return graph_render(opts, workflow_id, start, stop, flow_file)
Loading

0 comments on commit 5935610

Please sign in to comment.