Skip to content

Commit

Permalink
Switch to read_from_stream from read_from_bytes
Browse files Browse the repository at this point in the history
Also add flags for declaring which readers are available.
  • Loading branch information
timj committed Jun 27, 2024
1 parent 5101483 commit 99618cc
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 126 deletions.
156 changes: 73 additions & 83 deletions python/lsst/daf/butler/_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, TypeAlias

from lsst.resources import ResourcePath
from lsst.resources import ResourceHandleProtocol, ResourcePath
from lsst.utils.introspection import get_full_type_name
from lsst.utils.timer import time_this

Expand Down Expand Up @@ -106,7 +106,7 @@ class FormatterV2(metaclass=ABCMeta):
-----
In most cases a Formatter author will not want to override the default
`read` method but will instead want to implement one or all of
`read_from_bytes`, `read_from_uri`, or `read_from_local_file`. The method
`read_from_stream`, `read_from_uri`, or `read_from_local_file`. The method
`read_from_uri` will always be attempted first and could be more efficient
(since it allows the possibility for a subset of the data file to be
accessed when parameters or components are specified) but it will not
Expand Down Expand Up @@ -152,6 +152,15 @@ class FormatterV2(metaclass=ABCMeta):
directly into memory for parsing. Set to 0 if bytes can not be parsed
directly by this formatter."""

can_read_from_uri: bool = False
"""Declare whether `read_from_uri` is available to this formatter."""

can_read_from_stream: bool = False
"""Declare whether `read_from_stream` is available to this formatter."""

can_read_from_local_file: bool = False
"""Declare whether `read_from_file` is available to this formatter."""

def __init__(
self,
file_descriptor: FileDescriptor,
Expand Down Expand Up @@ -379,37 +388,41 @@ def read(
resource.
"""
# First see if the formatter can support direct remote read from
# a URI.
with contextlib.suppress(FormatterNotImplementedError):
return self.read_directly_from_possibly_cached_uri(
component, expected_size, cache_manager=cache_manager
)

