From 65bdb9e67f9e398d4b16235fb57c954011caff6f Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 1 Dec 2023 10:57:31 -0700 Subject: [PATCH 1/5] Clean up an exception in test code by using add_note This is clearer than trying to raise the same exception from itself. --- tests/test_simpleButler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_simpleButler.py b/tests/test_simpleButler.py index a2ebb2dc1c..36ed7691aa 100644 --- a/tests/test_simpleButler.py +++ b/tests/test_simpleButler.py @@ -324,7 +324,8 @@ def testButlerGet(self): try: flat_id, _ = butler.get("flat", dataId=dataId, collections=coll, **kwds) except Exception as e: - raise type(e)(f"{str(e)}: dataId={dataId}, kwds={kwds}") from e + e.add_note(f"dataId={dataId}, kwds={kwds}") + raise self.assertEqual(flat_id, flat2g.id, msg=f"DataId: {dataId}, kwds: {kwds}") # Check that bad combinations raise. From 43a946e4ad4536b45e52088cf16193569f99b3e8 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Mon, 4 Dec 2023 16:38:09 -0700 Subject: [PATCH 2/5] New Butler API for transferring dimension records Also copies related dimensions populated by the original set. Butler.transfer_from now uses a part of this API. --- python/lsst/daf/butler/_butler.py | 19 +++ python/lsst/daf/butler/direct_butler.py | 126 +++++++++++++++--- .../butler/remote_butler/_remote_butler.py | 6 + tests/test_butler.py | 5 + tests/test_simpleButler.py | 19 +++ 5 files changed, 160 insertions(+), 15 deletions(-) diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index 655f774fdc..61f6c1f319 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -1248,6 +1248,25 @@ def import_( """ raise NotImplementedError() + @abstractmethod + def transfer_dimension_records_from( + self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + ) -> None: + """Transfer dimension records to this Butler from another Butler. + + Parameters + ---------- + source_butler : `LimitedButler` + Butler from which the records are to be transferred. If data IDs + in ``source_refs`` are not expanded then this has to be a full + `Butler` whose registry will be used to expand data IDs. + source_refs : iterable of `DatasetRef` + Datasets defined in the source butler whose dimension records + should be transferred to this butler. In most circumstances. + transfer is faster if the dataset refs are expanded. + """ + raise NotImplementedError() + @abstractmethod def transfer_from( self, diff --git a/python/lsst/daf/butler/direct_butler.py b/python/lsst/daf/butler/direct_butler.py index d9a8b13c08..8a0880c169 100644 --- a/python/lsst/daf/butler/direct_butler.py +++ b/python/lsst/daf/butler/direct_butler.py @@ -37,6 +37,7 @@ import collections.abc import contextlib import io +import itertools import logging import numbers import os @@ -1879,6 +1880,113 @@ def doImport(importStream: TextIO | ResourceHandleProtocol) -> None: else: doImport(filename) # type: ignore + def transfer_dimension_records_from( + self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + ) -> None: + # Allowed dimensions in the target butler. 
+ elements = frozenset( + element for element in self.dimensions.elements if element.hasTable() and element.viewOf is None + ) + + data_ids = {ref.dataId for ref in source_refs} + + dimension_records = self._extract_all_dimension_records_from_data_ids( + source_butler, data_ids, elements + ) + + for element, r in dimension_records.items(): + records = [r[dataId] for dataId in r] + # Assume that if the record is already present that we can + # use it without having to check that the record metadata + # is consistent. + self._registry.insertDimensionData(element, *records, skip_existing=True) + _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records)) + + def _extract_all_dimension_records_from_data_ids( + self, + source_butler: LimitedButler, + data_ids: set[DataCoordinate], + allowed_elements: frozenset[DimensionElement], + ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]: + primary_records = self._extract_dimension_records_from_data_ids( + source_butler, data_ids, allowed_elements + ) + + additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict) + for name, records in primary_records.items(): + # Get dimensions that depend on this dimension. + populated_by = self.dimensions.get_elements_populated_by(self.dimensions[name]) + + for data_id in records.keys(): + for element in populated_by: + if element not in allowed_elements: + continue + if element.name == name: + continue + + if element.name in primary_records: + # If this element has already been stored avoid + # re-finding records since that may lead to additional + # spurious records. e.g. visit is populated_by + # visit_detector_region but querying + # visit_detector_region by visit will return all the + # detectors for this visit -- the visit dataId does not + # constrain this. + # To constrain the query the original dataIds would + # have to be scanned. + continue + + records = source_butler.registry.queryDimensionRecords(element.name, **data_id.mapping) + for record in records: + additional_records[record.definition].setdefault(record.dataId, record) + + # The next step is to walk back through the additional records to + # pick up any missing content (such as visit_definition needing to + # know the exposure). Want to ensure we do not request records we + # already have. + missing_data_ids = set() + for name, records in additional_records.items(): + for data_id in records.keys(): + if data_id not in primary_records[name]: + missing_data_ids.add(data_id) + + # Fill out the new records. Assume that these new records do not + # also need to carry over additional populated_by records. + secondary_records = self._extract_dimension_records_from_data_ids( + source_butler, missing_data_ids, allowed_elements + ) + + # Merge the extra sets of records in with the original. + for name, records in itertools.chain(additional_records.items(), secondary_records.items()): + primary_records[name].update(records) + + return primary_records + + def _extract_dimension_records_from_data_ids( + self, + source_butler: LimitedButler, + data_ids: set[DataCoordinate], + allowed_elements: frozenset[DimensionElement], + ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]: + dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict) + + for data_id in data_ids: + # Need an expanded record, if not expanded that we need a full + # butler with registry (allow mocks with registry too). 
+ if not data_id.hasRecords(): + if registry := getattr(source_butler, "registry", None): + data_id = registry.expandDataId(data_id) + else: + raise TypeError("Input butler needs to be a full butler to expand DataId.") + # If this butler doesn't know about a dimension in the source + # butler things will break later. + for element_name in data_id.dimensions.elements: + record = data_id.records[element_name] + if record is not None and record.definition in allowed_elements: + dimension_records[record.definition].setdefault(record.dataId, record) + + return dimension_records + def transfer_from( self, source_butler: LimitedButler, @@ -1972,21 +2080,9 @@ def transfer_from( if element.hasTable() and element.viewOf is None ) dataIds = {ref.dataId for ref in source_refs} - # This logic comes from saveDataIds. - for dataId in dataIds: - # Need an expanded record, if not expanded that we need a full - # butler with registry (allow mocks with registry too). - if not dataId.hasRecords(): - if registry := getattr(source_butler, "registry", None): - dataId = registry.expandDataId(dataId) - else: - raise TypeError("Input butler needs to be a full butler to expand DataId.") - # If this butler doesn't know about a dimension in the source - # butler things will break later. - for element_name in dataId.dimensions.elements: - record = dataId.records[element_name] - if record is not None and record.definition in elements: - dimension_records[record.definition].setdefault(record.dataId, record) + dimension_records = self._extract_all_dimension_records_from_data_ids( + source_butler, dataIds, elements + ) handled_collections: set[str] = set() diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py index 52eac6fe2b..c808e3127f 100644 --- a/python/lsst/daf/butler/remote_butler/_remote_butler.py +++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py @@ -407,6 +407,12 @@ def import_( # Docstring inherited. raise NotImplementedError() + def transfer_dimension_records_from( + self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + ) -> None: + # Docstring inherited. + raise NotImplementedError() + def transfer_from( self, source_butler: LimitedButler, diff --git a/tests/test_butler.py b/tests/test_butler.py index 17d6744c51..5f6f5f6851 100644 --- a/tests/test_butler.py +++ b/tests/test_butler.py @@ -2308,6 +2308,11 @@ def assertButlerTransfers(self, purge: bool = False, storageClassName: str = "St # Can remove with DM-35498. self.target_butler.registry.refresh() + # Transfer the records for one ref to test the alternative API. + with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: + self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]]) + self.assertIn("number of records transferred: 1", ";".join(log_cm.output)) + # Now transfer them to the second butler, including dimensions. 
with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: transferred = self.target_butler.transfer_from( diff --git a/tests/test_simpleButler.py b/tests/test_simpleButler.py index 36ed7691aa..5cf09b3f68 100644 --- a/tests/test_simpleButler.py +++ b/tests/test_simpleButler.py @@ -603,6 +603,25 @@ def testJson(self): from_json = type(test_item).from_json(json_str, universe=butler.dimensions) self.assertEqual(from_json, test_item, msg=f"From JSON '{json_str}' using universe") + def test_populated_by(self): + """Test that dimension records can find other records.""" + butler = self.makeButler(writeable=True) + butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "hsc-rc2-subset.yaml")) + + elements = frozenset( + element for element in butler.dimensions.elements if element.hasTable() and element.viewOf is None + ) + + # Get a visit-based dataId. + data_ids = set(butler.registry.queryDataIds("visit", visit=1232, instrument="HSC")) + + # Request all the records related to it. + records = butler._extract_all_dimension_records_from_data_ids(butler, data_ids, elements) + + self.assertIn(butler.dimensions["visit_detector_region"], records, f"Keys: {records.keys()}") + self.assertIn(butler.dimensions["visit_system_membership"], records) + self.assertIn(butler.dimensions["visit_system"], records) + def testJsonDimensionRecordsAndHtmlRepresentation(self): # Dimension Records butler = self.makeButler(writeable=True) From fad6e8a3f3c629411c314d17dbe66db80720116d Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Tue, 5 Dec 2023 16:29:08 -0700 Subject: [PATCH 3/5] Fix some type annotations --- python/lsst/daf/butler/_butler.py | 8 +++-- .../lsst/daf/butler/dimensions/_universe.py | 2 +- python/lsst/daf/butler/direct_butler.py | 35 ++++++++++++------- .../butler/remote_butler/_remote_butler.py | 2 +- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index 61f6c1f319..e2804dd0aa 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -1250,16 +1250,18 @@ def import_( @abstractmethod def transfer_dimension_records_from( - self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef] ) -> None: """Transfer dimension records to this Butler from another Butler. Parameters ---------- - source_butler : `LimitedButler` + source_butler : `LimitedButler` or `Butler` Butler from which the records are to be transferred. If data IDs in ``source_refs`` are not expanded then this has to be a full - `Butler` whose registry will be used to expand data IDs. + `Butler` whose registry will be used to expand data IDs. If the + source refs contain coordinates that are used to populate other + records then this will also need to be a full `Butler`. source_refs : iterable of `DatasetRef` Datasets defined in the source butler whose dimension records should be transferred to this butler. In most circumstances. 
diff --git a/python/lsst/daf/butler/dimensions/_universe.py b/python/lsst/daf/butler/dimensions/_universe.py index db7fdc9c32..5643586d15 100644 --- a/python/lsst/daf/butler/dimensions/_universe.py +++ b/python/lsst/daf/butler/dimensions/_universe.py @@ -596,7 +596,7 @@ def getEncodeLength(self) -> int: def get_elements_populated_by(self, dimension: Dimension) -> NamedValueAbstractSet[DimensionElement]: """Return the set of `DimensionElement` objects whose - `~DimensionElement.populated_by` atttribute is the given dimension. + `~DimensionElement.populated_by` attribute is the given dimension. """ return self._populates[dimension.name] diff --git a/python/lsst/daf/butler/direct_butler.py b/python/lsst/daf/butler/direct_butler.py index 8a0880c169..54dc2c4e94 100644 --- a/python/lsst/daf/butler/direct_butler.py +++ b/python/lsst/daf/butler/direct_butler.py @@ -1881,7 +1881,7 @@ def doImport(importStream: TextIO | ResourceHandleProtocol) -> None: doImport(filename) # type: ignore def transfer_dimension_records_from( - self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef] ) -> None: # Allowed dimensions in the target butler. elements = frozenset( @@ -1904,7 +1904,7 @@ def transfer_dimension_records_from( def _extract_all_dimension_records_from_data_ids( self, - source_butler: LimitedButler, + source_butler: LimitedButler | Butler, data_ids: set[DataCoordinate], allowed_elements: frozenset[DimensionElement], ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]: @@ -1912,16 +1912,20 @@ def _extract_all_dimension_records_from_data_ids( source_butler, data_ids, allowed_elements ) + can_query = True if isinstance(source_butler, Butler) else False + additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict) - for name, records in primary_records.items(): + for original_element, record_mapping in primary_records.items(): # Get dimensions that depend on this dimension. - populated_by = self.dimensions.get_elements_populated_by(self.dimensions[name]) + populated_by = self.dimensions.get_elements_populated_by( + self.dimensions[original_element.name] # type: ignore + ) - for data_id in records.keys(): + for data_id in record_mapping.keys(): for element in populated_by: if element not in allowed_elements: continue - if element.name == name: + if element.name == original_element.name: continue if element.name in primary_records: @@ -1936,7 +1940,14 @@ def _extract_all_dimension_records_from_data_ids( # have to be scanned. continue - records = source_butler.registry.queryDimensionRecords(element.name, **data_id.mapping) + if not can_query: + raise RuntimeError( + f"Transferring populated_by records like {element.name} requires a full Butler." + ) + + records = source_butler.registry.queryDimensionRecords( # type: ignore + element.name, **data_id.mapping # type: ignore + ) for record in records: additional_records[record.definition].setdefault(record.dataId, record) @@ -1945,8 +1956,8 @@ def _extract_all_dimension_records_from_data_ids( # know the exposure). Want to ensure we do not request records we # already have. 
missing_data_ids = set() - for name, records in additional_records.items(): - for data_id in records.keys(): + for name, record_mapping in additional_records.items(): + for data_id in record_mapping.keys(): if data_id not in primary_records[name]: missing_data_ids.add(data_id) @@ -1957,14 +1968,14 @@ def _extract_all_dimension_records_from_data_ids( ) # Merge the extra sets of records in with the original. - for name, records in itertools.chain(additional_records.items(), secondary_records.items()): - primary_records[name].update(records) + for name, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()): + primary_records[name].update(record_mapping) return primary_records def _extract_dimension_records_from_data_ids( self, - source_butler: LimitedButler, + source_butler: LimitedButler | Butler, data_ids: set[DataCoordinate], allowed_elements: frozenset[DimensionElement], ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]: diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py index c808e3127f..53a28bc631 100644 --- a/python/lsst/daf/butler/remote_butler/_remote_butler.py +++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py @@ -408,7 +408,7 @@ def import_( raise NotImplementedError() def transfer_dimension_records_from( - self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef] + self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef] ) -> None: # Docstring inherited. raise NotImplementedError() From 31fc7636f5fe9a37ea3ba211908db859a53f0267 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Wed, 6 Dec 2023 15:02:30 -0700 Subject: [PATCH 4/5] Ensure that dimension records are inserted in the right order This makes sure that exposure is inserted before visit and before visit_definition. --- python/lsst/daf/butler/direct_butler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/lsst/daf/butler/direct_butler.py b/python/lsst/daf/butler/direct_butler.py index 54dc2c4e94..ab857d9fbd 100644 --- a/python/lsst/daf/butler/direct_butler.py +++ b/python/lsst/daf/butler/direct_butler.py @@ -1894,8 +1894,9 @@ def transfer_dimension_records_from( source_butler, data_ids, elements ) - for element, r in dimension_records.items(): - records = [r[dataId] for dataId in r] + # Insert order is important. + for element in self.dimensions.sorted(dimension_records.keys()): + records = [r for r in dimension_records[element].values()] # Assume that if the record is already present that we can # use it without having to check that the record metadata # is consistent. @@ -2101,8 +2102,9 @@ def transfer_from( with self.transaction(): if dimension_records: _LOG.verbose("Ensuring that dimension records exist for transferred datasets.") - for element, r in dimension_records.items(): - records = [r[dataId] for dataId in r] + # Order matters. + for element in self.dimensions.sorted(dimension_records.keys()): + records = [r for r in dimension_records[element].values()] # Assume that if the record is already present that we can # use it without having to check that the record metadata # is consistent. From 484526386a38a5f1619d04cde785745785a19e76 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Wed, 6 Dec 2023 21:17:41 -0700 Subject: [PATCH 5/5] Add news fragment. 
---
 doc/changes/DM-41966.feature.rst | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 doc/changes/DM-41966.feature.rst

diff --git a/doc/changes/DM-41966.feature.rst b/doc/changes/DM-41966.feature.rst
new file mode 100644
index 0000000000..5ea6679ea7
--- /dev/null
+++ b/doc/changes/DM-41966.feature.rst
@@ -0,0 +1,4 @@
+* Added new API ``Butler.transfer_dimension_records_from()`` to copy dimension records out of some refs and add them to the target butler.
+* This and ``Butler.transfer_from()`` now copy related dimension records as well as the records associated directly with the refs.
+  For example, if ``visit`` is being transferred, additional records such as ``visit_definition`` will also be copied.
+  This requires a full Butler and not a limited Butler (such as the one backed by a quantum graph).
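
A minimal usage sketch of the new API follows. The repository paths, the ``HSC/calib``
collection, and the ``flat`` dataset type are illustrative only, and the
``register_dataset_types``/``transfer_dimensions`` arguments to ``Butler.transfer_from()``
are assumed from its existing signature rather than shown in this patch::

    from lsst.daf.butler import Butler

    # Hypothetical source and target repositories; the source must be a full
    # Butler so that unexpanded data IDs can be expanded via its registry.
    source_butler = Butler("/repo/source")
    target_butler = Butler("/repo/target", writeable=True)

    # Example refs: every "flat" dataset in an example calibration collection.
    refs = list(source_butler.registry.queryDatasets("flat", collections="HSC/calib"))

    # Copy only the dimension records implied by those refs.  Related
    # "populated_by" records (e.g. visit_detector_region for a visit) are
    # copied as well, and records are inserted in dependency order.
    target_butler.transfer_dimension_records_from(source_butler, refs)

    # Or transfer the datasets themselves; Butler.transfer_from() now copies
    # the same related dimension records when asked to transfer dimensions.
    target_butler.transfer_from(
        source_butler,
        refs,
        transfer="copy",
        register_dataset_types=True,
        transfer_dimensions=True,
    )

A ``LimitedButler`` source only works when the refs already carry expanded data IDs
and no ``populated_by`` records need to be queried; otherwise a full ``Butler`` is
required, as described in the docstring above.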