Merge pull request #912 from lsst/tickets/DM-41878
DM-41878: Implement RemoteButler.get() backed by a single FileDatastore
dhirving committed Dec 8, 2023
2 parents f87c7a0 + d718a09, commit 1e7b397
Showing 17 changed files with 938 additions and 517 deletions.
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/_butler.py
@@ -814,7 +814,8 @@ def get_dataset_type(self, name: str) -> DatasetType:
     def get_dataset(
         self,
         id: DatasetId,
-        storage_class: str | StorageClass | None,
+        *,
+        storage_class: str | StorageClass | None = None,
         dimension_records: bool = False,
         datastore_records: bool = False,
     ) -> DatasetRef | None:
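The get_dataset() change above makes storage_class keyword-only and optional. A minimal sketch of the revised call; the repository path and UUID are illustrative placeholders, not values from this commit:

```python
from uuid import UUID

from lsst.daf.butler import Butler

butler = Butler("/path/to/repo")  # hypothetical repository

# storage_class (and the record flags) must now be passed by keyword;
# the UUID below is a placeholder, not a value from this commit.
ref = butler.get_dataset(
    UUID("00000000-0000-0000-0000-000000000000"),
    storage_class="StructuredDataDict",
    dimension_records=True,
)
```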
17 changes: 17 additions & 0 deletions python/lsst/daf/butler/datastore/_datastore.py
@@ -540,6 +540,23 @@ def get(
"""
raise NotImplementedError("Must be implemented by subclass")

def prepare_get_for_external_client(self, ref: DatasetRef) -> object:
"""Retrieve serializable data that can be used to execute a get()
Parameters
----------
ref : `DatasetRef`
Reference to the required dataset.
Returns
-------
payload : `object`
Serializable payload containing the information needed to perform a
get() operation. This payload may be sent over the wire to another
system to perform the get().
"""
raise NotImplementedError()

@abstractmethod
def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
"""Write a `InMemoryDataset` with a given `DatasetRef` to the store.
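The new prepare_get_for_external_client() hook lets a server-side datastore hand a client everything it needs to perform the retrieval itself. A sketch of one possible subclass; the payload class, its fields, and the import paths are assumptions for illustration, not part of this commit:

```python
from dataclasses import dataclass

from lsst.daf.butler import DatasetRef
from lsst.daf.butler.datastore import Datastore


@dataclass
class ExamplePayload:
    """Hypothetical serializable payload for a client-side get()."""

    url: str
    storage_class: str


class ExampleDatastore(Datastore):
    # The other abstract Datastore methods are omitted for brevity.
    def prepare_get_for_external_client(self, ref: DatasetRef) -> object:
        # Collect what a remote client needs to fetch and deserialize
        # the artifact without a round trip back to this server.
        return ExamplePayload(
            url=f"https://example.org/artifacts/{ref.id}",
            storage_class=ref.datasetType.storageClass_name,
        )
```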
6 changes: 5 additions & 1 deletion python/lsst/daf/butler/datastore/cache_manager.py
@@ -1015,7 +1015,11 @@ class DatastoreDisabledCacheManager(AbstractDatastoreCacheManager):
     in lookup keys.
     """

-    def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
+    def __init__(
+        self,
+        config: str | DatastoreCacheManagerConfig | None = None,
+        universe: DimensionUniverse | None = None,
+    ):
         return

     def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
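With both constructor arguments now optional, the disabled cache manager can be built without a config or dimension universe, which is convenient when a datastore is assembled outside a full Butler. A sketch, assuming the class is importable from the cache_manager module:

```python
from lsst.daf.butler.datastore.cache_manager import DatastoreDisabledCacheManager

# Hypothetical usage: no config or universe needed any more, and every
# operation on this manager is a no-op.
cache = DatastoreDisabledCacheManager()
```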
98 changes: 49 additions & 49 deletions python/lsst/daf/butler/datastore/generic_base.py
@@ -29,7 +29,7 @@

 from __future__ import annotations

-__all__ = ("GenericBaseDatastore",)
+__all__ = ("GenericBaseDatastore", "post_process_get")

 import logging
 from collections.abc import Mapping
@@ -54,54 +54,6 @@ class GenericBaseDatastore(Datastore, Generic[_InfoType]):
     Should always be sub-classed since key abstract methods are missing.
     """

-    def _post_process_get(
-        self,
-        inMemoryDataset: object,
-        readStorageClass: StorageClass,
-        assemblerParams: Mapping[str, Any] | None = None,
-        isComponent: bool = False,
-    ) -> object:
-        """Given the Python object read from the datastore, manipulate
-        it based on the supplied parameters and ensure the Python
-        type is correct.
-
-        Parameters
-        ----------
-        inMemoryDataset : `object`
-            Dataset to check.
-        readStorageClass : `StorageClass`
-            The `StorageClass` used to obtain the assembler and to
-            check the python type.
-        assemblerParams : `dict`, optional
-            Parameters to pass to the assembler. Can be `None`.
-        isComponent : `bool`, optional
-            If this is a component, allow the inMemoryDataset to be `None`.
-
-        Returns
-        -------
-        dataset : `object`
-            In-memory dataset, potentially converted to expected type.
-        """
-        # Process any left over parameters
-        if assemblerParams:
-            inMemoryDataset = readStorageClass.delegate().handleParameters(inMemoryDataset, assemblerParams)
-
-        # Validate the returned data type matches the expected data type
-        pytype = readStorageClass.pytype
-
-        allowedTypes = []
-        if pytype:
-            allowedTypes.append(pytype)
-
-        # Special case components to allow them to be None
-        if isComponent:
-            allowedTypes.append(type(None))
-
-        if allowedTypes and not isinstance(inMemoryDataset, tuple(allowedTypes)):
-            inMemoryDataset = readStorageClass.coerce_type(inMemoryDataset)
-
-        return inMemoryDataset
-
     def _validate_put_parameters(self, inMemoryDataset: object, ref: DatasetRef) -> None:
         """Validate the supplied arguments for put.
@@ -173,3 +125,51 @@ def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None:
         assert inputDatastore is not self  # unless we want it for renames?
         inMemoryDataset = inputDatastore.get(ref)
         return self.put(inMemoryDataset, ref)
+
+
+def post_process_get(
+    inMemoryDataset: object,
+    readStorageClass: StorageClass,
+    assemblerParams: Mapping[str, Any] | None = None,
+    isComponent: bool = False,
+) -> object:
+    """Given the Python object read from the datastore, manipulate
+    it based on the supplied parameters and ensure the Python
+    type is correct.
+
+    Parameters
+    ----------
+    inMemoryDataset : `object`
+        Dataset to check.
+    readStorageClass : `StorageClass`
+        The `StorageClass` used to obtain the assembler and to
+        check the python type.
+    assemblerParams : `dict`, optional
+        Parameters to pass to the assembler. Can be `None`.
+    isComponent : `bool`, optional
+        If this is a component, allow the inMemoryDataset to be `None`.
+
+    Returns
+    -------
+    dataset : `object`
+        In-memory dataset, potentially converted to expected type.
+    """
+    # Process any left over parameters
+    if assemblerParams:
+        inMemoryDataset = readStorageClass.delegate().handleParameters(inMemoryDataset, assemblerParams)
+
+    # Validate the returned data type matches the expected data type
+    pytype = readStorageClass.pytype
+
+    allowedTypes = []
+    if pytype:
+        allowedTypes.append(pytype)
+
+    # Special case components to allow them to be None
+    if isComponent:
+        allowedTypes.append(type(None))
+
+    if allowedTypes and not isinstance(inMemoryDataset, tuple(allowedTypes)):
+        inMemoryDataset = readStorageClass.coerce_type(inMemoryDataset)
+
+    return inMemoryDataset
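Hoisting _post_process_get() to a module-level post_process_get() lets code outside GenericBaseDatastore, such as a client finishing a remote get(), reuse the same parameter handling and type coercion. A usage sketch; the storage class name is real but the input value is illustrative:

```python
from lsst.daf.butler import StorageClassFactory
from lsst.daf.butler.datastore.generic_base import post_process_get

read_storage_class = StorageClassFactory().getStorageClass("StructuredDataDict")
raw = {"a": 1, "b": 2}  # illustrative object, as deserialized from an artifact

# Applies any leftover assembler parameters, then coerces the object to
# the storage class's Python type if it does not already match.
result = post_process_get(raw, read_storage_class)
```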
39 changes: 37 additions & 2 deletions python/lsst/daf/butler/datastore/stored_file_info.py
@@ -27,13 +27,14 @@

 from __future__ import annotations

-__all__ = ("StoredDatastoreItemInfo", "StoredFileInfo")
+__all__ = ("StoredDatastoreItemInfo", "StoredFileInfo", "SerializedStoredFileInfo")

 import inspect
 from collections.abc import Iterable, Mapping
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any

+from lsst.daf.butler._compat import _BaseModelCompat
 from lsst.resources import ResourcePath
 from lsst.utils import doImportType
 from lsst.utils.introspection import get_full_type_name
@@ -214,7 +215,7 @@ def __init__(
"""StorageClass associated with Dataset."""

component: str | None
"""Component associated with this file. Can be None if the file does
"""Component associated with this file. Can be `None` if the file does
not refer to a component of a composite."""

checksum: str | None
@@ -260,6 +261,13 @@ def to_record(self, **kwargs: Any) -> dict[str, Any]:
             **kwargs,
         )

+    def to_simple(self) -> SerializedStoredFileInfo:
+        record = self.to_record()
+        # We allow None on the model but the record contains a "null string"
+        # instead
+        record["component"] = self.component
+        return SerializedStoredFileInfo.model_validate(record)
+
     def file_location(self, factory: LocationFactory) -> Location:
         """Return the location of artifact.
@@ -307,6 +315,10 @@ def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredFileInfo:
         )
         return info

+    @classmethod
+    def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
+        return cls.from_record(dict(model))
+
     def update(self, **kwargs: Any) -> StoredFileInfo:
         new_args = {}
         for k in self.__slots__:
@@ -320,3 +332,26 @@ def update(self, **kwargs: Any) -> StoredFileInfo:

     def __reduce__(self) -> str | tuple[Any, ...]:
         return (self.from_record, (self.to_record(),))
+
+
+class SerializedStoredFileInfo(_BaseModelCompat):
+    """Serialized representation of `StoredFileInfo` properties."""
+
+    formatter: str
+    """Fully-qualified name of Formatter."""
+
+    path: str
+    """Path to dataset within Datastore."""
+
+    storage_class: str
+    """Name of the StorageClass associated with Dataset."""
+
+    component: str | None
+    """Component associated with this file. Can be `None` if the file does
+    not refer to a component of a composite."""
+
+    checksum: str | None
+    """Checksum of the serialized dataset."""
+
+    file_size: int
+    """Size of the serialized dataset in bytes."""