Skip to content

Commit

Permalink
Add unit test for general queries and fix serialization of ingest_date.
Browse files Browse the repository at this point in the history
Adding unit tests exposed an issue with the serialization of ingest_date.
We still create the schema with a native time type for that column, while
its corresponding ColumnSpec says it should be astropy time. The code
that I added recently used SerializableTime with a type adapter to
serialize it, which did not work with datetime. I had to reimplement the
serialization methods to allow more flexible handling of ingest_date types.
  • Loading branch information
andy-slac committed Aug 13, 2024
1 parent 95b4ede commit 41efeb6
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 36 deletions.
141 changes: 119 additions & 22 deletions python/lsst/daf/butler/column_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"COLLECTION_NAME_MAX_LENGTH",
)

import datetime
import textwrap
import uuid
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -88,6 +89,102 @@
# that actually changing the value is a (minor) schema change.


class ColumnValueSerializer(ABC):
    """Interface for objects that convert column values to and from a
    serializable representation.
    """

    @abstractmethod
    def serialize(self, value: Any) -> Any:
        """Convert a column value into a form that can be serialized.

        Parameters
        ----------
        value : `Any`
            Column value to be serialized.

        Returns
        -------
        value : `Any`
            Column value in serializable format.
        """
        raise NotImplementedError()

    @abstractmethod
    def deserialize(self, value: Any) -> Any:
        """Convert a serialized value back into a column value.

        Parameters
        ----------
        value : `Any`
            Serialized column value.

        Returns
        -------
        value : `Any`
            Deserialized column value.
        """
        raise NotImplementedError()


class _DefaultColumnValueSerializer(ColumnValueSerializer):
    """Pass-through serializer used for basic column types.

    Values are returned unchanged in both directions.
    """

    def serialize(self, value: Any) -> Any:
        # Docstring inherited.
        # No conversion is applied; the value is handed back as-is.
        return value

    def deserialize(self, value: Any) -> Any:
        # Docstring inherited.
        # No conversion is applied; the value is handed back as-is.
        return value


class _TypeAdapterColumnValueSerializer(ColumnValueSerializer):
    """Serializer that delegates conversion to a pydantic type adapter.

    `None` values bypass the adapter and are returned as `None` in both
    directions.

    Parameters
    ----------
    type_adapter : `pydantic.TypeAdapter`
        Adapter whose ``dump_python`` and ``validate_python`` methods are
        used for serialization and deserialization respectively.
    """

    def __init__(self, type_adapter: pydantic.TypeAdapter):
        self._type_adapter = type_adapter

    def serialize(self, value: Any) -> Any:
        # Docstring inherited.
        if value is None:
            return None
        return self._type_adapter.dump_python(value)

    def deserialize(self, value: Any) -> Any:
        # Docstring inherited.
        if value is None:
            return None
        return self._type_adapter.validate_python(value)


class _DateTimeColumnValueSerializer(ColumnValueSerializer):
    """Serializer for the ingest time column.

    That column can be either a native database time, appearing as
    `datetime.datetime` on the Python side, or integer nanoseconds, appearing
    as `astropy.time.Time`. Astropy times go through a pydantic type adapter
    for ``SerializableTime``, which serializes them into integer nanoseconds.
    `datetime.datetime` values are converted to their ISO-format string so
    that they can be told apart from integer nanoseconds when deserializing
    (timezone handling depends entirely on what the database returns).
    """

    def __init__(self) -> None:
        self._astropy_adapter = pydantic.TypeAdapter(SerializableTime)

    def serialize(self, value: Any) -> Any:
        # Docstring inherited.
        if value is None:
            return None
        if isinstance(value, datetime.datetime):
            # Native database time: emit a string so that deserialize() can
            # recognize it by type.
            return value.isoformat()
        # Anything else is handed to the astropy time adapter.
        return self._astropy_adapter.dump_python(value)

    def deserialize(self, value: Any) -> Any:
        # Docstring inherited.
        if value is None:
            return None
        if isinstance(value, str):
            # Strings are produced by serialize() only for datetime values.
            return datetime.datetime.fromisoformat(value)
        # Non-string payloads are validated by the astropy time adapter.
        return self._astropy_adapter.validate_python(value)


