Use chunk_iter when adding chunk refs
rly committed May 14, 2024
1 parent 1fc62ef commit 620b436
Showing 3 changed files with 160 additions and 39 deletions.
154 changes: 121 additions & 33 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -1,12 +1,15 @@
import json
import base64
from typing import Tuple, Union, List, IO, Any, Dict
import numpy as np
import zarr
from zarr.storage import Store, MemoryStore
import h5py
from tqdm import tqdm
from ._util import (
_read_bytes,
_get_all_chunk_info,
_get_chunk_index,
_get_chunk_byte_range,
_get_byte_range_for_contiguous_dataset,
_join,
@@ -324,7 +327,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str):
if self._file is None:
raise Exception("Store is closed")
byte_offset, byte_count, inline_data = self._get_single_chunk_file_bytes_data(
key_parent, key_name
)
if inline_data is not None:
return inline_data
@@ -351,7 +354,11 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str):
)
return buf

def _get_single_chunk_file_bytes_data(
self,
key_parent: str,
key_name: str
):
if self._h5f is None:
raise Exception("Store is closed")
h5_item = self._h5f.get('/' + key_parent, None)
@@ -372,7 +379,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
)
if key_name != "0":
raise Exception(
f"Chunk name {key_name} does not match dataset dimensions"
f"Chunk name {key_name} must be '0' for scalar dataset {key_parent}"
)

# In the case of shape 0, we raise an exception because we shouldn't be here
@@ -387,40 +394,131 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
raise Exception(
f"Chunk name {key_name} does not match dataset dimensions for inline array {key_parent}"
)
inline_data = inline_array.chunk_bytes
return None, None, inline_data

# If this is a scalar, then the data should have been inline
if h5_item.ndim == 0:
raise Exception(f"No inline data for scalar dataset {key_parent}")

if h5_item.chunks is not None:
# Get the byte range in the file for the chunk.
try:
# Get the chunk coords from the file name
chunk_name_parts = key_name.split(".")
if len(chunk_name_parts) != h5_item.ndim:
raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
chunk_coords = tuple(int(x) for x in chunk_name_parts)
for i, c in enumerate(chunk_coords):
if c < 0 or c >= h5_item.shape[i]:
raise Exception(
f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
)
byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
except Exception as e:
raise Exception(
f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
)
return byte_offset, byte_count, None

else:
# In this case (contiguous dataset), we need to check that the chunk
# coordinates are (0, 0, 0, ...)
chunk_coords = tuple(int(x) for x in key_name.split("."))
if chunk_coords != (0,) * h5_item.ndim:
raise Exception(
f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
)
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
return byte_offset, byte_count, None
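Chunk keys here follow zarr's naming convention: dot-separated chunk-grid coordinates, one integer per dimension, so a 2x3 chunk grid has keys "0.0" through "1.2". A small standalone illustration of the parsing used above (the key value is hypothetical):

```python
# Zarr-style chunk key for a 3-D dataset: "i.j.k" selects the chunk at
# grid position (i, j, k).
key_name = "2.0.1"
chunk_coords = tuple(int(x) for x in key_name.split("."))
assert chunk_coords == (2, 0, 1)
```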

def _add_chunk_info_to_refs(
self,
key_parent: str,
key_names: List[str],
add_ref: callable,
add_ref_chunk: callable
):
if self._h5f is None:
raise Exception("Store is closed")
h5_item = self._h5f.get('/' + key_parent, None)

# For the case of a scalar dataset, we need to check a few things
if h5_item.ndim == 0:
if h5_item.chunks is not None:
raise Exception(
f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
)
if len(key_names) != 1 or key_names[0] != "0":
raise Exception(
f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}"
)

inline_array = self._get_inline_array(key_parent, h5_item)
if inline_array.is_inline:
if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname:
raise Exception(
f"Chunk name {key_name[0]} does not match dataset dimensions for inline array {key_parent}"
)
inline_data = inline_array.chunk_bytes
add_ref(f"{key_parent}/{key_names[0]}", inline_data)
return

# If this is a scalar, then the data should have been inline
if h5_item.ndim == 0:
raise Exception(f"No inline data for scalar dataset {key_parent}")

if h5_item.chunks is not None:
# Get the byte range in the file for each chunk.
# Get a list of all the chunk info.
chunk_info = _get_all_chunk_info(h5_item)
for chunk_index, key_name in tqdm(
enumerate(key_names),
total=len(key_names),
desc=f"Writing chunk info for {key_parent}",
leave=True,
delay=2 # do not show progress bar until 2 seconds have passed
):
try:
# TODO remove this code through the assert after verifying order of key_names
# Get the chunk coords from the file name
chunk_name_parts = key_name.split(".")
if len(chunk_name_parts) != h5_item.ndim:
raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
chunk_coords = tuple(int(x) for x in chunk_name_parts)
for i, c in enumerate(chunk_coords):
if c < 0 or c >= h5_item.shape[i]:
raise Exception(
f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
)
assert chunk_index == _get_chunk_index(h5_item, chunk_coords)

if chunk_info is not None:
byte_offset = chunk_info[chunk_index].byte_offset
byte_count = chunk_info[chunk_index].size
else:
# chunk_iter is not available; fall back to querying this chunk's byte range individually
byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
except Exception as e:
raise Exception(
f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
)

# In this case we reference a chunk of data in a separate file
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))

else:
# In this case (contiguous dataset), we need to check that the chunk
# coordinates are (0, 0, 0, ...)
if len(key_names) != 1:
raise Exception(
f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}"
)
key_name = key_names[0]
chunk_coords = tuple(int(x) for x in key_name.split("."))
if chunk_coords != (0,) * h5_item.ndim:
raise Exception(
f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
)
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
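A minimal sketch of how the two callbacks divide the work, with hypothetical names and values: inline chunks go through add_ref as raw bytes, while file-backed chunks go through add_ref_chunk as (url, offset, size) triples.

```python
# Illustration of the callback contract (names and storage are made up, not the library's API).
refs = {}

def add_ref(key: str, content: bytes):
    # Inline data: the bytes themselves end up in the reference file system.
    refs[key] = content

def add_ref_chunk(key: str, data: tuple):
    # File-backed chunk: store a (url, byte_offset, byte_count) reference instead.
    url, byte_offset, byte_count = data
    refs[key] = [url, byte_offset, byte_count]
```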

def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
# First check the memory cache
@@ -528,6 +626,11 @@ def _add_ref(key: str, content: Union[bytes, None]):
"ascii"
)

def _add_ref_chunk(key: str, data: Tuple[str, int, int]):
assert data[1] is not None
assert data[2] is not None
ret["refs"][key] = list(data) # TODO make downstream accept tuple

def _process_group(key, item: h5py.Group):
if isinstance(item, h5py.Group):
# Add the .zattrs and .zgroup files for the group
@@ -578,22 +681,7 @@ def _process_dataset(key):
chunk_names = _get_chunk_names_for_dataset(
chunk_coords_shape
)
self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk)

# Process the groups recursively starting with the root group
_process_group("", self._h5f)
44 changes: 38 additions & 6 deletions lindi/LindiH5ZarrStore/_util.py
@@ -1,4 +1,4 @@
from typing import IO, List, Union
import json
import numpy as np
import h5py
@@ -10,11 +10,33 @@ def _read_bytes(file: IO, offset: int, count: int):
return file.read(count)


def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]:
"""Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects.
The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be:
[0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk.
Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None.
This takes 1-5 seconds for a dataset with 1e6 chunks.
This might be very slow if the dataset is stored remotely.
"""
stinfo = list()
dsid = h5_dataset.id
try:
dsid.chunk_iter(stinfo.append)
except AttributeError:
# chunk_iter is not available
return None
return stinfo
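A quick way to see what this returns, using a small throwaway file; chunk_iter needs a recent h5py built against HDF5 1.12.3+, so the None fallback is exercised on older stacks:

```python
import h5py
import numpy as np

with h5py.File("demo.h5", "w") as f:
    f.create_dataset("x", data=np.zeros((100, 100), dtype="f4"), chunks=(10, 10))

with h5py.File("demo.h5", "r") as f:
    info = _get_all_chunk_info(f["x"])
    if info is None:
        print("chunk_iter unavailable; fall back to per-chunk lookups")
    else:
        # Each entry is an h5py StoreInfo with byte_offset and size fields.
        print(len(info), info[0].byte_offset, info[0].size)
```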


def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int:
"""Get the chunk index for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library.
"""
shape = h5_dataset.shape
chunk_shape = h5_dataset.chunks
@@ -30,13 +52,23 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
chunk_index = 0
for i in range(ndim):
chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:]))
return chunk_index
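A worked instance of that row-major arithmetic, with a hypothetical dataset shape:

```python
import math

shape, chunk_shape = (100, 40), (10, 20)
# Chunk grid: ceil(100/10) x ceil(40/20) = 10 x 2 chunks.
grid = [math.ceil(s / c) for s, c in zip(shape, chunk_shape)]
# Chunk coords (3, 1) -> linear index 3 * 2 + 1 = 7; the last dimension changes fastest.
chunk_index = 3 * grid[1] + 1
assert chunk_index == 7
```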

def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library. First we need
to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index.
"""
chunk_index = _get_chunk_index(h5_dataset, chunk_coords)
return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index)


def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library. Use _get_all_chunk_info instead of
calling this repeatedly for many chunks of the same dataset.
"""
# got hints from kerchunk source code
dsid = h5_dataset.id
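A sketch of the step this function presumably performs next, as an assumption consistent with its docstring and the kerchunk hint above (get_chunk_info is h5py's low-level per-chunk query):

```python
# Assumed continuation: ask HDF5 for the chunk's storage info by linear index.
chunk_info = dsid.get_chunk_info(chunk_index)  # h5py.h5d.StoreInfo
byte_offset = chunk_info.byte_offset
byte_count = chunk_info.size
return byte_offset, byte_count
```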
1 change: 1 addition & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@
numcodecs = "^0.12.1"
zarr = "^2.16.1"
h5py = "^3.10.0"
requests = "^2.31.0"
tqdm = "^4.66.4"

[tool.poetry.group.dev.dependencies]
pynwb = "^2.6.0"
