Skip to content

Commit

Permalink
Merge pull request #794 from MetOffice/614_robust_plot_index_writing
Browse files Browse the repository at this point in the history
Create the plot index in finish_website to avoid a data race between concurrent index writers
  • Loading branch information
jfrost-mo authored Aug 23, 2024
2 parents 6141528 + 24ce5cf commit 5c29e5e
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 120 deletions.
17 changes: 17 additions & 0 deletions src/CSET/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,20 @@ def iter_maybe(thing) -> Iterable:
if isinstance(thing, Iterable) and not isinstance(thing, str):
return thing
return (thing,)


def combine_dicts(d1: dict, d2: dict) -> dict:
    """Recursively merge two dictionaries, mutating and returning the first.

    Nested dictionaries are merged key-by-key; for duplicate non-dict values
    the value from the second dictionary wins.
    """
    for key, incoming in d2.items():
        current = d1.get(key)
        if isinstance(current, dict):
            # Both sides hold a mapping here: merge them recursively.
            d1[key] = combine_dicts(current, incoming)
        else:
            # New key, or an atom being overwritten by d2's value.
            d1[key] = incoming
    return d1
56 changes: 51 additions & 5 deletions src/CSET/_workflow_utils/finish_website.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,60 @@

"""Write finished status to website front page.
Does the final update to the workflow status on the front page of the web
interface.
Constructs the plot index, and does the final update to the workflow status on
the front page of the web interface.
"""

import json
import logging
import os
from pathlib import Path

from CSET._common import combine_dicts

def run():
"""Run workflow script."""
with open(f"{os.getenv('WEB_DIR')}/status.html", "wt") as fp:
# Configure root logging for this workflow-utility script at import time.
# The level comes from the LOGLEVEL environment variable, defaulting to INFO.
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s"
)


def construct_index():
    """Construct the plot index.

    The index has the form ``{"Category Name": {"recipe_id": "Plot Name"}}``,
    where ``recipe_id`` is the name of the plot's directory.
    """
    plots_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"] + "/web/plots")
    index: dict = {}
    # Visit only the directories directly under the plots directory.
    for plot_dir in filter(Path.is_dir, plots_dir.iterdir()):
        meta_file = plot_dir / "meta.json"
        try:
            with open(meta_file, "rt", encoding="UTF-8") as fp:
                plot_metadata = json.load(fp)
            record = {plot_metadata["category"]: {plot_dir.name: plot_metadata["title"]}}
        except FileNotFoundError:
            # A directory without metadata is probably not a plot; skip it.
            logging.debug("No meta.json in %s, skipping.", plot_dir)
            continue
        except (json.JSONDecodeError, KeyError, TypeError) as err:
            # Unparsable or malformed metadata is logged and skipped.
            logging.error("%s is invalid, skipping.\n%s", meta_file, err)
            continue
        index = combine_dicts(index, record)

    with open(plots_dir / "index.json", "wt", encoding="UTF-8") as fp:
        json.dump(index, fp)


def update_workflow_status():
    """Update the workflow status on the front page of the web interface."""
    status_path = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"] + "/web") / "status.html"
    # Overwrite the status fragment so the front page shows completion.
    with open(status_path, "wt") as status_file:
        status_file.write("<p>Finished</p>\n")


def run():
    """Do the final steps to finish the website."""
    # Build the plot index first, then flag the workflow as finished.
    construct_index()
    update_workflow_status()
56 changes: 0 additions & 56 deletions src/CSET/_workflow_utils/run_cset_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

"""Run a recipe with the CSET CLI."""

import fcntl
import json
import logging
import os
import subprocess
Expand All @@ -16,46 +14,6 @@
)


def combine_dicts(d1: dict, d2: dict) -> dict:
    """Recursively combine two dictionaries in place, favouring the second.

    Nested dicts present in both inputs are merged recursively; any other
    duplicate value is replaced by the one from ``d2``. ``d1`` is mutated
    and returned.
    """
    overlapping = d1.keys() & d2.keys()
    for key in overlapping:
        existing = d1[key]
        if isinstance(existing, dict):
            d1[key] = combine_dicts(existing, d2[key])
        else:
            d1[key] = d2[key]
    # Copy across keys that only exist in the second dictionary.
    for key in d2.keys() - overlapping:
        d1[key] = d2[key]
    return d1