class _BaseColumnSpec(pydantic.BaseModel, ABC):
"""Base class for descriptions of table columns."""

Expand Down Expand Up @@ -136,13 +233,13 @@ def to_arrow(self) -> arrow_utils.ToArrow:
raise NotImplementedError()

@abstractmethod
def type_adapter(self) -> pydantic.TypeAdapter:
"""Return pydantic type adapter that converts values of this column to
or from serializable format.
def serializer(self) -> ColumnValueSerializer:
"""Return object that converts values of this column to or from
serializable format.
Returns
-------
type_adapter : `pydantic.TypeAdapter`
serializer : `ColumnValueSerializer`
A converter instance.
"""
raise NotImplementedError()
Expand Down Expand Up @@ -191,9 +288,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.uint64(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _DefaultColumnValueSerializer()


@final
Expand All @@ -215,9 +312,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.string(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _DefaultColumnValueSerializer()


@final
Expand Down Expand Up @@ -245,9 +342,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
nullable=self.nullable,
)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _DefaultColumnValueSerializer()


@final
Expand All @@ -263,9 +360,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_primitive(self.name, pa.float64(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _DefaultColumnValueSerializer()


@final
Expand All @@ -280,9 +377,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.bool_(), nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _DefaultColumnValueSerializer()


@final
Expand All @@ -298,9 +395,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_uuid(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(self.pytype))


@final
Expand All @@ -321,9 +418,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_region(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(SerializableRegion)
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(SerializableRegion))


@final
Expand All @@ -340,9 +437,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_timespan(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(self.pytype)
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(self.pytype))


@final
Expand All @@ -360,9 +457,9 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_datetime(self.name, nullable=self.nullable)

def type_adapter(self) -> pydantic.TypeAdapter:
def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return pydantic.TypeAdapter(SerializableTime)
return _DateTimeColumnValueSerializer()


ColumnSpec = Annotated[
Expand Down
9 changes: 3 additions & 6 deletions python/lsst/daf/butler/remote_butler/_query_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,11 @@ def _convert_query_result_page(
def _convert_general_result(spec: GeneralResultSpec, model: GeneralResultModel) -> GeneralResultPage:
"""Convert GeneralResultModel to a general result page."""
columns = spec.get_result_columns()
type_adapters = [
columns.get_column_spec(column.logical_table, column.field).type_adapter() for column in columns
serializers = [
columns.get_column_spec(column.logical_table, column.field).serializer() for column in columns
]
rows = [
tuple(
value if value is None else type_adapter.validate_python(value)
for value, type_adapter in zip(row, type_adapters)
)
tuple(serializer.deserialize(value) for value, serializer in zip(row, serializers))
for row in model.rows
]
return GeneralResultPage(spec=spec, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,10 @@ def _convert_query_page(spec: ResultSpec, page: ResultPage) -> QueryExecuteResul
def _convert_general_result(page: GeneralResultPage) -> GeneralResultModel:
"""Convert GeneralResultPage to a serializable model."""
columns = page.spec.get_result_columns()
type_adapters = [
columns.get_column_spec(column.logical_table, column.field).type_adapter() for column in columns
serializers = [
columns.get_column_spec(column.logical_table, column.field).serializer() for column in columns
]
rows = [
tuple(
value if value is None else type_adapter.dump_python(value)
for value, type_adapter in zip(row, type_adapters)
)
for row in page.rows
tuple(serializer.serialize(value) for value, serializer in zip(row, serializers)) for row in page.rows
]
return GeneralResultModel(rows=rows)
Loading

0 comments on commit 41efeb6

Please sign in to comment.