diff --git a/doc/changes/DM-41966.feature.rst b/doc/changes/DM-41966.feature.rst
new file mode 100644
index 0000000000..5ea6679ea7
--- /dev/null
+++ b/doc/changes/DM-41966.feature.rst
@@ -0,0 +1,4 @@
+* Added a new API ``Butler.transfer_dimension_records_from()`` to copy the dimension records associated with the given refs into the target butler.
+* This and ``Butler.transfer_from()`` now copy related dimension records as well as the records associated directly with the refs.
+  For example, if ``visit`` is being transferred, additional records such as ``visit_definition`` will also be copied.
+  This requires a full Butler and not a limited Butler (such as the one backed by a quantum graph).
diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py
index 655f774fdc..e2804dd0aa 100644
--- a/python/lsst/daf/butler/_butler.py
+++ b/python/lsst/daf/butler/_butler.py
@@ -1248,6 +1248,27 @@ def import_(
         """
         raise NotImplementedError()
 
+    @abstractmethod
+    def transfer_dimension_records_from(
+        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
+    ) -> None:
+        """Transfer dimension records to this Butler from another Butler.
+
+        Parameters
+        ----------
+        source_butler : `LimitedButler` or `Butler`
+            Butler from which the records are to be transferred. If data IDs
+            in ``source_refs`` are not expanded then this has to be a full
+            `Butler` whose registry will be used to expand data IDs. If the
+            source refs contain coordinates that are used to populate other
+            records then this will also need to be a full `Butler`.
+        source_refs : iterable of `DatasetRef`
+            Datasets defined in the source butler whose dimension records
+            should be transferred to this butler. In most circumstances,
+            transfer is faster if the dataset refs are expanded.
+        """
+        raise NotImplementedError()
+
     @abstractmethod
     def transfer_from(
         self,
diff --git a/python/lsst/daf/butler/dimensions/_universe.py b/python/lsst/daf/butler/dimensions/_universe.py
index db7fdc9c32..5643586d15 100644
--- a/python/lsst/daf/butler/dimensions/_universe.py
+++ b/python/lsst/daf/butler/dimensions/_universe.py
@@ -596,7 +596,7 @@ def getEncodeLength(self) -> int:
 
     def get_elements_populated_by(self, dimension: Dimension) -> NamedValueAbstractSet[DimensionElement]:
         """Return the set of `DimensionElement` objects whose
-        `~DimensionElement.populated_by` atttribute is the given dimension.
+        `~DimensionElement.populated_by` attribute is the given dimension.
         """
         return self._populates[dimension.name]
diff --git a/python/lsst/daf/butler/direct_butler.py b/python/lsst/daf/butler/direct_butler.py
index d9a8b13c08..ab857d9fbd 100644
--- a/python/lsst/daf/butler/direct_butler.py
+++ b/python/lsst/daf/butler/direct_butler.py
@@ -37,6 +37,7 @@
 import collections.abc
 import contextlib
 import io
+import itertools
 import logging
 import numbers
 import os
@@ -1879,6 +1880,125 @@ def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
         else:
             doImport(filename)  # type: ignore
 
+    def transfer_dimension_records_from(
+        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
+    ) -> None:
+        # Allowed dimensions in the target butler.
+        elements = frozenset(
+            element for element in self.dimensions.elements if element.hasTable() and element.viewOf is None
+        )
+
+        data_ids = {ref.dataId for ref in source_refs}
+
+        dimension_records = self._extract_all_dimension_records_from_data_ids(
+            source_butler, data_ids, elements
+        )
+
+        # Insert order is important.
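+        # (The universe's sorted order places parent elements such as
+        # instrument before the elements that reference them, so the
+        # records a new record depends on already exist when it is
+        # inserted.)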
+        for element in self.dimensions.sorted(dimension_records.keys()):
+            records = list(dimension_records[element].values())
+            # Assume that if the record is already present we can use it
+            # without having to check that the record metadata is
+            # consistent.
+            self._registry.insertDimensionData(element, *records, skip_existing=True)
+            _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))
+
+    def _extract_all_dimension_records_from_data_ids(
+        self,
+        source_butler: LimitedButler | Butler,
+        data_ids: set[DataCoordinate],
+        allowed_elements: frozenset[DimensionElement],
+    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
+        primary_records = self._extract_dimension_records_from_data_ids(
+            source_butler, data_ids, allowed_elements
+        )
+
+        can_query = isinstance(source_butler, Butler)
+
+        additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
+        for original_element, record_mapping in primary_records.items():
+            # Get dimensions that depend on this dimension.
+            populated_by = self.dimensions.get_elements_populated_by(
+                self.dimensions[original_element.name]  # type: ignore
+            )
+
+            for data_id in record_mapping.keys():
+                for element in populated_by:
+                    if element not in allowed_elements:
+                        continue
+                    if element.name == original_element.name:
+                        continue
+
+                    if element.name in primary_records:
+                        # If this element has already been stored, avoid
+                        # re-finding records since that may lead to
+                        # additional spurious records. For example, visit
+                        # is populated_by visit_detector_region, but
+                        # querying visit_detector_region by visit will
+                        # return all the detectors for this visit -- the
+                        # visit dataId does not constrain this. To
+                        # constrain the query, the original dataIds would
+                        # have to be scanned.
+                        continue
+
+                    if not can_query:
+                        raise RuntimeError(
+                            f"Transferring populated_by records like {element.name} requires a full Butler."
+                        )
+
+                    records = source_butler.registry.queryDimensionRecords(  # type: ignore
+                        element.name, **data_id.mapping  # type: ignore
+                    )
+                    for record in records:
+                        additional_records[record.definition].setdefault(record.dataId, record)
+
+        # The next step is to walk back through the additional records to
+        # pick up any missing content (such as visit_definition needing to
+        # know the exposure). We want to ensure we do not request records
+        # we already have.
+        missing_data_ids = set()
+        for element, record_mapping in additional_records.items():
+            for data_id in record_mapping.keys():
+                if data_id not in primary_records[element]:
+                    missing_data_ids.add(data_id)
+
+        # Fill out the new records. Assume that these new records do not
+        # also need to carry over additional populated_by records.
+        secondary_records = self._extract_dimension_records_from_data_ids(
+            source_butler, missing_data_ids, allowed_elements
+        )
+
+        # Merge the extra sets of records in with the original.
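+        # (Any element/data ID that appears in more than one of these
+        # mappings refers to the same registry record, so overwriting
+        # via dict.update() is safe.)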
+        for element, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()):
+            primary_records[element].update(record_mapping)
+
+        return primary_records
+
+    def _extract_dimension_records_from_data_ids(
+        self,
+        source_butler: LimitedButler | Butler,
+        data_ids: set[DataCoordinate],
+        allowed_elements: frozenset[DimensionElement],
+    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
+        dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
+
+        for data_id in data_ids:
+            # We need an expanded record; if the data ID is not expanded
+            # we need a full butler with a registry (allow mocks with
+            # registry too).
+            if not data_id.hasRecords():
+                if registry := getattr(source_butler, "registry", None):
+                    data_id = registry.expandDataId(data_id)
+                else:
+                    raise TypeError("Input butler needs to be a full butler to expand DataId.")
+            # If this butler doesn't know about a dimension in the source
+            # butler things will break later.
+            for element_name in data_id.dimensions.elements:
+                record = data_id.records[element_name]
+                if record is not None and record.definition in allowed_elements:
+                    dimension_records[record.definition].setdefault(record.dataId, record)
+
+        return dimension_records
+
     def transfer_from(
         self,
         source_butler: LimitedButler,
@@ -1972,21 +2092,9 @@ def transfer_from(
             if element.hasTable() and element.viewOf is None
         )
         dataIds = {ref.dataId for ref in source_refs}
-        # This logic comes from saveDataIds.
-        for dataId in dataIds:
-            # Need an expanded record, if not expanded that we need a full
-            # butler with registry (allow mocks with registry too).
-            if not dataId.hasRecords():
-                if registry := getattr(source_butler, "registry", None):
-                    dataId = registry.expandDataId(dataId)
-                else:
-                    raise TypeError("Input butler needs to be a full butler to expand DataId.")
-            # If this butler doesn't know about a dimension in the source
-            # butler things will break later.
-            for element_name in dataId.dimensions.elements:
-                record = dataId.records[element_name]
-                if record is not None and record.definition in elements:
-                    dimension_records[record.definition].setdefault(record.dataId, record)
+        dimension_records = self._extract_all_dimension_records_from_data_ids(
+            source_butler, dataIds, elements
+        )
 
         handled_collections: set[str] = set()
 
@@ -1994,8 +2102,9 @@ def transfer_from(
         with self.transaction():
             if dimension_records:
                 _LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
-                for element, r in dimension_records.items():
-                    records = [r[dataId] for dataId in r]
+                # Order matters.
+                for element in self.dimensions.sorted(dimension_records.keys()):
+                    records = list(dimension_records[element].values())
                     # Assume that if the record is already present that we can
                     # use it without having to check that the record metadata
                     # is consistent.
diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py
index 52eac6fe2b..53a28bc631 100644
--- a/python/lsst/daf/butler/remote_butler/_remote_butler.py
+++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -407,6 +407,12 @@ def import_(
         # Docstring inherited.
         raise NotImplementedError()
 
+    def transfer_dimension_records_from(
+        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
+    ) -> None:
+        # Docstring inherited.
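+        # Not yet supported over the client/server interface.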
+        raise NotImplementedError()
+
     def transfer_from(
         self,
         source_butler: LimitedButler,
diff --git a/tests/test_butler.py b/tests/test_butler.py
index 17d6744c51..5f6f5f6851 100644
--- a/tests/test_butler.py
+++ b/tests/test_butler.py
@@ -2308,6 +2308,11 @@ def assertButlerTransfers(self, purge: bool = False, storageClassName: str = "St
         # Can remove with DM-35498.
         self.target_butler.registry.refresh()
 
+        # Transfer the records for one ref to test the alternative API.
+        with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
+            self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]])
+        self.assertIn("number of records transferred: 1", ";".join(log_cm.output))
+
         # Now transfer them to the second butler, including dimensions.
         with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
             transferred = self.target_butler.transfer_from(
diff --git a/tests/test_simpleButler.py b/tests/test_simpleButler.py
index a2ebb2dc1c..5cf09b3f68 100644
--- a/tests/test_simpleButler.py
+++ b/tests/test_simpleButler.py
@@ -324,7 +324,8 @@ def testButlerGet(self):
                 try:
                     flat_id, _ = butler.get("flat", dataId=dataId, collections=coll, **kwds)
                 except Exception as e:
-                    raise type(e)(f"{str(e)}: dataId={dataId}, kwds={kwds}") from e
+                    e.add_note(f"dataId={dataId}, kwds={kwds}")
+                    raise
                 self.assertEqual(flat_id, flat2g.id, msg=f"DataId: {dataId}, kwds: {kwds}")
 
         # Check that bad combinations raise.
@@ -602,6 +603,25 @@ def testJson(self):
             from_json = type(test_item).from_json(json_str, universe=butler.dimensions)
             self.assertEqual(from_json, test_item, msg=f"From JSON '{json_str}' using universe")
 
+    def test_populated_by(self):
+        """Test that dimension records can find other records."""
+        butler = self.makeButler(writeable=True)
+        butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "hsc-rc2-subset.yaml"))
+
+        elements = frozenset(
+            element for element in butler.dimensions.elements if element.hasTable() and element.viewOf is None
+        )
+
+        # Get a visit-based dataId.
+        data_ids = set(butler.registry.queryDataIds("visit", visit=1232, instrument="HSC"))
+
+        # Request all the records related to it.
+        records = butler._extract_all_dimension_records_from_data_ids(butler, data_ids, elements)
+
+        self.assertIn(butler.dimensions["visit_detector_region"], records, f"Keys: {records.keys()}")
+        self.assertIn(butler.dimensions["visit_system_membership"], records)
+        self.assertIn(butler.dimensions["visit_system"], records)
+
     def testJsonDimensionRecordsAndHtmlRepresentation(self):
         # Dimension Records
         butler = self.makeButler(writeable=True)
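
A minimal usage sketch of the new API follows. The repository paths and the "HSC/raw/all" collection are hypothetical and only illustrate the intended call pattern; expanded refs are requested because the docstring notes that transfer is faster with them, and both butlers are full (non-limited) butlers, which the populated_by lookups require.

    from lsst.daf.butler import Butler

    # Hypothetical repositories; both are full butlers so that
    # populated_by records can be queried during the transfer.
    source_butler = Butler("/repo/source")
    target_butler = Butler("/repo/target", writeable=True)

    # Any iterable of DatasetRef known to the source butler works;
    # expanding the data IDs up front avoids per-ref registry lookups.
    refs = list(source_butler.registry.queryDatasets("raw", collections="HSC/raw/all").expanded())

    # Copy the dimension records for these refs (including related
    # records such as visit_definition) without transferring datasets.
    target_butler.transfer_dimension_records_from(source_butler, refs)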