def append_to_index(record: dict):
    """Append the plot record to the index file.
    Record should have the form {"Category Name": {"recipe_id": "Plot Name"}}

    The record is merged into the existing index under an exclusive file
    lock, so concurrent writers on the same host do not clobber each other.
    NOTE(review): flock is advisory and per-host — confirm no writers run on
    other hosts sharing this filesystem.
    """
    # Plot index is at {run}/share/web/plots/index.json
    index_path = Path(os.getenv("CYLC_WORKFLOW_SHARE_DIR"), "web/plots/index.json")
    with open(index_path, "a+t", encoding="UTF-8") as fp:
        # Lock file until closed. The lock is released automatically when the
        # file object is closed at the end of the with block.
        fcntl.flock(fp, fcntl.LOCK_EX)
        # Open in append mode then seek back to avoid errors if the file does
        # not exist.
        fp.seek(0)
        try:
            index = json.load(fp)
        except json.JSONDecodeError:
            # Missing or corrupt index content: start from an empty index.
            index = {}
        index = combine_dicts(index, record)
        # Rewind and truncate so the merged index fully replaces the old text.
        fp.seek(0)
        fp.truncate()
        json.dump(index, fp)


def subprocess_env():
"""Create a dictionary of amended environment variables for subprocess."""
env_mapping = dict(os.environ)
Expand Down Expand Up @@ -125,19 +83,6 @@ def create_diagnostic_archive(output_directory):
archive.write(file, arcname=file.relative_to(output_directory))


# Not covered by tests as will soon be removed in #794.
def add_to_diagnostic_index(output_directory, recipe_id):  # pragma: no cover
    """Add the diagnostic to the plot index if it isn't already there."""
    meta_path = Path(output_directory) / "meta.json"
    with open(meta_path, "rt", encoding="UTF-8") as fp:
        recipe_meta = json.load(fp)
    # Fall back to "Unknown" when the metadata lacks title or category.
    title = recipe_meta.get("title", "Unknown")
    category = recipe_meta.get("category", "Unknown")
    # Merge this plot into the shared plot index.
    append_to_index({category: {recipe_id: title}})


# Not covered by tests as will soon be removed in #765.
def parallel(): # pragma: no cover
"""Process raw data in parallel."""
Expand Down Expand Up @@ -193,7 +138,6 @@ def collate(): # pragma: no cover
logging.error("cset bake exited non-zero while collating.")
raise
create_diagnostic_archive(output_directory())
add_to_diagnostic_index(output_directory(), recipe_id())


def run():
Expand Down
8 changes: 8 additions & 0 deletions tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,11 @@ def test_iter_maybe_string():
for value in created_iterable:
# The same object is inside the iterable.
assert value is atom


def test_combine_dicts():
    """Test combine_dicts function."""
    base = {"a": 1, "b": 2, "c": {"d": 3, "e": 4}}
    update = {"b": 3, "c": {"d": 5, "f": 6}}
    merged = common.combine_dicts(base, update)
    # Atoms favour the second dict; nested dicts are merged recursively.
    assert merged == {"a": 1, "b": 3, "c": {"d": 5, "e": 4, "f": 6}}
87 changes: 84 additions & 3 deletions tests/workflow_utils/test_finish_website.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,93 @@

"""Tests for finish_website workflow utility."""

import json
import logging

from CSET._workflow_utils import finish_website


def test_write_workflow_status(monkeypatch, tmp_path):
    """Workflow finish status gets written to status file."""
    # NOTE: the diff render interleaved the removed and added versions of this
    # test; this is the coherent post-change version.
    web_dir = tmp_path / "web"
    web_dir.mkdir()
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    finish_website.update_workflow_status()
    with open(web_dir / "status.html", "rt", encoding="UTF-8") as fp:
        assert fp.read() == "<p>Finished</p>\n"


def test_construct_index(monkeypatch, tmp_path):
    """Test putting the index together."""
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    plots_dir = tmp_path / "web/plots"
    plots_dir.mkdir(parents=True)

    # Two plot directories with valid metadata.
    for name, title in (("p1", "P1"), ("p2", "P2")):
        meta_file = plots_dir / name / "meta.json"
        meta_file.parent.mkdir()
        meta_file.write_text(json.dumps({"category": "Category", "title": title}))

    # A non-plot directory is also present and should be ignored.
    static_resource = plots_dir / "static/style.css"
    static_resource.parent.mkdir()
    static_resource.touch()

    finish_website.construct_index()

    # Both plots end up in the index, grouped under their shared category.
    index_file = plots_dir / "index.json"
    assert index_file.is_file()
    index = json.loads(index_file.read_text(encoding="UTF-8"))
    assert index == {"Category": {"p1": "P1", "p2": "P2"}}


