DM-44875: Handle ambiguous calibration lookups on older postgres #1029

Draft: wants to merge 2 commits into main
9 changes: 9 additions & 0 deletions python/lsst/daf/butler/_exceptions.py
@@ -161,6 +161,14 @@ class MissingDatasetTypeError(DatasetTypeError, KeyError, ButlerUserError):
error_type = "missing_dataset_type"


class UnimplementedQueryError(NotImplementedError, ButlerUserError):
"""Exception raised when the query system does not support the query
specified by the user.
"""

error_type = "unimplemented_query"


class DatasetTypeNotSupportedError(RuntimeError):
"""A `DatasetType` is not handled by this routine.

@@ -216,6 +224,7 @@ class UnknownButlerUserError(ButlerUserError):
InvalidQueryError,
MissingCollectionError,
MissingDatasetTypeError,
UnimplementedQueryError,
UnknownButlerUserError,
)
_USER_ERROR_MAPPING = {e.error_type: e for e in _USER_ERROR_TYPES}
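The new exception plugs into the error_type registry shown at the end of this file, which lets the Butler server report an error as a stable string that the client can turn back into the right exception class. Below is a minimal, self-contained sketch of that round-trip pattern; the simplified ButlerUserError base and the raise_from_wire helper are illustrative assumptions, not part of this diff.

# Sketch of the error_type round-trip pattern used by _exceptions.py.
class ButlerUserError(Exception):
    """Simplified stand-in for the real daf_butler base class."""

    error_type: str


class UnimplementedQueryError(NotImplementedError, ButlerUserError):
    error_type = "unimplemented_query"


_USER_ERROR_TYPES = (UnimplementedQueryError,)
_USER_ERROR_MAPPING = {e.error_type: e for e in _USER_ERROR_TYPES}


def raise_from_wire(error_type: str, message: str) -> None:
    # Hypothetical client-side helper: map a server-reported error_type
    # string back to the corresponding exception class and raise it.
    raise _USER_ERROR_MAPPING[error_type](message)


try:
    raise_from_wire("unimplemented_query", "ANY_VALUE is unavailable")
except UnimplementedQueryError as err:
    print(f"client sees: {err}")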
48 changes: 40 additions & 8 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -42,7 +42,7 @@

from .. import ddl
from .._dataset_type import DatasetType
from .._exceptions import InvalidQueryError
from .._exceptions import InvalidQueryError, UnimplementedQueryError
from ..dimensions import DataCoordinate, DataIdValue, DimensionGroup, DimensionUniverse
from ..dimensions.record_cache import DimensionRecordCache
from ..queries import tree as qt
@@ -270,7 +270,7 @@
spec, self.get_dataset_type(spec.dataset_type_name), context
)
case _:
raise NotImplementedError(f"Result type '{spec.result_type}' not yet implemented")
raise UnimplementedQueryError(f"Result type '{spec.result_type}' not yet implemented")

Codecov warning: added line #L273 of python/lsst/daf/butler/direct_query_driver/_driver.py was not covered by tests.

def materialize(
self,
@@ -459,7 +459,7 @@
# construction do.
plan, builder = self.analyze_query(tree, final_columns, order_by, find_first_dataset)
self.apply_query_joins(plan.joins, builder.joiner)
self.apply_query_projection(plan.projection, builder)
self.apply_query_projection(plan.projection, builder, order_by)
builder = self.apply_query_find_first(plan.find_first, builder)
builder.columns = plan.final_columns
return plan, builder
@@ -627,7 +627,9 @@
# Add the WHERE clause to the joiner.
joiner.where(plan.predicate.visit(SqlColumnVisitor(joiner, self)))

def apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None:
def apply_query_projection(
self, plan: QueryProjectionPlan, builder: QueryBuilder, order_by: Iterable[qt.OrderExpression]
) -> None:
"""Modify `QueryBuilder` to reflect the "projection" stage of query
construction, which can involve a GROUP BY or DISTINCT [ON] clause
that enforces uniqueness.
@@ -640,6 +642,9 @@
Builder object that will be modified in place. Expected to be
initialized by `analyze_query` and further modified by
`apply_query_joins`.
order_by : `~collections.abc.Iterable` [ \
`.queries.tree.OrderExpression` ]
Order by clause associated with the query.
"""
builder.columns = plan.columns
if not plan and not builder.postprocessing.check_validity_match_count:
@@ -701,6 +706,8 @@
# it depends on the kinds of collection(s) we're searching and whether
# it's a find-first query.
for dataset_type, fields_for_dataset in plan.columns.dataset_fields.items():
is_calibration_search = plan.datasets[dataset_type].is_calibration_search
is_find_first_search = dataset_type == plan.find_first_dataset
for dataset_field in fields_for_dataset:
if dataset_field == "collection_key":
# If the collection_key field is present, it's needed for
@@ -710,11 +717,11 @@
unique_keys.append(builder.joiner.fields[dataset_type]["collection_key"])
else:
derived_fields.append((dataset_type, "collection_key"))
elif dataset_field == "timespan" and plan.datasets[dataset_type].is_calibration_search:
elif dataset_field == "timespan" and is_calibration_search:
Comment from Contributor Author:
This dataset_field == "timespan" case was already not covered before I got here. I wanted to add a test but can't figure out what circumstances would trigger it, since the validity range comparison is handled in-DB, and DatasetRefs don't include the calibration collection timespan field.

Reply from Member:
Right, this can't be tested until we add support for "general" results.

# If we're doing a non-find-first query against a
# CALIBRATION collection, the timespan is also a unique
# key...
if dataset_type == plan.find_first_dataset:
if is_find_first_search:
Comment from Contributor Author:
Also, I realized last night that we need to add dataset_id to unique keys for non-find-first searches; I'll do that today, hopefully before you review this for real.

# ...unless we're doing a find-first search on this
# dataset, in which case we need to use ANY_VALUE on
# the timespan and check that _VALIDITY_MATCH_COUNT
@@ -723,7 +730,7 @@
# collection that survived the base query's WHERE
# clauses and JOINs.
if not self.db.has_any_aggregate:
raise NotImplementedError(
raise UnimplementedQueryError(

Codecov warning: added line #L733 of python/lsst/daf/butler/direct_query_driver/_driver.py was not covered by tests.
f"Cannot generate query that returns {dataset_type}.timespan after a "
"find-first search, because this a database does not support the ANY_VALUE "
"aggregate function (or equivalent)."
@@ -733,14 +740,39 @@
].apply_any_aggregate(self.db.apply_any_aggregate)
else:
unique_keys.extend(builder.joiner.timespans[dataset_type].flatten())
elif (
dataset_field == "dataset_id"
and is_calibration_search
and is_find_first_search
and not self.db.has_any_aggregate
):
# As with the timespans above, for a find-first search in a
# calibration collection there may be multiple matching
# datasets, and we have to check in post-processing using
# _VALIDITY_MATCH_COUNT to make sure there is only one
# dataset to avoid ambiguity in the lookup.
#
# If there is no support for ANY_VALUE, dataset_id ends up
# in GROUP BY which prevents _VALIDITY_MATCH_COUNT from
# working.
raise UnimplementedQueryError(
f"Cannot generate query that returns {dataset_type}.dataset_id after a "
"find-first search, because this a database does not support the ANY_VALUE "
"aggregate function (or equivalent)."
)
else:
# Other dataset fields derive their uniqueness from key
# fields.
derived_fields.append((dataset_type, dataset_field))
if not have_aggregates and not derived_fields:
# SELECT DISTINCT is sufficient.
builder.distinct = True
elif not have_aggregates and self.db.has_distinct_on:
# With DISTINCT ON, Postgres requires that the leftmost parts of the
# ORDER BY match the DISTINCT ON expressions. It's somewhat tricky to
# enforce that, so instead we just don't use DISTINCT ON if ORDER BY is
# present. There may be an optimization opportunity by relaxing this
# restriction.
elif not have_aggregates and self.db.has_distinct_on and len(list(order_by)) == 0:
# SELECT DISTINCT ON is sufficient and supported by this database.
builder.distinct = unique_keys
else:
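To make the ANY_VALUE trade-off described in the comments above concrete, here is a hand-simplified sketch of the two query shapes involved. Everything in it is an illustrative assumption: the matched_calibs table, its columns, and the validity_match_count alias are invented, and the real SQL is generated by QueryBuilder rather than written by hand.

# Postgres >= 16: ANY_VALUE collapses each data ID to a single row while
# COUNT(*) still records how many calibration datasets matched the
# validity range, so postprocessing can reject ambiguous lookups.
QUERY_WITH_ANY_VALUE = """
SELECT detector,
       ANY_VALUE(dataset_id) AS dataset_id,
       COUNT(*) AS validity_match_count  -- > 1 flags an ambiguous lookup
FROM matched_calibs
GROUP BY detector
"""

# Older Postgres: dataset_id must itself appear in the GROUP BY, so every
# matching dataset keeps its own row and validity_match_count is always 1.
# The ambiguity check can never fire, which is why the driver now raises
# UnimplementedQueryError instead of silently returning arbitrary rows.
QUERY_WITHOUT_ANY_VALUE = """
SELECT detector,
       dataset_id,
       COUNT(*) AS validity_match_count  -- always 1; check is defeated
FROM matched_calibs
GROUP BY detector, dataset_id
"""

# The order_by change in apply_query_projection has a similar flavor:
# Postgres only accepts SELECT DISTINCT ON (x) ... ORDER BY x, y, with the
# DISTINCT ON expressions repeated as the leftmost ORDER BY terms, so the
# builder simply avoids DISTINCT ON whenever an ORDER BY is present.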
12 changes: 9 additions & 3 deletions python/lsst/daf/butler/registry/tests/_registry.py
@@ -67,6 +67,7 @@
InvalidQueryError,
MissingCollectionError,
MissingDatasetTypeError,
UnimplementedQueryError,
)
from ..._exceptions_legacy import DatasetTypeError
from ..._storage_class import StorageClass
@@ -2701,9 +2702,14 @@ def testSkipCalibs(self):
with self.assertRaises(CalibrationLookupError):
datasets = list(registry.queryDatasets("bias", collections=chain, findFirst=True))
else:
# Old query system ignores calibration collection entirely.
datasets = list(registry.queryDatasets("bias", collections=chain, findFirst=True))
self.assertGreater(len(datasets), 0)
try:
# Old query system ignores calibration collection entirely.
datasets = list(registry.queryDatasets("bias", collections=chain, findFirst=True))
self.assertGreater(len(datasets), 0)
except UnimplementedQueryError:
# New query system, on Postgres < 16, is unable to handle this
# case.
pass

def testIngestTimeQuery(self):
registry = self.makeRegistry()
18 changes: 8 additions & 10 deletions tests/test_remote_butler.py
@@ -220,21 +220,19 @@ class RemoteButlerPostgresRegistryTests(RemoteButlerRegistryTests, unittest.Test
server.
"""

postgres: TemporaryPostgresInstance

@classmethod
def setUpClass(cls):
cls.postgres = cls.enterClassContext(setup_postgres_test_db())
super().setUpClass()

@unittest.expectedFailure
def testQueryDataIdsOrderBy(self):
# TODO DM-44868: order_by sometimes causes invalid SQL to be generated
return super().testQueryDataIdsOrderBy()

def testSkipCalibs(self):
if self.postgres.server_major_version() < 16:
# TODO DM-44875: This test currently fails for older Postgres.
self.skipTest("TODO DM-44875")
return super().testSkipCalibs()
@property
def supportsCalibrationCollectionInFindFirst(self) -> bool:
# Find-first searches in calibration collections require the ANY_VALUE
# aggregate function, which is only supported starting from Postgres
# 16.
return self.postgres.server_major_version() >= 16


if __name__ == "__main__":
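The version gate in supportsCalibrationCollectionInFindFirst encodes the fact that the ANY_VALUE aggregate first shipped in PostgreSQL 16. Here is a minimal sketch of that capability check, written as a hypothetical standalone helper rather than how daf_butler's Database abstraction actually spells it:

def has_any_value_aggregate(server_major_version: int) -> bool:
    # ANY_VALUE arrived in PostgreSQL 16, so older servers must take the
    # GROUP BY fallback path, which raises UnimplementedQueryError for
    # find-first searches in CALIBRATION collections.
    return server_major_version >= 16


assert has_any_value_aggregate(16)
assert not has_any_value_aggregate(15)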