diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 86abbbd0..1881df59 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -181,12 +181,13 @@ class QuantumInfoStatus(Enum): proceed with processing. Currently, a quantum enters a wonky state for one of three reasons: - Its `QuantumInfoStatus` exits a successful state. Something that - initially succeeded fails on + initially succeeded fails on subsequent attempts. - A `QuantumRun` is missing logs. - - There are multiple runs associated with a dataset which comes up in a - findFirst search. This means that a dataset which will be used as an - input data product for further processing has heterogeneous inputs, - which may have had different inputs or a different data-query. + - There are multiple runs associated with a dataset, and this comes up + in a findFirst search. This means that a dataset which will be used + as an input data product for further processing has heterogeneous + inputs, which may have had different inputs or a different + data-query. FAILED = -2: These quanta were attempted and failed. Failed quanta have logs and no metadata. UNKNOWN = -1: These are quanta which do not have any metadata associated @@ -345,7 +346,7 @@ class UnsuccessfulQuantumSummary(pydantic.BaseModel): """ messages: list[str] """Any messages associated with the unsuccessful quantum (any clues as to - why the quantum may be in a `failed` or `wonky` state). + why the quantum may be in a `FAILED` or `WONKY` state). """ @classmethod @@ -398,17 +399,17 @@ def n_failed(self) -> int: failed_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) """A list of all `UnsuccessfulQuantumSummary` objects associated with the - `failed` quanta. This is a report containing their data IDs, the status + FAILED quanta. This is a report containing their data IDs, the status of each run associated with each `failed` quantum, and the error messages associated with the failures when applicable. """ recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list) - """A list of the quanta which moved from an unsuccessful to `successful` + """A list of the quanta which moved from an unsuccessful to SUCCESSFUL state. """ wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) """A list of all `UnsuccessfulQuantumSummary` objects associated with the - `wonky` quanta. This is a report containing their data_ids, the status of + WONKY quanta. This is a report containing their data_ids, the status of each run associated with each `wonky` quantum, and messages (dictated in this module) associated with the particular issue identified. """ @@ -468,22 +469,22 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo class CursedDatasetSummary(pydantic.BaseModel): - """A summary of all the relevant information on a `cursed` dataset.""" + """A summary of all the relevant information on a cursed dataset.""" producer_data_id: dict[str, DataIdValue] """The data_id of the task which produced this dataset. This is mostly useful for people wishing to track down the task which produced this - `cursed` dataset quickly. + cursed dataset quickly. """ data_id: dict[str, DataIdValue] - """The data_id of the cursed `Dataset`. + """The data_id of the cursed dataset. """ runs_produced: dict[str, bool] - """A dictionary of all the runs associated with the `cursed` dataset; + """A dictionary of all the runs associated with the cursed dataset; the `bool` is true if the dataset was produced in the associated run. """ run_visible: str | None - """A dictionary of all `visible` runs containing the `cursed` dataset. + """A dictionary of all `visible` runs containing the cursed dataset. """ messages: list[str] """Any diagnostic messages (dictated in this module) which might help in @@ -556,7 +557,7 @@ def n_unsuccessful(self) -> int: cursed_datasets: list[CursedDatasetSummary] = pydantic.Field(default_factory=list) """A list of all `CursedDatasetSummary` objects associated with the - `cursed` datasets. This is a report containing their data_ids and the + cursed datasets. This is a report containing their data_ids and the data_ids of their producer task, the status of each run associated with each `cursed` dataset, and messages (dictated in this module) associated with the particular issue identified. @@ -579,7 +580,7 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non producer_info : `QuantumInfo` The `QuantumInfo` object associated with the producer of the dataset. This is used to report the producer task in the - summaries for `cursed` datasets, which may help identify + summaries for cursed datasets, which may help identify specific issues. """ match info["status"]: @@ -599,7 +600,7 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non class Summary(pydantic.BaseModel): """A summary of the contents of the QuantumProvenanceGraph, including - all information on the quanta for each `Task` and the datasets of each + all information on the quanta for each task and the datasets of each `DatasetType`. """ @@ -618,11 +619,14 @@ class QuantumProvenanceGraph: Step through all the quantum graphs associated with certain tasks or processing steps. For each graph/attempt, the status of each quantum and - dataset is recorded in `QuantumProvenanceGraph.add_new_graph` and outcomes - of quanta over multiple runs are resolved in - `QuantumProvenanceGraph.resolve_duplicates`. At the end of this process, - we can combine all attempts into a summary. This serves to answer the - question "What happened to this data ID?" in a wholistic sense. + dataset is recorded in `QuantumProvenanceGraph.__add_new_graph` and + outcomes of quanta over multiple runs are resolved in + `QuantumProvenanceGraph.__resolve_duplicates`. These can be called outside + the class in the correct order by + `QuantumProvenanceGraph.assemble_quantum_provenance_graph`. At the end of + this process, we can combine all attempts into a summary using the + `QuantumProvenanceGraph.to_summary` method. This serves to answer the + question 'What happened to this data ID?' in a wholistic sense. """ def __init__(self) -> None: @@ -671,15 +675,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp `DatasetRun` and `QuantumRun`). For each new quantum, annotate the status of the `QuantumRun` by inspecting the graph. If a DatasetType was produced, annotate this in the run by setting - `DatasetRun.produced = True`. If a quantum is given `blocked` - or `failed` status, annotate all their successors in the graph - as `blocked`. For each new quantum, use the transition between + `DatasetRun.produced = True`. If a quantum is given BLOCKED + or FAILED status, annotate all their successors in the graph + as BLOCKED. For each new quantum, use the transition between the current and last `QuantumRun.status` to determine the status to assign to the overall `QuantumInfo`. For example, if a - previous run associated with a quantum had the status `failed`, - and the status from the new graph reads `successful`, we can - mark the overall quantum status as `successful` and list the data_id - as `recovered`. + previous run associated with a quantum had the status FAILED, + and the status from the new graph reads SUCCESSFUL, we can + mark the overall quantum status as SUCCESSFUL and list the data_id + as RECOVERED. Parameters ---------- @@ -780,7 +784,6 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp else: quantum_run.status = QuantumRunStatus.LOGS_MISSING # missing metadata means that the task did not finish. - else: # if we have logs and no metadata, the task not finishing is # a failure in the task itself. This includes all payload @@ -809,10 +812,10 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp last_status = quantum_info["status"] new_status: QuantumInfoStatus match last_status, quantum_run.status: - # A quantum can never escape a `wonky` state. + # A quantum can never escape a WONKY state. case (QuantumInfoStatus.WONKY, _): new_status = QuantumInfoStatus.WONKY - # Any transition to a success (excluding from `wonky`) is + # Any transition to a success (excluding from WONKY) is # a success; any transition from a failed state is also a # recovery. case (_, QuantumRunStatus.SUCCESSFUL): @@ -871,13 +874,13 @@ def __resolve_duplicates( to the `QuantumProvenanceGraph, resolve any discrepancies between them and use all attempts to finalize overall status. - Particularly, use the publish state of each `DatasetRun` in combination - with overall quantum status to ascertain the status of each dataset. + Particularly, use the state of each `DatasetRun` in combination with + overall quantum status to ascertain the status of each dataset. Additionally, if there are multiple visible runs associated with a - dataset, mark the producer quantum as `wonky`. + dataset, mark the producer quantum as WONKY. This method should be called after - `QuantumProvenanceGraph.add_new_graph` has been called on every graph + `QuantumProvenanceGraph.__add_new_graph` has been called on every graph associated with the data processing. Parameters @@ -894,12 +897,12 @@ def __resolve_duplicates( A "where" string to use to constrain the collections, if passed. curse_failed_logs : `bool` - Mark log datasets as `cursed` if they are visible in the final + Mark log datasets as CURSED if they are visible in the final output collection. Note that a campaign-level collection must be used here for `collections` if `curse_failed_logs` is `True`; if - `resolve_duplicates` is run on a list of group-level collections - then each will show logs from their own failures as visible - the datasets will show as cursed regardless of this flag. + `__resolve_duplicates` is run on a list of group-level collections + then each will only show log datasets from their own failures as + visible and datasets from others will be marked as cursed. """ # First thing: raise an error if resolve_duplicates has been run # before on this qpg. @@ -1008,6 +1011,35 @@ def assemble_quantum_provenance_graph( where: str = "", curse_failed_logs: bool = False, ) -> None: + """Assemble the quantum provenance graph from a list of all graphs + corresponding to processing attempts. + + This method calls the private method `__add_new_graph` on each of the + constituent graphs, verifying that the graphs have been passed in + order. After `__add_new_graph` has been called on all graphs in the + `Sequence`, the method calls `__resolve_duplicates`. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The Butler used for this report. This should match the Butler used + for the run associated with the executed quantum graph. + qgraphs : `Sequence`[`QuantumGraph` | `ResourcePathExpression`] + A list of either quantum graph objects or their uri's, to be used + to assemble the `QuantumProvenanceGraph`. + collections : `Sequence[str]` | `None` + Collections to use in `lsst.daf.butler.registry.queryDatasets` if + paring down the query would be useful. + where : `str` + A "where" string to use to constrain the collections, if passed. + curse_failed_logs : `bool` + Mark log datasets as CURSED if they are visible in the final + output collection. Note that a campaign-level collection must be + used here for `collections` if `curse_failed_logs` is `True`; if + `__resolve_duplicates` is run on a list of group-level collections + then each will only show log datasets from their own failures as + visible and datasets from others will be marked as cursed. + """ output_runs = [] for count, graph in enumerate(qgraphs): qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph) @@ -1033,6 +1065,9 @@ def assemble_quantum_provenance_graph( # If the user has not passed a `collections` variable if not collections: collections = list(reversed(output_runs)) + assert ( + not curse_failed_logs + ), "curse_failed_logs option must be used with one campaign-level collection." self.__resolve_duplicates(butler, collections, where, curse_failed_logs) def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary: