Skip to content

Commit

Permalink
Switch hybrid registry to use remote registry for queryDatasetAssocia…
Browse files Browse the repository at this point in the history
…tions.

This enables testing of the remote registry implementation, which uncovered
a problem with deserialization of general result pages. Additional code was
added to serialize and deserialize those pages correctly.
  • Loading branch information
andy-slac committed Aug 6, 2024
1 parent 90aadc1 commit 52abca0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 3 deletions.
49 changes: 49 additions & 0 deletions python/lsst/daf/butler/column_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

from . import arrow_utils, ddl
from ._timespan import Timespan
from .pydantic_utils import SerializableRegion, SerializableTime

if TYPE_CHECKING:
from .name_shrinker import NameShrinker
Expand Down Expand Up @@ -134,6 +135,18 @@ def to_arrow(self) -> arrow_utils.ToArrow:
"""
raise NotImplementedError()

@abstractmethod
def type_adapter(self) -> pydantic.TypeAdapter:
    """Return pydantic type adapter that converts values of this column to
    or from serializable format.

    Returns
    -------
    type_adapter : `pydantic.TypeAdapter`
        A converter instance.
    """
    raise NotImplementedError()

def display(self, level: int = 0, tab: str = " ") -> list[str]:
"""Return a human-reader-focused string description of this column as
a list of lines.
Expand Down Expand Up @@ -178,6 +191,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.uint64(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter


@final
class StringColumnSpec(_BaseColumnSpec):
Expand All @@ -198,6 +215,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.string(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter


@final
class HashColumnSpec(_BaseColumnSpec):
Expand All @@ -224,6 +245,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
nullable=self.nullable,
)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter

Check warning on line 250 in python/lsst/daf/butler/column_spec.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/column_spec.py#L250

Added line #L250 was not covered by tests


@final
class FloatColumnSpec(_BaseColumnSpec):
Expand All @@ -238,6 +263,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_primitive(self.name, pa.float64(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter

Check warning on line 268 in python/lsst/daf/butler/column_spec.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/column_spec.py#L268

Added line #L268 was not covered by tests


@final
class BoolColumnSpec(_BaseColumnSpec):
Expand All @@ -251,6 +280,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.bool_(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter

Check warning on line 285 in python/lsst/daf/butler/column_spec.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/column_spec.py#L285

Added line #L285 was not covered by tests


@final
class UUIDColumnSpec(_BaseColumnSpec):
Expand All @@ -265,6 +298,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_uuid(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter


@final
class RegionColumnSpec(_BaseColumnSpec):
Expand All @@ -284,6 +321,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_region(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.  Regions are serialized via the dedicated
    # SerializableRegion annotation rather than the raw pytype.
    adapter = pydantic.TypeAdapter(SerializableRegion)
    return adapter

Check warning on line 326 in python/lsst/daf/butler/column_spec.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/column_spec.py#L326

Added line #L326 was not covered by tests


@final
class TimespanColumnSpec(_BaseColumnSpec):
Expand All @@ -299,6 +340,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_timespan(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.
    adapter = pydantic.TypeAdapter(self.pytype)
    return adapter


@final
class DateTimeColumnSpec(_BaseColumnSpec):
Expand All @@ -315,6 +360,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_datetime(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
    # Docstring inherited.  Times are serialized via the dedicated
    # SerializableTime annotation rather than the raw pytype.
    adapter = pydantic.TypeAdapter(SerializableTime)
    return adapter

Check warning on line 365 in python/lsst/daf/butler/column_spec.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/column_spec.py#L365

Added line #L365 was not covered by tests


ColumnSpec = Annotated[
Union[
Expand Down
20 changes: 20 additions & 0 deletions python/lsst/daf/butler/remote_butler/_query_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from .server_models import (
AdditionalQueryInput,
DataCoordinateUpload,
GeneralResultModel,
MaterializedQuery,
QueryAnyRequestModel,
QueryAnyResponseModel,
Expand Down Expand Up @@ -260,5 +261,24 @@ def _convert_query_result_page(
spec=result_spec,
rows=[DatasetRef.from_simple(r, universe) for r in result.rows],
)
elif result_spec.result_type == "general":
assert result.type == "general"
return _convert_general_result(result_spec, result)
else:
raise NotImplementedError(f"Unhandled result type {result_spec.result_type}")


def _convert_general_result(spec: GeneralResultSpec, model: GeneralResultModel) -> GeneralResultPage:
    """Convert a serialized `GeneralResultModel` to a general result page.

    Parameters
    ----------
    spec : `GeneralResultSpec`
        Specification of the result columns, used to obtain the pydantic
        type adapter for each column.
    model : `GeneralResultModel`
        Serialized rows received from the server.

    Returns
    -------
    page : `GeneralResultPage`
        Result page with every cell converted back to its Python type.
    """
    columns = spec.get_result_columns()
    # One adapter per result column, in column order.
    type_adapters = [
        columns.get_column_spec(column.logical_table, column.field).type_adapter() for column in columns
    ]
    # ``strict=True`` raises on a row whose length disagrees with the column
    # list instead of silently truncating the row.
    rows = [
        tuple(
            value if value is None else type_adapter.validate_python(value)
            for value, type_adapter in zip(row, type_adapters, strict=True)
        )
        for row in model.rows
    ]
    return GeneralResultPage(spec=spec, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DataCoordinateResultPage,
DatasetRefResultPage,
DimensionRecordResultPage,
GeneralResultPage,
ResultPage,
ResultSpec,
)
Expand All @@ -42,6 +43,7 @@
DataCoordinateResultModel,
DatasetRefResultModel,
DimensionRecordsResultModel,
GeneralResultModel,
QueryErrorResultModel,
QueryExecuteResultData,
)
Expand Down Expand Up @@ -86,5 +88,24 @@ def _convert_query_page(spec: ResultSpec, page: ResultPage) -> QueryExecuteResul
case "dataset_ref":
assert isinstance(page, DatasetRefResultPage)
return DatasetRefResultModel(rows=[ref.to_simple() for ref in page.rows])
case "general":
assert isinstance(page, GeneralResultPage)
return _convert_general_result(page)
case _:
raise NotImplementedError(f"Unhandled query result type {spec.result_type}")


def _convert_general_result(page: GeneralResultPage) -> GeneralResultModel:
    """Convert a `GeneralResultPage` to a serializable model.

    Parameters
    ----------
    page : `GeneralResultPage`
        Result page whose rows are to be serialized; its spec supplies the
        pydantic type adapter for each column.

    Returns
    -------
    model : `GeneralResultModel`
        Model with every cell dumped to a serializable form.
    """
    columns = page.spec.get_result_columns()
    # One adapter per result column, in column order.
    type_adapters = [
        columns.get_column_spec(column.logical_table, column.field).type_adapter() for column in columns
    ]
    # ``strict=True`` raises on a row whose length disagrees with the column
    # list instead of silently truncating the row.
    rows = [
        tuple(
            value if value is None else type_adapter.dump_python(value)
            for value, type_adapter in zip(row, type_adapters, strict=True)
        )
        for row in page.rows
    ]
    return GeneralResultModel(rows=rows)
15 changes: 13 additions & 2 deletions python/lsst/daf/butler/remote_butler/server_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"GetCollectionSummaryResponseModel",
]

from typing import Annotated, Literal, NewType, TypeAlias
from typing import Annotated, Any, Literal, NewType, TypeAlias
from uuid import UUID

import pydantic
Expand Down Expand Up @@ -276,6 +276,13 @@ class DatasetRefResultModel(pydantic.BaseModel):
rows: list[SerializedDatasetRef]


class GeneralResultModel(pydantic.BaseModel):
    """Result model for /query/execute/ when user requested general results."""

    # Discriminator field used by the QueryExecuteResultData union to
    # select this model during validation.
    type: Literal["general"] = "general"
    # Serialized result rows; each cell is either None or a column value
    # dumped to a JSON-compatible form by that column's pydantic type adapter.
    rows: list[tuple[Any, ...]]


class QueryErrorResultModel(pydantic.BaseModel):
"""Result model for /query/execute when an error occurs part-way through
returning rows.
Expand All @@ -293,7 +300,11 @@ class QueryErrorResultModel(pydantic.BaseModel):


QueryExecuteResultData: TypeAlias = Annotated[
DataCoordinateResultModel | DimensionRecordsResultModel | DatasetRefResultModel | QueryErrorResultModel,
DataCoordinateResultModel
| DimensionRecordsResultModel
| DatasetRefResultModel
| GeneralResultModel
| QueryErrorResultModel,
pydantic.Field(discriminator="type"),
]

Expand Down
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/tests/hybrid_butler_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def queryDatasetAssociations(
collectionTypes: Iterable[CollectionType] = CollectionType.all(),
flattenChains: bool = False,
) -> Iterator[DatasetAssociation]:
return self._direct.queryDatasetAssociations(
return self._remote.queryDatasetAssociations(
datasetType, collections, collectionTypes=collectionTypes, flattenChains=flattenChains
)

Expand Down

0 comments on commit 52abca0

Please sign in to comment.