
Commit

Changes from review
timj committed Jul 9, 2024
1 parent 0859813 commit 54a472a
Showing 12 changed files with 97 additions and 78 deletions.
2 changes: 1 addition & 1 deletion doc/lsst.daf.butler/formatters.rst
@@ -237,7 +237,7 @@ A datastore knows which formatter was used to write or ingest a dataset.
There are three methods a formatter author can implement in order to read a Python type from a file:

``read_from_local_file``
The ``read_from_local_file`` method guarantees to pass in a local file resource.
The ``read_from_local_file`` method is guaranteed to be passed a local file resource.
If the resource was initially remote, it will be downloaded before the method is called, and the file can be cached if the butler has been configured to do that.

``read_from_uri``
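To make the contract above concrete, here is a minimal sketch of a formatter implementing this hook. It is illustrative only: the class, the JSON payload, and the top-level `FormatterV2` import location are assumptions, not part of this commit.

```python
import json
from typing import Any

from lsst.daf.butler import FormatterV2  # assumed export location
from lsst.resources import ResourcePath


class SimpleJsonFormatter(FormatterV2):
    """Hypothetical formatter reading a JSON dataset from a local file."""

    default_extension = ".json"
    can_read_from_local_file = True  # opt in to read_from_local_file()

    def read_from_local_file(
        self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
    ) -> Any:
        # local_uri is guaranteed to be on the local file system; a remote
        # resource has already been downloaded (and possibly cached).
        with open(local_uri.ospath, "rb") as fd:
            return json.load(fd)
```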
127 changes: 74 additions & 53 deletions python/lsst/daf/butler/_formatter.py
@@ -44,7 +44,7 @@
import zipfile
from abc import ABCMeta, abstractmethod
from collections.abc import Callable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, TypeAlias
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, TypeAlias, final

from lsst.resources import ResourceHandleProtocol, ResourcePath
from lsst.utils.introspection import get_full_type_name
@@ -96,10 +96,9 @@ class FormatterV2:
The dataset associated with this formatter. Should not be a component
dataset ref.
write_parameters : `dict`, optional
Any parameters to be hard-coded into this instance to control how
the dataset is serialized.
Parameters to control how the dataset is serialized.
write_recipes : `dict`, optional
Detailed write Recipes indexed by recipe name.
Detailed write recipes indexed by recipe name.
**kwargs
Additional arguments that will be ignored but allow for
`Formatter` V1 parameters to be given.
@@ -135,11 +134,11 @@ class FormatterV2:
how a dataset is serialized. `None` indicates that no parameters are
supported."""

default_extension: str | None = None
default_extension: ClassVar[str | None] = None
"""Default extension to use when writing a file.
Can be `None` if the extension is determined dynamically. Use the
`get_write_extension` property to get the actual extension to use.
`get_write_extension` method to get the actual extension to use.
"""

supported_extensions: ClassVar[Set[str]] = frozenset()
@@ -149,13 +148,13 @@
automatically included in the list of supported extensions.
"""

can_read_from_uri: bool = False
can_read_from_uri: ClassVar[bool] = False
"""Declare whether `read_from_uri` is available to this formatter."""

can_read_from_stream: bool = False
can_read_from_stream: ClassVar[bool] = False
"""Declare whether `read_from_stream` is available to this formatter."""

can_read_from_local_file: bool = False
can_read_from_local_file: ClassVar[bool] = False
"""Declare whether `read_from_file` is available to this formatter."""

def __init__(
@@ -203,9 +202,8 @@ def __repr__(self) -> str:

@property
def file_descriptor(self) -> FileDescriptor:
"""File descriptor associated with this formatter (`FileDescriptor`).
Read-only property.
"""File descriptor associated with this formatter
(`FileDescriptor`).
"""
return self._file_descriptor

@@ -245,7 +243,7 @@ def can_accept(self, in_memory_dataset: Any) -> bool:
Parameters
----------
in_memory_dataset : `typing.Any`
in_memory_dataset : `object`
The dataset that is to be written.
Returns
@@ -327,7 +325,7 @@ def _get_cache_ref(self) -> DatasetRef:
The dataset ref to use when looking in the cache.
For a single-file dataset this will be the dataset ref directly.
If this is disassembled we need the component and the component
will be in the `FileDescriptor`.
"""
if self.file_descriptor.component is None:
cache_ref = self.dataset_ref
@@ -385,10 +383,13 @@ def read(
* `read_from_uri`
* `read_from_stream`
* `read_from_local_file`
* `read_from_uri` (but with a local file)
It is possible for `read_from_uri` to be skipped if the implementation
raises `FormatterNotImplementedError`. If `read_from_stream` is
called `read_from_local_file` will never be called.
called `read_from_local_file` will never be called. If `read_from_uri`
was skipped and `read_from_local_file` is not implemented, `read_from_uri`
will be called again with a local file as a last resort.
A Formatter can also read a file from within a Zip file if the
URI associated with the `FileDescriptor` corresponds to a file with
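A sketch of that resolution order with a hypothetical subclass (trivial bodies, illustrative only; assumes `FormatterV2` is importable from the package root):

```python
from typing import Any, BinaryIO

from lsst.daf.butler import FormatterV2  # assumed export location
from lsst.resources import ResourceHandleProtocol, ResourcePath


class StreamFirstFormatter(FormatterV2):
    """Hypothetical formatter declaring two of the read capabilities."""

    can_read_from_stream = True
    can_read_from_local_file = True

    def read_from_stream(
        self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None, expected_size: int = -1
    ) -> Any:
        # read() tries read_from_uri first (not declared here), then this
        # method; because it succeeds, read_from_local_file is never called.
        return stream.read()

    def read_from_local_file(
        self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
    ) -> Any:
        return local_uri.read()  # fallback, unused while the stream read works
```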
@@ -430,22 +431,15 @@ def read(
# a file within the ZIP file, it is no longer possible to use the
# direct read from URI option.
uri = self.file_descriptor.location.uri
if uri.getExtension() == ".zip" and uri.fragment and uri.fragment.startswith("zip-path="):
# Preference to use URIs with "file.zip#zip-path=thing.json"
# rather than "file.zip#thing.json"
if uri.fragment and uri.fragment.startswith("zip-path="):
_, path_in_zip = uri.fragment.split("=")

# Open the Zip file using ResourcePath.
with uri.open("rb") as fd:
with zipfile.ZipFile(fd) as zf: # type: ignore
if self.can_read_from_stream:
zip_fd = zf.open(path_in_zip)
try:
in_memory_dataset = self.read_from_stream(zip_fd, component)
except Exception:
zip_fd.close()
raise
return in_memory_dataset
with contextlib.closing(zf.open(path_in_zip)) as zip_fd:
return self.read_from_stream(zip_fd, component, expected_size=expected_size)

# For now for both URI and local file options we retrieve
# the bytes to a temporary local and use that.
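For reference, a small sketch of the fragment convention handled by this branch (bucket and member names are made up):

```python
from lsst.resources import ResourcePath

# Preferred form: an explicit "zip-path=" key inside the URI fragment.
uri = ResourcePath("s3://bucket/exports/archive.zip#zip-path=dataset_1.json")
if uri.fragment and uri.fragment.startswith("zip-path="):
    _, path_in_zip = uri.fragment.split("=")
    print(path_in_zip)  # -> dataset_1.json
```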
@@ -454,9 +448,9 @@
tmp_uri.write(zf.read(path_in_zip))

if self.can_read_from_local_file:
return self.read_from_local_file(tmp_uri, component)
return self.read_from_local_file(tmp_uri, component, expected_size=expected_size)
if self.can_read_from_uri:
return self.read_from_uri(tmp_uri, component)
return self.read_from_uri(tmp_uri, component, expected_size=expected_size)

raise FormatterNotImplementedError(

f"Formatter {self.name()} could not read the file using any method."
@@ -492,7 +486,7 @@ def read(

def _read_from_possibly_cached_location_no_cache_write(
self,
callback: Callable[[ResourcePath, str | None], Any],
callback: Callable[[ResourcePath, str | None, int], Any],
component: str | None = None,
expected_size: int = -1,
*,
@@ -531,7 +525,7 @@ def _read_from_possibly_cached_location_no_cache_write(
self.name(),
),
):
return callback(desired_uri, component)
return callback(desired_uri, component, expected_size)

def read_from_possibly_cached_stream(
self,
@@ -568,9 +562,9 @@ def read_from_possibly_cached_stream(
a file to the local cache.
"""

def _open_stream(uri: ResourcePath, comp: str | None) -> Any:
def _open_stream(uri: ResourcePath, comp: str | None, size: int = -1) -> Any:
with uri.open("rb") as fd:
return self.read_from_stream(fd, comp)
return self.read_from_stream(fd, comp, expected_size=size)

return self._read_from_possibly_cached_location_no_cache_write(
_open_stream, component, expected_size=expected_size, cache_manager=cache_manager
@@ -614,8 +608,8 @@ def read_directly_from_possibly_cached_uri(
The URI will be read by calling `read_from_uri`.
"""

def _open_uri(uri: ResourcePath, comp: str | None) -> Any:
return self.read_from_uri(uri, comp)
def _open_uri(uri: ResourcePath, comp: str | None, size: int = -1) -> Any:
return self.read_from_uri(uri, comp, expected_size=size)

return self._read_from_possibly_cached_location_no_cache_write(
_open_uri, component, expected_size=expected_size, cache_manager=cache_manager
@@ -653,7 +647,8 @@ def read_from_possibly_cached_local_file(
Notes
-----
The file will be downloaded and cached if it is a remote resource.
The file contents will be read using `read_from_local_file`.
The file contents will be read using `read_from_local_file` or
`read_from_uri`, with preference given to the former.
"""
cache_manager = self._ensure_cache(cache_manager)
uri = self.file_descriptor.location.uri
@@ -712,15 +707,30 @@ def read_from_possibly_cached_local_file(
self.name(),
),
):
result = self.read_from_local_file(local_uri, component=component)
if self.can_read_from_local_file:
result = self.read_from_local_file(
local_uri, component=component, expected_size=expected_size
)
elif self.can_read_from_uri:
# If the direct URI reader was skipped earlier and
# there is no explicit local file implementation, pass
# in the guaranteed local URI to the generic reader.
result = self.read_from_uri(

local_uri, component=component, expected_size=expected_size
)
else:
raise FormatterNotImplementedError(

"Unexpectedly found no formatter implementation to read from a local file "
"or URI to local file."
)

# File was read successfully so can move to cache
if can_be_cached:
cache_manager.move_to_cache(local_uri, cache_ref)

return result
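The practical effect of the new branch above is that a formatter implementing only the generic URI reader still benefits from the download-and-cache handling. A hypothetical sketch:

```python
from typing import Any

from lsst.daf.butler import FormatterV2  # assumed export location
from lsst.resources import ResourcePath


class UriOnlyFormatter(FormatterV2):
    """Hypothetical formatter with no dedicated local-file implementation."""

    can_read_from_uri = True  # can_read_from_local_file stays False

    def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
        # When invoked from read_from_possibly_cached_local_file, uri is a
        # guaranteed-local (and possibly cached) copy of the resource.
        return uri.read()
```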

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
"""Read a dataset from a URI that can be local or remote.
Parameters
Expand All @@ -730,10 +740,13 @@ def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
and can refer to the actual resource or to a locally cached file.
component : `str` or `None`, optional
The component to be read from the dataset.
expected_size : `int`, optional
The expected size of the resource to read, if known. A value of
``-1`` indicates that the file size is not known.
Returns
-------
in_memory_dataset : `typing.Any`
in_memory_dataset : `object`
The Python object read from the resource.
Raises
Expand All @@ -748,8 +761,8 @@ def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
``can_read_from_uri`` is set to `True`.
It is possible that a cached local file will be given to this method
even if it was originally a remote URI. This can happen if the write
resulted in the file being added to the local cache.
even if it was originally a remote URI. This can happen if the original
write resulted in the file being added to the local cache.
If the full file is being read this file will not be added to the
local cache. Consider raising `FormatterNotImplementedError` in
@@ -761,21 +774,24 @@ def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
raise FormatterNotImplementedError("This formatter does not know how to read a file.")


def read_from_stream(
self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None
self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None, expected_size: int = -1
) -> Any:
"""Read from an open file descriptor.
Parameters
----------
stream : `lsst.resources.ResourceHandleProtocol` or \
`typing.BinariyIO`
`typing.BinaryIO`
File stream to use to read the dataset.
component : `str` or `None`, optional
The component to be read from the dataset.
expected_size : `int`, optional
The expected size of the resource to read, if known. A value of
``-1`` indicates that the file size is not known.
Returns
-------
in_memory_dataset : `typing.Any`
in_memory_dataset : `object`
The Python object read from the stream.
Notes
@@ -784,21 +800,25 @@ def read_from_stream(
"""
raise FormatterNotImplementedError("This formatter does not know how to read from a stream.")


def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
def read_from_local_file(
self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
) -> Any:
"""Read a dataset from a URI guaranteed to refer to the local file
system.
Parameters
----------
local_uri : `lsst.resources.ResourcePath`
URI to use to read the dataset. This URI is guaranteed to be
a local file.
Path to a local file that should be read.
component : `str` or `None`, optional
The component to be read from the dataset.
expected_size : `int`, optional
The expected size of the resource to read, if known. A value of
``-1`` indicates that the file size is not known.
Returns
-------
in_memory_dataset : `typing.Any`
in_memory_dataset : `object`
The Python object read from the resource.
Raises
@@ -815,6 +835,7 @@ def read_from_local_file(self, local_uri: ResourcePath, component: str | None =
"""
raise FormatterNotImplementedError("This formatter does not know how to read a local file.")


@final
def write(
self,
in_memory_dataset: Any,
@@ -1129,8 +1150,7 @@ class Formatter(metaclass=ABCMeta):
dataId : `DataCoordinate`
Data ID associated with this formatter.
writeParameters : `dict`, optional
Any parameters to be hard-coded into this instance to control how
the dataset is serialized.
Parameters to control how the dataset is serialized.
writeRecipes : `dict`, optional
Detailed write recipes indexed by recipe name.
**kwargs
@@ -1218,9 +1238,8 @@ def __repr__(self) -> str:

@property
def fileDescriptor(self) -> FileDescriptor:
"""File descriptor associated with this formatter (`FileDescriptor`).
Read-only property.
"""File descriptor associated with this formatter
(`FileDescriptor`).
"""
return self._fileDescriptor

@@ -1249,7 +1268,7 @@ def can_accept(self, in_memory_dataset: Any) -> bool:
Parameters
----------
in_memory_dataset : `typing.Any`
in_memory_dataset : `object`
The dataset that is to be written.
Returns
@@ -1999,7 +2018,9 @@ def validate_write_recipes( # type: ignore
# for a dynamic shim. Luckily the shim is only used as an instance.
return self._formatter.validate_write_recipes(recipes)

def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
def read_from_local_file(
self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
) -> Any:
# Need to temporarily override the location since the V1 formatter
# will not know anything about this local file.

4 changes: 3 additions & 1 deletion python/lsst/daf/butler/formatters/astropyTable.py
@@ -52,7 +52,9 @@ def get_write_extension(self) -> str:
# Other supported formats can be added here
raise RuntimeError(f"Requested file format '{format}' is not supported for Table")

def read_from_local_file(self, local_uri: ResourcePath, component: str | None = None) -> Any:
def read_from_local_file(
self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
) -> Any:
pytype = self.file_descriptor.storageClass.pytype
if not issubclass(pytype, astropy.table.Table):
raise TypeError(f"Python type {pytype} does not seem to be a astropy Table type")

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/json.py
@@ -46,7 +46,7 @@ class JsonFormatter(TypelessFormatter):
unsupported_parameters = None
can_read_from_uri = True

def read_from_uri(self, uri: ResourcePath, component: str | None = None) -> Any:
def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
# json.load() reads the entire file content into memory
# and is no different from json.loads(uri.read()). It does not attempt
# to support incremental reading to minimize memory usage.
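The equivalence the comment describes, restated as a runnable sketch (the file path is hypothetical; assumes a small local JSON file):

```python
import json

from lsst.resources import ResourcePath

uri = ResourcePath("/tmp/example.json")  # hypothetical local file
with uri.open("rb") as fd:
    via_load = json.load(fd)  # buffers the whole stream internally anyway
via_loads = json.loads(uri.read())  # one explicit bulk read
assert via_load == via_loads
```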
6 changes: 4 additions & 2 deletions python/lsst/daf/butler/formatters/logs.py
@@ -61,11 +61,13 @@ def _get_read_pytype(self) -> type[ButlerLogRecords]:
raise RuntimeError(f"Python type {pytype} does not seem to be a ButlerLogRecords type")
return pytype

def read_from_local_file(self, uri: ResourcePath, component: str | None = None) -> Any:
def read_from_local_file(
self, local_uri: ResourcePath, component: str | None = None, expected_size: int = -1
) -> Any:
# ResourcePath open() cannot do a per-line read so can not use
# `read_from_stream` and `read_from_uri` does not give any advantage
# over pre-downloading the whole file (which can be very large).
return self._get_read_pytype().from_file(uri.ospath)
return self._get_read_pytype().from_file(local_uri.ospath)
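For context, a usage sketch of the line-oriented reader this method delegates to (the path is hypothetical; assumes `ButlerLogRecords` behaves as a sequence of records, as in current daf_butler):

```python
from lsst.daf.butler import ButlerLogRecords

# from_file() parses one JSON log record per line, so it needs a real
# filesystem path rather than a stream or a remote URI.
records = ButlerLogRecords.from_file("/tmp/run_1234.log.jsonl")  # hypothetical
for record in records:
    print(record.levelname, record.message)
```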

def to_bytes(self, in_memory_dataset: Any) -> bytes:
return in_memory_dataset.model_dump_json(exclude_unset=True, exclude_defaults=True).encode()
