From 253edad52ab2b7c86176facc973f52102d18265b Mon Sep 17 00:00:00 2001
From: Orion Eiger
Date: Wed, 18 Sep 2024 12:37:07 -0700
Subject: [PATCH] Change status Literals to Enums

---
 .../pipe/base/quantum_provenance_graph.py | 261 ++++++++++--------
 1 file changed, 145 insertions(+), 116 deletions(-)

diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py
index faa926bba..86abbbd06 100644
--- a/python/lsst/pipe/base/quantum_provenance_graph.py
+++ b/python/lsst/pipe/base/quantum_provenance_graph.py
@@ -43,7 +43,8 @@
 import logging
 import uuid
 from collections.abc import Iterator, Sequence
-from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypeAlias, TypedDict, cast
+from enum import Enum
+from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypedDict, cast
 
 import networkx
 import pydantic
@@ -132,7 +133,28 @@ class PrerequisiteDatasetKey(NamedTuple):
     """
 
 
-QuantumRunStatus: TypeAlias = Literal["failed", "successful", "logs_missing", "blocked", "metadata_missing"]
+class QuantumRunStatus(Enum):
+    """Enum describing the status of a quantum-run collection combination.
+
+    Possible Statuses
+    -----------------
+    METADATA_MISSING = -3: Metadata is missing for this quantum in this run.
+        It is impossible to tell whether execution of this quantum was
+        attempted due to missing metadata.
+    LOGS_MISSING = -2: Logs are missing for this quantum in this run. It was
+        attempted, but it is impossible to tell if it succeeded or failed due
+        to missing logs.
+    FAILED = -1: Attempts to execute the quantum failed in this run.
+    BLOCKED = 0: This run does not include an executed version of this
+        quantum because an upstream task failed.
+    SUCCESSFUL = 1: This quantum was executed successfully in this run.
+    """
+
+    METADATA_MISSING = -3
+    LOGS_MISSING = -2
+    FAILED = -1
+    BLOCKED = 0
+    SUCCESSFUL = 1
 
 
 class QuantumRun(pydantic.BaseModel):
@@ -142,25 +164,50 @@
     """The quantum graph node ID associated with the dataId in a specific run.
     """
 
-    status: QuantumRunStatus = "metadata_missing"
+    status: QuantumRunStatus = QuantumRunStatus.METADATA_MISSING
     """The status of the quantum in that run.
+    """
+
+
+class QuantumInfoStatus(Enum):
+    """The status of a quantum (a particular task run on a particular dataID)
+    across all runs.
 
     Possible Statuses
     -----------------
-    `failed`: Attempts to execute the quantum failed in this run.
-    `successful`: This quantum was executed successfully in this run.
-    `logs_missing`: Logs are missing for this quantum in this run. It was
-        attempted, but it is impossible to tell if it succeeded or failed due
-        to missing logs.
-    `blocked`: This run does not include an executed version of this quantum
-        because an upstream task failed.
-    `metadata_missing`: Metadata is missing for this quantum in this run. It is
-        impossible to tell whether execution of this quantum was attempted due
-        to missing metadata.
+    WONKY = -3: The overall state of this quantum reflects inconsistencies or
+        is difficult to discern. There are a few specific ways to enter a wonky
+        state; it is impossible to exit and requires human intervention to
+        proceed with processing.
+        Currently, a quantum enters a wonky state for one of three reasons:
+        - Its `QuantumInfoStatus` exits a successful state. Something that
+          initially succeeded fails on a subsequent run.
+        - A `QuantumRun` is missing logs.
+        - There are multiple runs associated with a dataset which comes up in a
+          findFirst search. This means that a dataset which will be used as an
+          input data product for further processing has heterogeneous inputs,
+          which may have had different inputs or a different data-query.
+    FAILED = -2: These quanta were attempted and failed. Failed quanta have
+        logs and no metadata.
+    UNKNOWN = -1: These are quanta which do not have any metadata associated
+        with processing, but for which it is impossible to tell the status due
+        to an additional absence of logs. Quanta which had not been processed
+        at all would reflect this state, as would quanta which were
+        conceptualized in the construction of the quantum graph but later
+        identified to be unnecessary or erroneous (deemed NoWorkFound by the
+        Science Pipelines).
+    BLOCKED = 0: The quantum is not able to execute because its inputs are
+        missing due to an upstream failure. Blocked quanta are distinguished
+        from failed quanta by being successors of failed quanta in the graph.
+        All the successors of blocked quanta are also marked as blocked.
+    SUCCESSFUL = 1: Attempts at executing this quantum were successful.
     """
 
-
-QuantumInfoStatus: TypeAlias = Literal["successful", "wonky", "blocked", "unknown", "failed"]
+    WONKY = -3
+    FAILED = -2
+    UNKNOWN = -1
+    BLOCKED = 0
+    SUCCESSFUL = 1
 
 
 class QuantumInfo(TypedDict):
@@ -181,35 +228,6 @@
     status: QuantumInfoStatus
     """The overall status of the quantum. Note that it is impossible to exit
     a wonky state.
-
-    Possible Statuses
-    -----------------
-    `successful`: Attempts at executing this quantum were successful.
-    `wonky`: The overall state of this quantum reflects inconsistencies or is
-        difficult to discern. There are a few specific ways to enter a wonky
-        state; it is impossible to exit and requires human intervention to
-        proceed with processing.
-        Currently, a quantum enters a wonky state for one of three reasons:
-        - Its `QuantumInfoStatus` exits a successful state. Something that
-          initially succeeded fails on
-        - A `QuantumRun` is missing logs.
-        - There are multiple runs associated with a dataset which comes up in a
-          findFirst search. This means that a dataset which will be used as an
-          input data product for further processing has heterogeneous inputs,
-          which may have had different inputs or a different data-query.
-    `blocked`: The quantum is not able to execute because its inputs are
-        missing due to an upstream failure. Blocked quanta are distinguished
-        from failed quanta by being successors of failed quanta in the graph.
-        All the successors of blocked quanta are also marked as blocked.
-    `unknown`: These are quanta which do not have any metadata associated
-        with processing, but for which it is impossible to tell the status due
-        to an additional absence of logs. Quanta which had not been processed
-        at all would reflect this state, as would quanta which were
-        conceptualized in the construction of the quantum graph but later
-        identified to be unneccesary or erroneous (deemed `NoWorkFound` by the
-        Science Pipelines).
-    `failed`: These quanta were attempted and failed. Failed quanta have logs
-        and no metadata.
     """
 
     recovered: bool
@@ -257,7 +275,35 @@ def _validate(self) -> DatasetRun:
         return self
 
 
-DatasetInfoStatus: TypeAlias = Literal["visible", "shadowed", "predicted_only", "unsuccessful", "cursed"]
+class DatasetInfoStatus(Enum):
+    """Status of the DatasetType-dataID pair over all runs.
+
+    Possible Statuses
+    -----------------
+    CURSED: The dataset was the result of an unsuccessful quantum and was
+        visible in the output collection anyway. These are flagged as
+        cursed so that they may be caught before they become inputs to
+        further processing.
+    UNSUCCESSFUL: The dataset was not produced. These are the results of
+        failed or blocked quanta.
+    PREDICTED_ONLY: The dataset was predicted, and was not visible in any
+        run, but was the successor of a successful quantum. These datasets are
+        the result of pipelines NoWorkFound cases, in which a dataset is
+        predicted in the graph but found to not be necessary in processing.
+    SHADOWED: The dataset exists but is not queryable in a find_first
+        search. This could mean that the version of this dataset which is
+        passed as an input to further processing is not in the collections
+        given. A shadowed dataset will not be used as an input to further
+        processing.
+    VISIBLE: The dataset is queryable in a find_first search. This means
+        that it can be used as an input by subsequent tasks and processing.
+    """
+
+    CURSED = -2
+    UNSUCCESSFUL = -1
+    PREDICTED_ONLY = 0
+    SHADOWED = 1
+    VISIBLE = 2
 
 
 class DatasetInfo(TypedDict):
@@ -276,26 +322,6 @@
 
     status: DatasetInfoStatus
     """Overall status of the dataset.
-
-    Possible Statuses
-    -----------------
-    `visible`: The dataset is queryable in a find_first search. This means
-        that it can be used as an input by subsequent tasks and processing.
-    `shadowed`: The dataset exists but is not queryable in a find_first
-        search. This could mean that the version of this dataset which is
-        passed as an input to further processing is not in the collections
-        given. A `shadowed` dataset will not be used as an input to further
-        processing.
-    `predicted_only`: The dataset was predicted, and was not visible in any
-        run, but was the successor of a successful quantum. These datasets are
-        the result of pipelines `NoWorkFound` cases, in which a dataset is
-        predicted in the graph but found to not be necessary in processing.
-    `unsuccessful`: The dataset was not produced. These are the results of
-        failed or blocked quanta.
-    `cursed`: The dataset was the result of an unsuccessful quantum and was
-        visible in the output collection anyway. These are flagged as
-        `cursed` so that they may be caught before they become inputs to
-        further processing.
     """
 
     messages: list[str]
@@ -313,9 +339,9 @@ class UnsuccessfulQuantumSummary(pydantic.BaseModel):
     data_id: dict[str, DataIdValue]
     """The data_id of the unsuccessful quantum.
     """
-    runs: dict[str, QuantumRunStatus]
-    """A dictionary including the `QuantumRunStatus` of each run associated
-    with an attempt to process the unsuccessful quantum.
+    runs: dict[str, str]
+    """A dictionary including the enum name of the `QuantumRunStatus` of each
+    run associated with an attempt to process the unsuccessful quantum.
     """
     messages: list[str]
     """Any messages associated with the unsuccessful quantum (any clues as to
@@ -334,7 +360,7 @@ def from_info(cls, info: QuantumInfo) -> UnsuccessfulQuantumSummary:
         """
         return cls(
             data_id=dict(info["data_id"].required),
-            runs={k: v.status for k, v in info["runs"].items()},
+            runs={k: v.status.name for k, v in info["runs"].items()},
             messages=info["messages"],
         )
 
@@ -407,15 +433,15 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo
             if `True`.
""" match info["status"]: - case "successful": + case QuantumInfoStatus.SUCCESSFUL: self.n_successful += 1 if info["recovered"]: self.recovered_quanta.append(dict(info["data_id"].required)) - case "wonky": + case QuantumInfoStatus.WONKY: self.wonky_quanta.append(UnsuccessfulQuantumSummary.from_info(info)) - case "blocked": + case QuantumInfoStatus.BLOCKED: self.n_blocked += 1 - case "failed": + case QuantumInfoStatus.FAILED: failed_quantum_summary = UnsuccessfulQuantumSummary.from_info(info) log_key = info["log"] if do_store_logs: @@ -435,7 +461,7 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo [record.message for record in log if record.levelno >= logging.ERROR] ) self.failed_quanta.append(failed_quantum_summary) - case "unknown": + case QuantumInfoStatus.UNKNOWN: self.n_unknown += 1 case unrecognized_state: raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") @@ -557,15 +583,15 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non specific issues. """ match info["status"]: - case "visible": + case DatasetInfoStatus.VISIBLE: self.n_visible += 1 - case "shadowed": + case DatasetInfoStatus.SHADOWED: self.n_shadowed += 1 - case "unsuccessful": + case DatasetInfoStatus.UNSUCCESSFUL: self.unsuccessful_datasets.append(dict(info["data_id"].mapping)) - case "cursed": + case DatasetInfoStatus.CURSED: self.cursed_datasets.append(CursedDatasetSummary.from_info(info, producer_info)) - case "predicted_only": + case DatasetInfoStatus.PREDICTED_ONLY: self.n_predicted_only += 1 case unrecognized_state: raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") @@ -683,7 +709,7 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp quantum_info.setdefault("messages", []) quantum_info.setdefault("runs", {}) quantum_info.setdefault("data_id", cast(DataCoordinate, node.quantum.dataId)) - quantum_info.setdefault("status", "unknown") + quantum_info.setdefault("status", QuantumInfoStatus.UNKNOWN) quantum_info.setdefault("recovered", False) new_quanta.append(quantum_key) self._quanta.setdefault(quantum_key.task_label, set()).add(quantum_key) @@ -705,7 +731,7 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp # the dataset and set defaults for its values. dataset_info = self.get_dataset_info(dataset_key) dataset_info.setdefault("data_id", ref.dataId) - dataset_info.setdefault("status", "predicted_only") + dataset_info.setdefault("status", DatasetInfoStatus.PREDICTED_ONLY) dataset_info.setdefault("messages", []) self._datasets.setdefault(dataset_key.parent_dataset_type_name, set()).add(dataset_key) dataset_runs = dataset_info.setdefault("runs", {}) @@ -745,14 +771,14 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp if metadata_dataset_run.produced: # if we also have logs, this is a success. if log_dataset_run.produced: - quantum_run.status = "successful" + quantum_run.status = QuantumRunStatus.SUCCESSFUL # if we have metadata and no logs, this is a very rare # case. either the task ran successfully and the datastore # died immediately afterwards, or some supporting # infrastructure for transferring the logs to the datastore # failed. else: - quantum_run.status = "logs_missing" + quantum_run.status = QuantumRunStatus.LOGS_MISSING # missing metadata means that the task did not finish. 
             else:
@@ -760,7 +786,7 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                 # a failure in the task itself. This includes all payload
                 # errors and some other problems.
                 if log_dataset_run.produced:
-                    quantum_run.status = "failed"
+                    quantum_run.status = QuantumRunStatus.FAILED
                     # if a quantum fails, all its successor datasets are
                     # blocked.
                     blocked.update(self._xgraph.successors(quantum_key))
@@ -772,11 +798,11 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                 # metadata must just be missing.
                 if blocked.isdisjoint(self._xgraph.predecessors(quantum_key)):
                     # None of this quantum's inputs were blocked.
-                    quantum_run.status = "metadata_missing"
+                    quantum_run.status = QuantumRunStatus.METADATA_MISSING
                 # otherwise we can assume from no metadata and no logs
                 # that the task was blocked by an upstream failure.
                 else:
-                    quantum_run.status = "blocked"
+                    quantum_run.status = QuantumRunStatus.BLOCKED
                     blocked.update(self._xgraph.successors(quantum_key))
 
             # Now we can start using state transitions to mark overall status.
@@ -784,50 +810,53 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
             new_status: QuantumInfoStatus
             match last_status, quantum_run.status:
                 # A quantum can never escape a `wonky` state.
-                case ("wonky", _):
-                    new_status = "wonky"
+                case (QuantumInfoStatus.WONKY, _):
+                    new_status = QuantumInfoStatus.WONKY
                 # Any transition to a success (excluding from `wonky`) is
                 # a success; any transition from a failed state is also a
                 # recovery.
-                case (_, "successful"):
-                    new_status = "successful"
-                    if last_status != "successful" and last_status != "unknown":
+                case (_, QuantumRunStatus.SUCCESSFUL):
+                    new_status = QuantumInfoStatus.SUCCESSFUL
+                    if (
+                        last_status != QuantumInfoStatus.SUCCESSFUL
+                        and last_status != QuantumInfoStatus.UNKNOWN
+                    ):
                         quantum_info["recovered"] = True
                 # Missing logs are one of the categories of wonky quanta. They
                 # interfere with our ability to discern quantum status and are
                 # signs of weird things afoot in processing. Add a message
                 # noting why this quantum is being marked as wonky to be stored
                 # in its `UnsuccessfulQuantumInfo`.
-                case (_, "logs_missing"):
-                    new_status = "wonky"
+                case (_, QuantumRunStatus.LOGS_MISSING):
+                    new_status = QuantumInfoStatus.WONKY
                     quantum_info["messages"].append(f"Logs missing for run {output_run!r}.")
                 # Leaving a successful state is another category of wonky
                 # quanta. If a previous success fails on a subsequent run,
                 # a human should inspect why. Add a message noting why this
                 # quantum is being marked as wonky to be stored in its
                 # `UnsuccessfulQuantumInfo`.
-                case ("successful", _):
-                    new_status = "wonky"
+                case (QuantumInfoStatus.SUCCESSFUL, _):
+                    new_status = QuantumInfoStatus.WONKY
                     quantum_info["messages"].append(
                         f"Status went from successful in run {list(quantum_info['runs'].values())[-1]!r} "
                         f"to {quantum_run.status!r} in {output_run!r}."
                     )
                 # If a quantum status is unknown and it moves to blocked, we
                 # know for sure that it is a blocked quantum.
-                case ("unknown", "blocked"):
-                    new_status = "blocked"
+                case (QuantumInfoStatus.UNKNOWN, QuantumRunStatus.BLOCKED):
+                    new_status = QuantumInfoStatus.BLOCKED
                 # A transition into blocked does not change the overall quantum
                 # status for a failure.
-                case (_, "blocked"):
+                case (_, QuantumRunStatus.BLOCKED):
                     new_status = last_status
                 # If a quantum transitions from any state into missing
                 # metadata, we don't have enough information to diagnose its
                 # state.
-                case (_, "metadata_missing"):
-                    new_status = "unknown"
+                case (_, QuantumRunStatus.METADATA_MISSING):
+                    new_status = QuantumInfoStatus.UNKNOWN
                 # Any transition into failure is a failed state.
-                case (_, "failed"):
-                    new_status = "failed"
+                case (_, QuantumRunStatus.FAILED):
+                    new_status = QuantumInfoStatus.FAILED
             # Update `QuantumInfo.status` for this quantum.
             quantum_info["status"] = new_status
 
@@ -911,29 +940,29 @@ def __resolve_duplicates(
                     run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible
                 )
                 if any(dataset_run.visible for dataset_run in dataset_info["runs"].values()):
-                    publish_state = "visible"
+                    query_state = "visible"
                 # set the publish state to `shadowed` if the dataset was
                 # produced but not visible (i.e., not queryable from the
                 # final collection(s)).
                 elif any(dataset_run.produced for dataset_run in dataset_info["runs"].values()):
-                    publish_state = "shadowed"
+                    query_state = "shadowed"
                 # a dataset which was not produced and not visible is
                 # missing.
                 else:
-                    publish_state = "missing"
+                    query_state = "missing"
                 # use the quantum status and publish state to ascertain the
                 # status of the dataset.
-                match (quantum_info["status"], publish_state):
+                match (quantum_info["status"], query_state):
                     # visible datasets from successful quanta are as
                     # intended.
-                    case ("successful", "visible"):
-                        dataset_info["status"] = "visible"
+                    case (QuantumInfoStatus.SUCCESSFUL, "visible"):
+                        dataset_info["status"] = DatasetInfoStatus.VISIBLE
                     # missing datasets from successful quanta indicate a
                     # `NoWorkFound` case.
-                    case ("successful", "missing"):
-                        dataset_info["status"] = "predicted_only"
-                    case ("successful", "shadowed"):
-                        dataset_info["status"] = "shadowed"
+                    case (QuantumInfoStatus.SUCCESSFUL, "missing"):
+                        dataset_info["status"] = DatasetInfoStatus.PREDICTED_ONLY
+                    case (QuantumInfoStatus.SUCCESSFUL, "shadowed"):
+                        dataset_info["status"] = DatasetInfoStatus.SHADOWED
                     # If anything other than a successful quantum produces
                     # a visible dataset, that dataset is cursed. Set the
                     # status for the dataset to cursed and note the reason
                     case (_, "visible"):
                         # without using tagged collections, so flag them as
                         # merely unsuccessful unless the user requests it.
                         if dataset_type_name.endswith("_log") and not curse_failed_logs:
-                            dataset_info["status"] = "unsuccessful"
+                            dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL
                         else:
-                            dataset_info["status"] = "cursed"
+                            dataset_info["status"] = DatasetInfoStatus.CURSED
                             dataset_info["messages"].append(
                                 f"Unsuccessful dataset {dataset_type_name} visible in "
                                 "final output collection."
                             )
                     # Any other produced dataset (
                     # visible and not successful) is a regular
                     # failure.
                     case _:
-                        dataset_info["status"] = "unsuccessful"
+                        dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL
                 if len(visible_runs) > 1:
-                    quantum_info["status"] = "wonky"
+                    quantum_info["status"] = QuantumInfoStatus.WONKY
                     quantum_info["messages"].append(
                         f"Outputs from different runs of the same quanta were visible: {visible_runs}."
                     )
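Usage note (reviewer sketch, not part of the patch): the diff relies on two properties of the new
Enum-valued statuses -- structural `match` dispatch on enum members, and `.name` for the string
form now stored in `UnsuccessfulQuantumSummary.runs`. The snippet below is a minimal,
self-contained illustration of both under those assumptions; the `QuantumRunStatus` values mirror
the patch, while the `summarize_runs` / `describe` helpers and the example `runs` dict are
hypothetical and exist only for demonstration.

    from enum import Enum


    class QuantumRunStatus(Enum):
        # Values mirror the enum added by the patch.
        METADATA_MISSING = -3
        LOGS_MISSING = -2
        FAILED = -1
        BLOCKED = 0
        SUCCESSFUL = 1


    def summarize_runs(runs: dict[str, QuantumRunStatus]) -> dict[str, str]:
        # Serialize enum members to their names, as from_info() now does.
        return {run: status.name for run, status in runs.items()}


    def describe(status: QuantumRunStatus) -> str:
        # Structural match on enum members, mirroring the patch's match blocks.
        match status:
            case QuantumRunStatus.SUCCESSFUL:
                return "ran and succeeded"
            case QuantumRunStatus.FAILED | QuantumRunStatus.BLOCKED:
                return "did not succeed"
            case _:
                return "status cannot be determined"


    runs = {"run1": QuantumRunStatus.FAILED, "run2": QuantumRunStatus.SUCCESSFUL}
    print(summarize_runs(runs))      # {'run1': 'FAILED', 'run2': 'SUCCESSFUL'}
    print(describe(runs["run2"]))    # ran and succeeded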