Merge pull request #921 from lsst/tickets/DM-41966
DM-41966: Add Butler.transfer_dimension_records_from API
timj committed Dec 7, 2023
2 parents d41daf1 + 4845263 commit f87c7a0
Showing 7 changed files with 184 additions and 19 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-41966.feature.rst
@@ -0,0 +1,4 @@
* Added new API ``Butler.transfer_dimension_records_from()`` to copy the dimension records associated with the given refs from a source butler into the target butler.
* This and ``Butler.transfer_from()`` now copy related dimension records as well as the records associated directly with the refs.
  For example, if visit is being transferred, additional records such as visit_definition will also be copied.
  This requires a full Butler and not a limited Butler (such as the one backed by a quantum graph).
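
A minimal usage sketch of the new API, assuming hypothetical repository paths, dataset type, and collection name (only ``transfer_dimension_records_from`` itself comes from this commit):

from lsst.daf.butler import Butler

# Hypothetical source and writeable target repositories.
source_butler = Butler("/repo/source")
target_butler = Butler("/repo/target", writeable=True)

# The data IDs of these refs determine which dimension records
# (visit, exposure, visit_definition, ...) are copied.
refs = list(source_butler.registry.queryDatasets("raw", collections="HSC/raw/all"))

# Copy just the dimension records needed by these refs into the target registry.
target_butler.transfer_dimension_records_from(source_butler, refs)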
21 changes: 21 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -1248,6 +1248,27 @@ def import_(
"""
raise NotImplementedError()

@abstractmethod
def transfer_dimension_records_from(
self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
) -> None:
"""Transfer dimension records to this Butler from another Butler.
Parameters
----------
source_butler : `LimitedButler` or `Butler`
Butler from which the records are to be transferred. If data IDs
in ``source_refs`` are not expanded then this has to be a full
`Butler` whose registry will be used to expand data IDs. If the
source refs contain coordinates that are used to populate other
records then this will also need to be a full `Butler`.
source_refs : iterable of `DatasetRef`
Datasets defined in the source butler whose dimension records
should be transferred to this butler. In most circumstances,
transfer is faster if the dataset refs are expanded.
"""
raise NotImplementedError()
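
Continuing the hedged sketch from the changelog section above, the docstring's note that expanded refs transfer faster can be illustrated as follows; the dataset type and collection are hypothetical, and ``expanded()`` on the dataset query results is assumed to attach dimension records to each data ID:

# Refs whose data IDs already carry dimension records avoid extra
# registry lookups while the records are being extracted.
refs = list(
    source_butler.registry.queryDatasets(
        "calexp", collections="HSC/runs/example"
    ).expanded()
)
target_butler.transfer_dimension_records_from(source_butler, refs)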

@abstractmethod
def transfer_from(
self,
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/dimensions/_universe.py
@@ -596,7 +596,7 @@ def getEncodeLength(self) -> int:

def get_elements_populated_by(self, dimension: Dimension) -> NamedValueAbstractSet[DimensionElement]:
"""Return the set of `DimensionElement` objects whose
`~DimensionElement.populated_by` atttribute is the given dimension.
`~DimensionElement.populated_by` attribute is the given dimension.
"""
return self._populates[dimension.name]
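
A brief hedged example of this lookup, assuming the default dimension universe can be constructed with ``DimensionUniverse()``; ``direct_butler.py`` below makes the same call when chasing ``populated_by`` relationships:

from lsst.daf.butler import DimensionUniverse

universe = DimensionUniverse()
# Elements whose records are keyed by visit, e.g. visit_definition and
# visit_detector_region in the default configuration.
for element in universe.get_elements_populated_by(universe["visit"]):
    print(element.name)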

143 changes: 126 additions & 17 deletions python/lsst/daf/butler/direct_butler.py
@@ -37,6 +37,7 @@
import collections.abc
import contextlib
import io
import itertools
import logging
import numbers
import os
@@ -1879,6 +1880,125 @@ def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
else:
doImport(filename) # type: ignore

def transfer_dimension_records_from(
self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
) -> None:
# Allowed dimensions in the target butler.
elements = frozenset(
element for element in self.dimensions.elements if element.hasTable() and element.viewOf is None
)

data_ids = {ref.dataId for ref in source_refs}

dimension_records = self._extract_all_dimension_records_from_data_ids(
source_butler, data_ids, elements
)

# Insert order is important.
for element in self.dimensions.sorted(dimension_records.keys()):
records = [r for r in dimension_records[element].values()]
# Assume that if the record is already present we can
# use it without having to check that the record metadata
# is consistent.
self._registry.insertDimensionData(element, *records, skip_existing=True)
_LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))

def _extract_all_dimension_records_from_data_ids(
self,
source_butler: LimitedButler | Butler,
data_ids: set[DataCoordinate],
allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
primary_records = self._extract_dimension_records_from_data_ids(
source_butler, data_ids, allowed_elements
)

can_query = True if isinstance(source_butler, Butler) else False

additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
for original_element, record_mapping in primary_records.items():
# Get dimensions that depend on this dimension.
populated_by = self.dimensions.get_elements_populated_by(
self.dimensions[original_element.name] # type: ignore
)

for data_id in record_mapping.keys():
for element in populated_by:
if element not in allowed_elements:
continue
if element.name == original_element.name:
continue

if element.name in primary_records:
# If this element has already been stored, avoid
# re-finding records since that may lead to additional
# spurious records. e.g. visit is populated_by
# visit_detector_region but querying
# visit_detector_region by visit will return all the
# detectors for this visit -- the visit dataId does not
# constrain this.
# To constrain the query the original dataIds would
# have to be scanned.
continue

if not can_query:
raise RuntimeError(
f"Transferring populated_by records like {element.name} requires a full Butler."
)

records = source_butler.registry.queryDimensionRecords( # type: ignore
element.name, **data_id.mapping # type: ignore
)
for record in records:
additional_records[record.definition].setdefault(record.dataId, record)

# The next step is to walk back through the additional records to
# pick up any missing content (such as visit_definition needing to
# know the exposure). Want to ensure we do not request records we
# already have.
missing_data_ids = set()
for name, record_mapping in additional_records.items():
for data_id in record_mapping.keys():
if data_id not in primary_records[name]:
missing_data_ids.add(data_id)

# Fill out the new records. Assume that these new records do not
# also need to carry over additional populated_by records.
secondary_records = self._extract_dimension_records_from_data_ids(
source_butler, missing_data_ids, allowed_elements
)

# Merge the extra sets of records in with the original.
for name, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()):
primary_records[name].update(record_mapping)

return primary_records

def _extract_dimension_records_from_data_ids(
self,
source_butler: LimitedButler | Butler,
data_ids: set[DataCoordinate],
allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)

for data_id in data_ids:
# Need an expanded record; if it is not expanded we need a full
# butler with registry (allow mocks with registry too).
if not data_id.hasRecords():
if registry := getattr(source_butler, "registry", None):
data_id = registry.expandDataId(data_id)
else:
raise TypeError("Input butler needs to be a full butler to expand DataId.")
# If this butler doesn't know about a dimension in the source
# butler, things will break later.
for element_name in data_id.dimensions.elements:
record = data_id.records[element_name]
if record is not None and record.definition in allowed_elements:
dimension_records[record.definition].setdefault(record.dataId, record)

return dimension_records

def transfer_from(
self,
source_butler: LimitedButler,
@@ -1972,30 +2092,19 @@ def transfer_from(
if element.hasTable() and element.viewOf is None
)
dataIds = {ref.dataId for ref in source_refs}
# This logic comes from saveDataIds.
for dataId in dataIds:
# Need an expanded record, if not expanded that we need a full
# butler with registry (allow mocks with registry too).
if not dataId.hasRecords():
if registry := getattr(source_butler, "registry", None):
dataId = registry.expandDataId(dataId)
else:
raise TypeError("Input butler needs to be a full butler to expand DataId.")
# If this butler doesn't know about a dimension in the source
# butler things will break later.
for element_name in dataId.dimensions.elements:
record = dataId.records[element_name]
if record is not None and record.definition in elements:
dimension_records[record.definition].setdefault(record.dataId, record)
dimension_records = self._extract_all_dimension_records_from_data_ids(
source_butler, dataIds, elements
)

handled_collections: set[str] = set()

# Do all the importing in a single transaction.
with self.transaction():
if dimension_records:
_LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
for element, r in dimension_records.items():
records = [r[dataId] for dataId in r]
# Order matters.
for element in self.dimensions.sorted(dimension_records.keys()):
records = [r for r in dimension_records[element].values()]
# Assume that if the record is already present we can
# use it without having to check that the record metadata
# is consistent.
6 changes: 6 additions & 0 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -407,6 +407,12 @@ def import_(
# Docstring inherited.
raise NotImplementedError()

def transfer_dimension_records_from(
self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
) -> None:
# Docstring inherited.
raise NotImplementedError()

def transfer_from(
self,
source_butler: LimitedButler,
5 changes: 5 additions & 0 deletions tests/test_butler.py
@@ -2308,6 +2308,11 @@ def assertButlerTransfers(self, purge: bool = False, storageClassName: str = "St
# Can remove with DM-35498.
self.target_butler.registry.refresh()

# Transfer the records for one ref to test the alternative API.
with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]])
self.assertIn("number of records transferred: 1", ";".join(log_cm.output))

# Now transfer them to the second butler, including dimensions.
with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
transferred = self.target_butler.transfer_from(
22 changes: 21 additions & 1 deletion tests/test_simpleButler.py
@@ -324,7 +324,8 @@ def testButlerGet(self):
try:
flat_id, _ = butler.get("flat", dataId=dataId, collections=coll, **kwds)
except Exception as e:
raise type(e)(f"{str(e)}: dataId={dataId}, kwds={kwds}") from e
e.add_note(f"dataId={dataId}, kwds={kwds}")
raise
self.assertEqual(flat_id, flat2g.id, msg=f"DataId: {dataId}, kwds: {kwds}")

# Check that bad combinations raise.
@@ -602,6 +603,25 @@ def testJson(self):
from_json = type(test_item).from_json(json_str, universe=butler.dimensions)
self.assertEqual(from_json, test_item, msg=f"From JSON '{json_str}' using universe")

def test_populated_by(self):
"""Test that dimension records can find other records."""
butler = self.makeButler(writeable=True)
butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "hsc-rc2-subset.yaml"))

elements = frozenset(
element for element in butler.dimensions.elements if element.hasTable() and element.viewOf is None
)

# Get a visit-based dataId.
data_ids = set(butler.registry.queryDataIds("visit", visit=1232, instrument="HSC"))

# Request all the records related to it.
records = butler._extract_all_dimension_records_from_data_ids(butler, data_ids, elements)

self.assertIn(butler.dimensions["visit_detector_region"], records, f"Keys: {records.keys()}")
self.assertIn(butler.dimensions["visit_system_membership"], records)
self.assertIn(butler.dimensions["visit_system"], records)

def testJsonDimensionRecordsAndHtmlRepresentation(self):
# Dimension Records
butler = self.makeButler(writeable=True)
