Change published to visible and unpublished to shadowed.
eigerx committed Sep 16, 2024
1 parent e3e7932 commit 13b6372
Showing 2 changed files with 53 additions and 51 deletions.
98 changes: 50 additions & 48 deletions python/lsst/pipe/base/quantum_provenance_graph.py
@@ -242,20 +242,22 @@ class DatasetRun(pydantic.BaseModel):
"""Whether the specific run produced the dataset.
"""

published: bool = False
"""Whether this dataset was published in the final output collection.
visible: bool = False
"""Whether this dataset is visible in the final output collection; in other
words, whether this dataset is queryable in a find-first search. This
determines whether it will be used as an input to further processing.
"""

@pydantic.model_validator(mode="after")
def _validate(self) -> DatasetRun:
"""Validate the model for `DatasetRun` by asserting that no published
"""Validate the model for `DatasetRun` by asserting that no visible
`DatasetRun` is also not produced (this should be impossible).
"""
assert not (self.published and not self.produced)
assert not (self.visible and not self.produced)
return self


DatasetInfoStatus: TypeAlias = Literal["published", "unpublished", "predicted_only", "unsuccessful", "cursed"]
DatasetInfoStatus: TypeAlias = Literal["visible", "shadowed", "predicted_only", "unsuccessful", "cursed"]
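As context for the rename, the validator above encodes a simple invariant: a dataset may only be visible if some run actually produced it. Below is a minimal standalone sketch of that invariant; `DatasetRunSketch` is an illustration only, and the real `DatasetRun` model carries more state than the two flags shown in this diff.

import pydantic


class DatasetRunSketch(pydantic.BaseModel):
    """Reduced stand-in for `DatasetRun`, illustrating only the validator."""

    produced: bool = False
    visible: bool = False

    @pydantic.model_validator(mode="after")
    def _validate(self) -> "DatasetRunSketch":
        # Visibility in the final output collection requires production in
        # some run; the reverse combination should never occur.
        assert not (self.visible and not self.produced)
        return self


DatasetRunSketch(produced=True, visible=True)   # visible and produced: fine
DatasetRunSketch(produced=True, visible=False)  # produced but shadowed: fine
# DatasetRunSketch(produced=False, visible=True) raises a validation error.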


class DatasetInfo(TypedDict):
@@ -277,21 +279,21 @@ class DatasetInfo(TypedDict):
Possible Statuses
-----------------
`published`: The dataset is queryable in a find_first search. This means
`visible`: The dataset is queryable in a find_first search. This means
that it can be used as an input by subsequent tasks and processing.
`unpublished`: The dataset exists but is not queryable in a find_first
`shadowed`: The dataset exists but is not queryable in a find_first
search. This could mean that the version of this dataset which is
passed as an input to further processing is not in the collections
given. An `unpublished` dataset will not be used as an input to further
given. A `shadowed` dataset will not be used as an input to further
processing.
`predicted_only`: The dataset was predicted, and was not published in any
`predicted_only`: The dataset was predicted, and was not visible in any
run, but was the successor of a successful quantum. These datasets are
the result of a pipeline's `NoWorkFound` cases, in which a dataset is
predicted in the graph but found not to be necessary during processing.
`unsuccessful`: The dataset was not produced. These are the results of
failed or blocked quanta.
`cursed`: The dataset was the result of an unsuccessful quantum and was
published in the output collection anyway. These are flagged as
visible in the output collection anyway. These are flagged as
`cursed` so that they may be caught before they become inputs to
further processing.
"""
@@ -454,8 +456,8 @@ class CursedDatasetSummary(pydantic.BaseModel):
"""A dictionary of all the runs associated with the `cursed` dataset;
the `bool` is true if the dataset was produced in the associated run.
"""
run_published: str | None
"""A dictionary of all `published` runs containing the `cursed` dataset.
run_visible: str | None
"""A dictionary of all `visible` runs containing the `cursed` dataset.
"""
messages: list[str]
"""Any diagnostic messages (dictated in this module) which might help in
@@ -475,13 +477,13 @@ def from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatas
All relevant information on the producer task. This is used to
report the data_id of the producer task.
"""
runs_published = {k for k, v in info["runs"].items() if v.published}
runs_visible = {k for k, v in info["runs"].items() if v.visible}
return cls(
producer_data_id=dict(producer_info["data_id"].required),
data_id=dict(info["data_id"].required),
runs_produced={k: v.produced for k, v in info["runs"].items()},
# this has at most one element
run_published=runs_published.pop() if runs_published else None,
run_visible=runs_visible.pop() if runs_visible else None,
messages=info["messages"],
)

@@ -495,13 +497,13 @@ class DatasetTypeSummary(pydantic.BaseModel):
"""The name of the task which produced this dataset.
"""

n_published: int = 0
"""A count of the datasets of this type which were published in the
n_visible: int = 0
"""A count of the datasets of this type which were visible in the
finalized collection(s).
"""
n_unpublished: int = 0
n_shadowed: int = 0
"""A count of the datasets of this type which were produced but not
published. This includes any datasets which do not come up in a butler
visible. This includes any datasets which do not come up in a butler
query over their associated collection.
"""
n_predicted_only: int = 0
Expand Down Expand Up @@ -555,10 +557,10 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non
specific issues.
"""
match info["status"]:
case "published":
self.n_published += 1
case "unpublished":
self.n_unpublished += 1
case "visible":
self.n_visible += 1
case "shadowed":
self.n_shadowed += 1
case "unsuccessful":
self.unsuccessful_datasets.append(dict(info["data_id"].mapping))
case "cursed":
@@ -842,7 +844,7 @@ def resolve_duplicates(
In particular, use the publish state of each `DatasetRun` in combination
with the overall quantum status to ascertain the status of each dataset.
Additionally, if there are multiple published runs associated with a
Additionally, if there are multiple visible runs associated with a
dataset, mark the producer quantum as `wonky`.
This method should be called after
@@ -863,11 +865,11 @@ def resolve_duplicates(
A "where" string to use to constrain the collections, if passed.
curse_failed_logs : `bool`
Mark log datasets as `cursed` if they are published in the final
Mark log datasets as `cursed` if they are visible in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`resolve_duplicates` is run on a list of group-level collections
then each will show logs from their own failures as published
then each will show logs from their own failures as visible
and the datasets will show as cursed regardless of this flag.
"""
# First thing: raise an error if resolve_duplicates has been run
@@ -894,49 +896,49 @@ def resolve_duplicates(
# graphs given.
except KeyError:
continue
# queryable datasets are `published`.
dataset_info["runs"][ref.run].published = True
# queryable datasets are `visible`.
dataset_info["runs"][ref.run].visible = True

for task_quanta in self._quanta.values():
for quantum_key in task_quanta:
# runs associated with published datasets.
published_runs: set[str] = set()
# runs associated with visible datasets.
visible_runs: set[str] = set()
quantum_info = self.get_quantum_info(quantum_key)
# Loop over each dataset in the outputs of a single quantum.
for dataset_key in self.iter_outputs_of(quantum_key):
dataset_info = self.get_dataset_info(dataset_key)
published_runs.update(
run for run, dataset_run in dataset_info["runs"].items() if dataset_run.published
visible_runs.update(
run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible
)
if any(dataset_run.published for dataset_run in dataset_info["runs"].values()):
publish_state = "published"
# set the publish state to `unpublished` if the dataset was
# produced but not published (i.e., not queryable from the
if any(dataset_run.visible for dataset_run in dataset_info["runs"].values()):
publish_state = "visible"
# set the publish state to `shadowed` if the dataset was
# produced but not visible (i.e., not queryable from the
# final collection(s)).
elif any(dataset_run.produced for dataset_run in dataset_info["runs"].values()):
publish_state = "unpublished"
# a dataset which was not produced and not published is
publish_state = "shadowed"
# a dataset which was not produced and not visible is
# missing.
else:
publish_state = "missing"
# use the quantum status and publish state to ascertain the
# status of the dataset.
match (quantum_info["status"], publish_state):
# published datasets from successful quanta are as
# visible datasets from successful quanta are as
# intended.
case ("successful", "published"):
dataset_info["status"] = "published"
case ("successful", "visible"):
dataset_info["status"] = "visible"
# missing datasets from successful quanta indicate a
# `NoWorkFound` case.
case ("successful", "missing"):
dataset_info["status"] = "predicted_only"
case ("successful", "unpublished"):
dataset_info["status"] = "unpublished"
case ("successful", "shadowed"):
dataset_info["status"] = "shadowed"
# If anything other than a successful quantum produces
# a published dataset, that dataset is cursed. Set the
# a visible dataset, that dataset is cursed. Set the
# status for the dataset to cursed and note the reason
# for labeling the dataset as cursed.
case (_, "published"):
case (_, "visible"):
# Avoiding publishing failed logs is difficult
# without using tagged collections, so flag them as
# merely unsuccessful unless the user requests it.
Expand All @@ -945,18 +947,18 @@ def resolve_duplicates(
else:
dataset_info["status"] = "cursed"
dataset_info["messages"].append(
f"Unsuccessful dataset {dataset_type_name} published in "
f"Unsuccessful dataset {dataset_type_name} visible in "
"final output collection."
)
# any other produced dataset (produced but not
# published and not successful) is a regular
# visible and not successful) is a regular
# failure.
case _:
dataset_info["status"] = "unsuccessful"
if len(published_runs) > 1:
if len(visible_runs) > 1:
quantum_info["status"] = "wonky"
quantum_info["messages"].append(
f"Outputs from different runs of the same quanta were published: {published_runs}."
f"Outputs from different runs of the same quanta were visible: {visible_runs}."
)
for dataset_key in self.iter_outputs_of(quantum_key):
dataset_info = self.get_dataset_info(dataset_key)
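Restated outside the diff, the match statement above reduces to a small lookup from (quantum status, publish state) to dataset status. The helper below is a sketch for reference only: `classify_dataset` is a hypothetical name, `publish_state` takes the same three values computed in the loop ("visible", "shadowed", "missing"), and the `curse_failed_logs` special case for log datasets is omitted.

def classify_dataset(quantum_status: str, publish_state: str) -> str:
    """Map a producer quantum's status and a dataset's publish state to a
    DatasetInfoStatus value, mirroring the resolve_duplicates match statement.
    """
    match (quantum_status, publish_state):
        case ("successful", "visible"):
            return "visible"
        case ("successful", "missing"):
            # NoWorkFound: predicted in the graph but not needed in processing.
            return "predicted_only"
        case ("successful", "shadowed"):
            return "shadowed"
        case (_, "visible"):
            # An unsuccessful quantum whose output is nonetheless visible.
            return "cursed"
        case _:
            return "unsuccessful"


assert classify_dataset("failed", "visible") == "cursed"
assert classify_dataset("successful", "missing") == "predicted_only"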
6 changes: 3 additions & 3 deletions tests/test_quantum_provenance_graph.py
@@ -67,7 +67,7 @@ def test_qpg_reports(self) -> None:
TaskSummary(
n_successful=0,
n_blocked=0,
n_not_attempted=1,
n_unknown=1,
n_expected=1,
failed_quanta=[],
recovered_quanta=[],
@@ -106,8 +106,8 @@ def test_qpg_reports(self) -> None:
# Check dataset counts (can't be done all in one because
# datasets have different producers), but all the counts for
# each task should be the same.
self.assertEqual(dataset_type_summary.n_published, 0)
self.assertEqual(dataset_type_summary.n_unpublished, 0)
self.assertEqual(dataset_type_summary.n_visible, 0)
self.assertEqual(dataset_type_summary.n_shadowed, 0)
self.assertEqual(dataset_type_summary.n_predicted_only, 0)
self.assertEqual(dataset_type_summary.n_expected, 1)
self.assertEqual(dataset_type_summary.n_cursed, 0)