def test_construct_index_invalid(monkeypatch, tmp_path, caplog):
    """Test constructing index when metadata is invalid."""
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    plots_dir = tmp_path / "web/plots"
    plots_dir.mkdir(parents=True)

    # A plot directory whose metadata cannot be used as an index record.
    meta_file = plots_dir / "p1/meta.json"
    meta_file.parent.mkdir()
    meta_file.write_text('"Not JSON!"')

    finish_website.construct_index()

    # The problem is reported as an error in the log.
    _, level, message = caplog.record_tuples[0]
    assert level == logging.ERROR
    assert "p1/meta.json is invalid, skipping." in message

    # An empty index is still written out.
    index_file = plots_dir / "index.json"
    assert index_file.is_file()
    assert json.loads(index_file.read_text(encoding="UTF-8")) == {}


def test_entrypoint(monkeypatch):
    """Test running the finish_website module."""
    # Record each sub-step that runs, to ensure both are called.
    calls = []

    monkeypatch.setattr(finish_website, "construct_index", lambda: calls.append("index"))
    monkeypatch.setattr(
        finish_website, "update_workflow_status", lambda: calls.append("status")
    )

    # Just check that it runs all the needed subfunctions.
    finish_website.run()
    assert len(calls) == 2
56 changes: 0 additions & 56 deletions tests/workflow_utils/test_run_cset_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

"""Tests for run_cset_recipe workflow utility."""

import json
import os
import subprocess
import zipfile
Expand All @@ -25,61 +24,6 @@
from CSET._workflow_utils import run_cset_recipe


def test_combine_dicts():
    """Test combine_dicts function."""
    first = {"a": 1, "b": 2, "c": {"d": 3, "e": 4}}
    second = {"b": 3, "c": {"d": 5, "f": 6}}
    # Atoms favour the second dict; nested dicts are merged recursively.
    result = run_cset_recipe.combine_dicts(first, second)
    assert result == {"a": 1, "b": 3, "c": {"d": 5, "e": 4, "f": 6}}


def test_append_to_index(monkeypatch, tmp_path):
    """Test appending to index."""
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    index_path = tmp_path / "web/plots/index.json"
    index_path.parent.mkdir(parents=True)
    # Seed the index with some pre-existing entries.
    existing = {
        "Category Name": {"recipe_id_a": "Title A"},
        "Other Category": {"recipe_id_b": "Title B"},
    }
    index_path.write_text(json.dumps(existing), encoding="UTF-8")

    run_cset_recipe.append_to_index({"Category Name": {"recipe_id": "Plot Name"}})

    # The new record merges into its category without disturbing the rest.
    expected = {
        "Category Name": {"recipe_id": "Plot Name", "recipe_id_a": "Title A"},
        "Other Category": {"recipe_id_b": "Title B"},
    }
    assert json.loads(index_path.read_text(encoding="UTF-8")) == expected


def test_append_to_index_missing(monkeypatch, tmp_path):
    """Test appending to index when index does not yet exist."""
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    index_path = tmp_path / "web/plots/index.json"
    index_path.parent.mkdir(parents=True)

    record = {"Category Name": {"recipe_id": "Plot Name"}}
    run_cset_recipe.append_to_index(record)

    # With no pre-existing index, the record alone becomes the index.
    assert json.loads(index_path.read_text(encoding="UTF-8")) == record


def test_append_to_index_invalid(monkeypatch, tmp_path):
    """Test appending to index when existing content is invalid."""
    monkeypatch.setenv("CYLC_WORKFLOW_SHARE_DIR", str(tmp_path))
    index_path = tmp_path / "web/plots/index.json"
    index_path.parent.mkdir(parents=True)
    # Pre-populate the index file with content that is not valid JSON.
    index_path.write_text("Not JSON!", encoding="UTF-8")

    record = {"Category Name": {"recipe_id": "Plot Name"}}
    run_cset_recipe.append_to_index(record)

    # The invalid content is discarded and replaced by the record.
    assert json.loads(index_path.read_text(encoding="UTF-8")) == record


def test_subprocess_env(monkeypatch):
"""Test subprocess_env function."""
monkeypatch.setenv("CYLC_TASK_CYCLE_POINT", "2000-01-01T00:00:00Z")
Expand Down

0 comments on commit 5c29e5e

Please sign in to comment.