# Should we check the size with file_uri.size() if expected size is -1?
# Do we give a choice here at all and allow a formatter to say one
# or the other?
if expected_size > 0 and expected_size < self.nbytes_read_threshold:
# If this method is not implemented, fall back to using a file.
# V1 formatters in emulation won't set the read threshold.
# a URI. We allow FormatterNotImplementedError to be raised by
# the formatter in case the formatter wishes to defer to the
# local file implementation which will trigger a cache write.
if self.can_read_from_uri:
with contextlib.suppress(FormatterNotImplementedError):
return self.read_from_possibly_cached_bytes(
return self.read_directly_from_possibly_cached_uri(
component, expected_size, cache_manager=cache_manager
)

# Some formatters might want to be able to read directly from
# an open file stream. This is preferred over forcing a download
# to local file system.
if self.can_read_from_stream:
return self.read_from_possibly_cached_stream(
component, expected_size, cache_manager=cache_manager
)

# Finally, try to read the local file.
# If this is not implemented we let the exception through.
return self.read_from_possibly_cached_local_file(
component, expected_size, cache_manager=cache_manager
if self.can_read_from_local_file:
return self.read_from_possibly_cached_local_file(
component, expected_size, cache_manager=cache_manager
)

raise FormatterNotImplementedError(
f"Formatter {self.name()} could not read the file using any method."
)

def read_from_possibly_cached_bytes(
def read_from_possibly_cached_stream(
self,
component: str | None = None,
expected_size: int = -1,
*,
cache_manager: AbstractDatastoreCacheManager | None = None,
) -> Any:
"""Read from bytes, checking for possible presence in local cache.
"""Read from a stream, checking for possible presence in local cache.
Parameters
----------
Expand All @@ -432,10 +445,9 @@ def read_from_possibly_cached_bytes(
Notes
-----
Calls `read_from_bytes` but will first check the datastore cache
in case the file is present and reading that in preference. This
method will not cache a remote dataset and assumes that any size
checks have been performed before calling this method.
Calls `read_from_stream` but will first check the datastore cache
in case the file is present locally. This method will not download
a file to the local cache.
"""
cache_manager = self._ensure_cache(cache_manager)

Expand All @@ -450,23 +462,13 @@ def read_from_possibly_cached_bytes(
desired_uri = uri
msg = ""

with time_this(log, msg="Reading bytes from %s%s", args=(desired_uri, msg)):
serialized_dataset = desired_uri.read()
self._check_resource_size(uri, expected_size, len(serialized_dataset))

# The component for log messages is either the component requested
# explicitly or the component from the file descriptor.
log_component = component if component is not None else self.file_descriptor.component

log.debug(
"Deserializing %s from %d bytes from location %s with formatter %s",
f"component {log_component}" if log_component else "",
len(serialized_dataset),
uri,
self.name(),
)

return self.read_from_bytes(serialized_dataset, component=component)
with time_this(
log,
msg="Reading from file handle %s%s with formatter %s",
args=(desired_uri, msg, self.name()),
):
with uri.open("rb") as fd:
return self.read_from_stream(fd, component)

def read_directly_from_possibly_cached_uri(
self,
Expand Down Expand Up @@ -688,43 +690,35 @@ def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
"""
raise FormatterNotImplementedError("This formatter does not know how to read a file.")

def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
"""Read a dataset from a URI guaranteed to refer to the local file
system.
def read_from_stream(
self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None
) -> Any:
"""Read from an open file descriptor.
Parameters
----------
local_uri : `lsst.resources.ResourcePath`
URI to use to read the dataset. This URI is guaranteed to be
a local file.
stream : `lsst.resources.ResourceHandleProtocol` or \
`typing.BinariyIO`
File stream to use to read the dataset.
component : `str` or `None`, optional
The component to be read from the dataset.
Returns
-------
in_memory_dataset : `typing.Any`
The Python object read from the resource.
Raises
------
FormatterNotImplementedError
Raised if there is no implementation written to read data
from a local file.
Notes
-----
This method will only be called if neither `read_from_uri` nor
`read_from_bytes` were called successfully.
The Python object read from the stream.
"""
raise FormatterNotImplementedError("This formatter does not know how to read a local file.")
raise FormatterNotImplementedError("This formatter does not know how to read from a stream.")

def read_from_bytes(self, serialized_bytes: bytes, component: str | None = None) -> Any:
"""Read a dataset from a byte stream.
def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
"""Read a dataset from a URI guaranteed to refer to the local file
system.
Parameters
----------
serialized_bytes : `bytes`
Contents of the serialized file.
local_uri : `lsst.resources.ResourcePath`
URI to use to read the dataset. This URI is guaranteed to be
a local file.
component : `str` or `None`, optional
The component to be read from the dataset.
Expand All @@ -736,20 +730,15 @@ def read_from_bytes(self, serialized_bytes: bytes, component: str | None = None)
Raises
------
FormatterNotImplementedError
Raised if there is no implementation for constructing a Python
object from the serialized bytes.
Raised if there is no implementation written to read data
from a local file.
Notes
-----
This method will only be called if `read_from_uri` was not called.
Additionally, in the default implementation of the `read` method,
this method will only be called if the expected size of the resource
is known and it is less than the class property
``nbytes_read_threshold``.
This method will only be called if neither `read_from_uri` nor
`read_from_stream` were called successfully.
"""
raise FormatterNotImplementedError(
"This formatter does not know how to convert bytes to a Python type."
)
raise FormatterNotImplementedError("This formatter does not know how to read a local file.")

def write(
self,
Expand Down Expand Up @@ -1860,6 +1849,9 @@ class FormatterV1inV2(FormatterV2):
`Formatter` V1 parameters to be given.
"""

can_read_from_local_file = True
"""This formatter can read from a local file."""

def __init__(
self,
file_descriptor: FileDescriptor,
Expand Down Expand Up @@ -1901,6 +1893,12 @@ def validate_write_recipes( # type: ignore
def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
# Need to temporarily override the location since the V1 formatter
# will not know anything about this local file.

# V2 does not have a fromBytes equivalent.
if self._formatter.can_read_bytes():
serialized_dataset = local_uri.read()
return self._formatter.fromBytes(serialized_dataset, component=component)

location = Location(*local_uri.split())
with self._formatter._updateLocation(location):
try:
Expand All @@ -1911,14 +1909,6 @@ def read_from_local_file(self, local_uri: ResourcePath, component: str | None =
raise FormatterNotImplementedError(str(e)) from e
return result

def read_from_bytes(self, serialized_bytes: bytes, component: str | None = None) -> Any:
try:
return self._formatter.fromBytes(serialized_bytes, component=component)
except NotImplementedError as e:
# V1 raises NotImplementedError but V2 is expecting something
# slightly different.
raise FormatterNotImplementedError(str(e)) from e

def to_bytes(self, in_memory_dataset: Any) -> bytes:
try:
return self._formatter.toBytes(in_memory_dataset)
Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/astropyTable.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class AstropyTableFormatter(FormatterV2):

supported_write_parameters = frozenset({"format"})
supported_extensions = frozenset({".ecsv"})
can_read_from_local_file = True

def get_write_extension(self) -> str:
# Default to ECSV but allow configuration via write parameter
Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class JsonFormatter(TypelessFormatter):

default_extension = ".json"
unsupported_parameters = None
can_read_from_uri = True

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
# json.load() reads the entire file content into memory
Expand Down
8 changes: 4 additions & 4 deletions python/lsst/daf/butler/formatters/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ButlerLogRecordsFormatter(FormatterV2):

default_extension = ".json"
supported_extensions = frozenset({".log"})
can_read_from_local_file = True

def _get_read_pytype(self) -> type[ButlerLogRecords]:
"""Get the Python type to allow for subclasses."""
Expand All @@ -63,12 +64,11 @@ def _get_read_pytype(self) -> type[ButlerLogRecords]:
return pytype

def read_from_local_file(self, uri: ResourcePath, component: str | None = None) -> Any:
# ResourcePath open() cannot do a per-line read.
# ResourcePath open() cannot do a per-line read so can not use
# `read_from_stream` and `read_from_uri` does not give any advantage
# over pre-downloading the whole file (which can be very large).
return self._get_read_pytype().from_file(uri.ospath)

def read_from_bytes(self, serialized_bytes: bytes, component: str | None = None) -> Any:
return self._get_read_pytype().from_raw(serialized_bytes)

def to_bytes(self, in_memory_dataset: Any) -> bytes:
return in_memory_dataset.model_dump_json(exclude_unset=True, exclude_defaults=True).encode()

Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PackagesFormatterV2(FormatterV2):

supported_write_parameters = frozenset({"format"})
supported_extensions = frozenset({".yaml", ".pickle", ".pkl", ".json"})
can_read_from_uri = True

def get_write_extension(self) -> str:
# Default to YAML but allow configuration via write parameter
Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class ParquetFormatter(FormatterV2):
"""

default_extension = ".parq"
can_read_from_local_file = True

def read_from_local_file(self, uri: ResourcePath, component: str | None = None) -> Any:
# Docstring inherited from Formatter.read.
Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PickleFormatter(TypelessFormatter):

default_extension = ".pickle"
unsupported_parameters = None
can_read_from_uri = True

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
# Read the pickle file directly from the resource into memory.
Expand Down
1 change: 1 addition & 0 deletions python/lsst/daf/butler/formatters/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class YamlFormatter(TypelessFormatter):
default_extension = ".yaml"
unsupported_parameters = None
supported_write_parameters = frozenset({"unsafe_dump"})
can_read_from_uri = True

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
# Can not use ResourcePath.open()
Expand Down
9 changes: 6 additions & 3 deletions python/lsst/daf/butler/tests/_datasetsHelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@ def makeDatastore(self, sub: str | None = None) -> Datastore:
class BadWriteFormatter(YamlFormatter):
"""A formatter that never works but does leave a file behind."""

can_read_from_uri = False
can_read_from_local_file = False
can_read_from_stream = False

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
raise FormatterNotImplementedError("This formatter can not read anything")

def to_bytes(self, in_memory_dataset: Any) -> bytes:
raise FormatterNotImplementedError("This formatter can not serialize to bytes.")

def write_direct(
self,
in_memory_dataset: Any,
Expand All @@ -222,6 +223,8 @@ def write_direct(
class MultiDetectorFormatter(YamlFormatter):
"""A formatter that requires a detector to be specified in the dataID."""

can_read_from_uri = True

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
if self.data_id is None:
raise RuntimeError("This formatter requires a dataId")
Expand Down
Loading

0 comments on commit 99618cc

Please sign in to comment.