Merge pull request #68 from NeurodataWithoutBorders/chunk_iter
rly authored May 16, 2024
2 parents b7b66f2 + 08eace7 commit 929952c
Showing 3 changed files with 117 additions and 36 deletions.
101 changes: 67 additions & 34 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -1,16 +1,18 @@
import json
import base64
from typing import Union, List, IO, Any, Dict
from typing import Tuple, Union, List, IO, Any, Dict, Callable
import numpy as np
import zarr
from zarr.storage import Store, MemoryStore
import h5py
from tqdm import tqdm
from ._util import (
_read_bytes,
_get_max_num_chunks,
_apply_to_all_chunk_info,
_get_chunk_byte_range,
_get_byte_range_for_contiguous_dataset,
_join,
_get_chunk_names_for_dataset,
_write_rfs_to_file,
)
from ..conversion.attr_conversion import h5_to_zarr_attr
@@ -372,7 +374,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
)
if key_name != "0":
raise Exception(
f"Chunk name {key_name} does not match dataset dimensions"
f"Chunk name {key_name} must be '0' for scalar dataset {key_parent}"
)

# In the case of shape 0, we raise an exception because we shouldn't be here
@@ -422,6 +424,62 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
return byte_offset, byte_count, None

def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_chunk: Callable):
if self._h5f is None:
raise Exception("Store is closed")
h5_item = self._h5f.get('/' + key_parent, None)
assert isinstance(h5_item, h5py.Dataset)

# For the case of a scalar dataset, we need to check a few things
if h5_item.ndim == 0:
if h5_item.chunks is not None:
raise Exception(
f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
)

inline_array = self._get_inline_array(key_parent, h5_item)
if inline_array.is_inline:
key_name = inline_array.chunk_fname
inline_data = inline_array.chunk_bytes
add_ref(f"{key_parent}/{key_name}", inline_data)
return

# If this is a scalar, then the data should have been inline
if h5_item.ndim == 0:
raise Exception(f"No inline data for scalar dataset {key_parent}")

if h5_item.chunks is not None:
# Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info
# does not provide a way to hook in a progress bar
# We use max number of chunks instead of actual number of chunks because get_num_chunks is slow
# for remote datasets.
num_chunks = _get_max_num_chunks(h5_item) # NOTE: unallocated chunks are counted
pbar = tqdm(
total=num_chunks,
desc=f"Writing chunk info for {key_parent}",
leave=True,
delay=2 # do not show progress bar until 2 seconds have passed
)

chunk_size = h5_item.chunks

def store_chunk_info(chunk_info):
# Get the byte range in the file for each chunk.
chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
byte_offset = chunk_info.byte_offset
byte_count = chunk_info.size
key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
pbar.update()

_apply_to_all_chunk_info(h5_item, store_chunk_info)
pbar.close()
else:
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
key_name = ".".join("0" for _ in range(h5_item.ndim))
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
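
The dot-separated key names built above follow the Zarr chunk-key convention: each chunk's element offset is divided elementwise by the chunk shape. A minimal sketch of that mapping, using hypothetical values that are not part of this commit:

# Hypothetical values, for illustration only
chunk_offset = (200, 0, 30)  # element offset of the chunk within the dataset
chunk_size = (100, 50, 10)   # h5_item.chunks
key_name = ".".join(str(a // b) for a, b in zip(chunk_offset, chunk_size))
assert key_name == "2.0.3"   # referenced as "<dataset_key>/2.0.3"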

def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
# First check the memory cache
if parent_key in self._external_array_links:
@@ -503,8 +561,6 @@ def to_reference_file_system(self) -> dict:
raise Exception("You must specify a url to create a reference file system")
ret = {"refs": {}, "version": 1}

# TODO: use templates to decrease the size of the JSON

def _add_ref(key: str, content: Union[bytes, None]):
if content is None:
raise Exception(f"Unable to get content for key {key}")
@@ -528,6 +584,11 @@ def _add_ref(key: str, content: Union[bytes, None]):
"ascii"
)

def _add_ref_chunk(key: str, data: Tuple[str, int, int]):
assert data[1] is not None
assert data[2] is not None
ret["refs"][key] = list(data) # downstream expects a list like on read from a JSON file

def _process_group(key, item: h5py.Group):
if isinstance(item, h5py.Group):
# Add the .zattrs and .zgroup files for the group
@@ -562,38 +623,10 @@ def _process_dataset(key):
zarray_bytes = self.get(f"{key}/.zarray")
assert zarray_bytes is not None
_add_ref(f"{key}/.zarray", zarray_bytes)
zarray_dict = json.loads(zarray_bytes.decode("utf-8"))

if external_array_link is None:
# Only add chunk references for datasets without an external array link
shape = zarray_dict["shape"]
chunks = zarray_dict.get("chunks", None)
chunk_coords_shape = [
# the shape could be zero -- for example dandiset 000559 - acquisition/depth_video/data has shape [0, 0, 0]
(shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0
for i in range(len(shape))
]
# For example, chunk_names could be ['0', '1', '2', ...]
# or ['0.0', '0.1', '0.2', ...]
chunk_names = _get_chunk_names_for_dataset(
chunk_coords_shape
)
for chunk_name in chunk_names:
byte_offset, byte_count, inline_data = (
self._get_chunk_file_bytes_data(key, chunk_name)
)
if inline_data is not None:
# The data is inline for this chunk
_add_ref(f"{key}/{chunk_name}", inline_data)
else:
# In this case we reference a chunk of data in a separate file
assert byte_offset is not None
assert byte_count is not None
ret["refs"][f"{key}/{chunk_name}"] = [
self._url,
byte_offset,
byte_count,
]
self._add_chunk_info_to_refs(key, _add_ref, _add_ref_chunk)

# Process the groups recursively starting with the root group
_process_group("", self._h5f)
51 changes: 49 additions & 2 deletions lindi/LindiH5ZarrStore/_util.py
@@ -1,7 +1,9 @@
from typing import IO, List
from typing import IO, List, Callable
import json
import numpy as np
import h5py
import math
import warnings


def _read_bytes(file: IO, offset: int, count: int):
@@ -10,6 +12,49 @@ def _read_bytes(file: IO, offset: int, count: int):
return file.read(count)


def _get_max_num_chunks(h5_dataset: h5py.Dataset):
"""Get the maximum number of chunks in an h5py dataset.
This is similar to h5_dataset.id.get_num_chunks() but significantly faster. It does not account for
whether some chunks are allocated.
"""
chunk_size = h5_dataset.chunks
assert chunk_size is not None
return math.prod([math.ceil(a / b) for a, b in zip(h5_dataset.shape, chunk_size)])
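
As a worked illustration with hypothetical shapes (not part of this commit), a dataset of shape (1000, 500) with chunk shape (300, 200) has ceil(1000/300) * ceil(500/200) = 4 * 3 = 12 chunk slots, whether or not all of them are allocated:

import math
shape = (1000, 500)   # hypothetical dataset shape
chunks = (300, 200)   # hypothetical chunk shape
assert math.prod(math.ceil(a / b) for a, b in zip(shape, chunks)) == 12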


def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable):
"""Apply the callback function to each chunk of an h5py dataset.
The chunks are iterated in order such that the last dimension changes the fastest,
e.g., chunk coordinates could be:
[0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
This method tries to use the `chunk_iter` method if it is available. The `chunk_iter` method requires
HDF5 1.12.3 and above. If it is not available, this method falls back to the `get_chunk_info` method,
which is significantly slower and not recommended if the dataset has many chunks.
`chunk_iter` takes 1-5 seconds for all chunks for a dataset with 1e6 chunks.
`get_chunk_info` takes about 0.2 seconds per chunk for a dataset with 1e6 chunks.
NOTE: This method might be very slow if the dataset is stored remotely.
"""
assert h5_dataset.chunks is not None
dsid = h5_dataset.id
try:
dsid.chunk_iter(callback)
except AttributeError:
# chunk_iter is not available
num_chunks = dsid.get_num_chunks() # NOTE: this can be slow for remote datasets with many chunks
if num_chunks > 100:
warnings.warn(
f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. "
f"Consider upgrading to HDF5 1.12.3 or above for faster performance."
)
for index in range(num_chunks):
chunk_info = dsid.get_chunk_info(index)
callback(chunk_info)
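
A usage sketch (hypothetical file and dataset paths, not part of this commit): the callback receives h5py chunk info objects whose chunk_offset, byte_offset, and size fields describe where each stored chunk lives in the file, so a caller can collect byte ranges like this:

byte_ranges = {}  # chunk_offset -> (byte_offset, byte_count)

def collect(chunk_info):
    # chunk_info has chunk_offset, byte_offset, and size attributes
    byte_ranges[chunk_info.chunk_offset] = (chunk_info.byte_offset, chunk_info.size)

with h5py.File("example.h5", "r") as f:  # hypothetical local file
    _apply_to_all_chunk_info(f["/acquisition/data"], collect)  # hypothetical dataset path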


def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
@@ -36,7 +81,8 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library.
This involves some low-level functions from the h5py library. Use _apply_to_all_chunk_info instead of
calling this repeatedly for many chunks of the same dataset.
"""
# got hints from kerchunk source code
dsid = h5_dataset.id
@@ -66,6 +112,7 @@ def _join(a: str, b: str) -> str:
return f"{a}/{b}"


# NOTE: this is no longer used
def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]:
"""Get the chunk names for a dataset with the given chunk coords shape.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@ numcodecs = "^0.12.1"
zarr = "^2.16.1"
h5py = "^3.10.0"
requests = "^2.31.0"
tqdm = "^4.66.4"

[tool.poetry.group.dev.dependencies]
pynwb = "^2.6.0"
