Merge pull request #1059 from lsst/tickets/DM-45431
DM-45431: Update parquet formatter to use can_accept()
erykoff authored Aug 19, 2024
2 parents 41ffdbb + 54a7d55 commit d09db1e
Showing 12 changed files with 435 additions and 714 deletions.
3 changes: 3 additions & 0 deletions doc/changes/DM-45431.feature.md
@@ -0,0 +1,3 @@
The ParquetFormatter now declares via can_accept() that it accepts Arrow tables, Astropy tables, Numpy tables, and pandas DataFrames.
This means we now have completely lossless storage of any parquet-compatible type into a datastore that has declared a different type; e.g. an astropy table with units can be persisted into a DataFrame storage class without those units being stripped.
This ticket also adds can_accept() to the InMemoryDatastore delegates, and a single ArrowTableDelegate now handles all of the parquet-compatible dataset types.
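
To make the changelog entry concrete, here is a hedged sketch of the round trip it describes. The repo path, run name, and the dimensionless `catalog` dataset type (assumed to be registered with the DataFrame storage class) are placeholders, not part of this commit:

```python
import astropy.table
import astropy.units as u
from lsst.daf.butler import Butler

# Placeholder repo and run; a real repo needs Butler.makeRepo() plus
# dimension and dataset-type registration, omitted here.
butler = Butler("repo", run="u/example/run")

table = astropy.table.Table({"flux": [1.0, 2.0] * u.Jy})

# "catalog" is a hypothetical dataset type declared with the DataFrame
# storage class. With this change the astropy table is stored as-is.
butler.put(table, "catalog")

# Reading back with the ArrowAstropy storage class recovers the units.
roundtrip = butler.get("catalog", storageClass="ArrowAstropy")
assert roundtrip["flux"].unit == u.Jy
```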
25 changes: 25 additions & 0 deletions python/lsst/daf/butler/_storage_class_delegate.py
@@ -86,6 +86,31 @@ def __init__(self, storageClass: StorageClass):
assert storageClass is not None
self.storageClass = storageClass

def can_accept(self, inMemoryDataset: Any) -> bool:
"""Indicate whether this delegate can accept the specified
storage class directly.
Parameters
----------
inMemoryDataset : `object`
The dataset that is to be stored.
Returns
-------
accepts : `bool`
If `True` the delegate can handle data of this type without
requiring datastore to convert it. If `False` the datastore
will attempt to convert before storage.
Notes
-----
The base class always returns `False` even if the given type is an
instance of the delegate type. This will result in a storage class
conversion no-op but also allows mocks with mocked storage classes
to work properly.
"""
return False
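
For context, a subclass signals acceptance by overriding this hook. The sketch below is a hypothetical delegate, not the actual implementation from this commit, that accepts the parquet-compatible types named in the changelog:

```python
from typing import Any

import astropy.table
import numpy as np
import pandas as pd
import pyarrow as pa

from lsst.daf.butler import StorageClassDelegate


class ParquetFriendlyDelegate(StorageClassDelegate):
    """Hypothetical delegate accepting any parquet-compatible type."""

    def can_accept(self, inMemoryDataset: Any) -> bool:
        # Accept the types the parquet formatter can serialize losslessly,
        # even when the declared storage class pytype is different.
        return isinstance(
            inMemoryDataset,
            (pa.Table, astropy.table.Table, np.ndarray, pd.DataFrame),
        )
```

In the actual commit this role is played by the consolidated ArrowTableDelegate in lsst.daf.butler.delegates.arrowtable, as the YAML changes below show.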

@staticmethod
def _attrNames(componentName: str, getter: bool = True) -> tuple[str, ...]:
"""Return list of suitable attribute names to attempt to use.
8 changes: 4 additions & 4 deletions python/lsst/daf/butler/configs/storageClasses.yaml
@@ -127,7 +127,7 @@ storageClasses:
astropy.table.Table: lsst.daf.butler.formatters.parquet.astropy_to_pandas
numpy.ndarray: pandas.DataFrame.from_records
dict: pandas.DataFrame.from_records
delegate: lsst.daf.butler.delegates.dataframe.DataFrameDelegate
delegate: lsst.daf.butler.delegates.arrowtable.ArrowTableDelegate
derivedComponents:
columns: DataFrameIndex
rowcount: int
@@ -179,7 +179,7 @@ storageClasses:
pandas.core.frame.DataFrame: lsst.daf.butler.formatters.parquet.pandas_to_astropy
numpy.ndarray: lsst.daf.butler.formatters.parquet.numpy_to_astropy
dict: astropy.table.Table
delegate: lsst.daf.butler.delegates.arrowastropy.ArrowAstropyDelegate
delegate: lsst.daf.butler.delegates.arrowtable.ArrowTableDelegate
derivedComponents:
columns: ArrowColumnList
rowcount: int
@@ -200,7 +200,7 @@ storageClasses:
pandas.core.frame.DataFrame: pandas.DataFrame.to_records
astropy.table.Table: astropy.table.Table.as_array
dict: lsst.daf.butler.formatters.parquet._numpy_dict_to_numpy
delegate: lsst.daf.butler.delegates.arrownumpy.ArrowNumpyDelegate
delegate: lsst.daf.butler.delegates.arrowtable.ArrowTableDelegate
derivedComponents:
columns: ArrowColumnList
rowcount: int
@@ -221,7 +221,7 @@ storageClasses:
pandas.core.frame.DataFrame: lsst.daf.butler.formatters.parquet._pandas_to_numpy_dict
astropy.table.Table: lsst.daf.butler.formatters.parquet._astropy_to_numpy_dict
numpy.ndarray: lsst.daf.butler.formatters.parquet._numpy_to_numpy_dict
delegate: lsst.daf.butler.delegates.arrownumpydict.ArrowNumpyDictDelegate
delegate: lsst.daf.butler.delegates.arrowtable.ArrowTableDelegate
derivedComponents:
columns: ArrowColumnList
rowcount: int
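
All four of these parquet-backed storage classes now point at the same delegate. A hedged sketch of how one might verify that from Python; the storage class names are inferred from the replaced delegate names above, and the default storage class definitions are assumed to load with the factory:

```python
from lsst.daf.butler import StorageClassFactory

factory = StorageClassFactory()

# Storage class names assumed from the YAML hunks above.
for name in ("DataFrame", "ArrowAstropy", "ArrowNumpy", "ArrowNumpyDict"):
    storage_class = factory.getStorageClass(name)
    # Each delegate() call should now construct the same delegate class.
    print(name, type(storage_class.delegate()).__name__)
```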
29 changes: 0 additions & 29 deletions python/lsst/daf/butler/datastore/generic_base.py
@@ -35,7 +35,6 @@
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from .._exceptions import DatasetTypeNotSupportedError
from ..datastore._datastore import Datastore
from .stored_file_info import StoredDatastoreItemInfo

@@ -54,34 +53,6 @@ class GenericBaseDatastore(Datastore, Generic[_InfoType]):
Should always be sub-classed since key abstract methods are missing.
"""

def _validate_put_parameters(self, inMemoryDataset: object, ref: DatasetRef) -> None:
"""Validate the supplied arguments for put.
Parameters
----------
inMemoryDataset : `object`
The dataset to store.
ref : `DatasetRef`
Reference to the associated Dataset.
"""
storageClass = ref.datasetType.storageClass

# Sanity check
if not isinstance(inMemoryDataset, storageClass.pytype):
raise TypeError(
f"Inconsistency between supplied object ({type(inMemoryDataset)}) "
f"and storage class type ({storageClass.pytype})"
)

# Confirm that we can accept this dataset
if not self.constraints.isAcceptable(ref):
# Raise rather than use boolean return value.
raise DatasetTypeNotSupportedError(
f"Dataset {ref} has been rejected by this datastore via configuration."
)

return

def remove(self, ref: DatasetRef) -> None:
"""Indicate to the Datastore that a dataset can be removed.
17 changes: 14 additions & 3 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -39,6 +39,7 @@
from urllib.parse import urlencode

from lsst.daf.butler import DatasetId, DatasetRef, StorageClass
from lsst.daf.butler._exceptions import DatasetTypeNotSupportedError
from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig
from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore, post_process_get
from lsst.daf.butler.datastore.record_data import DatastoreRecordData
@@ -397,11 +398,21 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
allow `ChainedDatastore` to put to multiple datastores without
requiring that every datastore accepts the dataset.
"""
if not self.constraints.isAcceptable(ref):
# Raise rather than use boolean return value.
raise DatasetTypeNotSupportedError(
f"Dataset {ref} has been rejected by this datastore via configuration."
)

# May need to coerce the in memory dataset to the correct
# python type, otherwise parameters may not work.
inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset)

self._validate_put_parameters(inMemoryDataset, ref)
try:
delegate = ref.datasetType.storageClass.delegate()
except TypeError:
# TypeError is raised when a storage class doesn't have a delegate.
delegate = None
if not delegate or not delegate.can_accept(inMemoryDataset):
inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset)

self.datasets[ref.id] = inMemoryDataset
log.debug("Store %s in %s", ref, self.name)
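
The user-visible effect of the new branch: for a parquet-compatible in-memory type, coercion is skipped and the object is stored unchanged. A hedged sketch of that decision in isolation, using the storage class factory directly rather than a full datastore:

```python
import astropy.table
import astropy.units as u
from lsst.daf.butler import StorageClassFactory

table = astropy.table.Table({"flux": [1.0, 2.0] * u.Jy})
storage_class = StorageClassFactory().getStorageClass("DataFrame")

try:
    delegate = storage_class.delegate()
except TypeError:
    # Storage classes without a configured delegate raise TypeError.
    delegate = None

if delegate and delegate.can_accept(table):
    # New path: the astropy table is stored as-is; units survive.
    stored = table
else:
    # Old path: coerce to the declared pytype (pandas), stripping units.
    stored = storage_class.coerce_type(table)
```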
83 changes: 0 additions & 83 deletions python/lsst/daf/butler/delegates/arrowastropy.py

This file was deleted.

85 changes: 0 additions & 85 deletions python/lsst/daf/butler/delegates/arrownumpy.py

This file was deleted.
