diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
index 36ad26a..86cc173 100644
--- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
+++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -1,16 +1,18 @@
 import json
 import base64
-from typing import Union, List, IO, Any, Dict
+from typing import Tuple, Union, List, IO, Any, Dict, Callable
 import numpy as np
 import zarr
 from zarr.storage import Store, MemoryStore
 import h5py
+from tqdm import tqdm
 from ._util import (
     _read_bytes,
+    _get_max_num_chunks,
+    _apply_to_all_chunk_info,
     _get_chunk_byte_range,
     _get_byte_range_for_contiguous_dataset,
     _join,
-    _get_chunk_names_for_dataset,
     _write_rfs_to_file,
 )
 from ..conversion.attr_conversion import h5_to_zarr_attr
@@ -372,7 +374,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
                 )
             if key_name != "0":
                 raise Exception(
-                    f"Chunk name {key_name} does not match dataset dimensions"
+                    f"Chunk name {key_name} must be '0' for scalar dataset {key_parent}"
                 )
 
         # In the case of shape 0, we raise an exception because we shouldn't be here
@@ -422,6 +424,62 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
             byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
         return byte_offset, byte_count, None
 
+    def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_chunk: Callable):
+        if self._h5f is None:
+            raise Exception("Store is closed")
+        h5_item = self._h5f.get('/' + key_parent, None)
+        assert isinstance(h5_item, h5py.Dataset)
+
+        # For the case of a scalar dataset, we need to check a few things
+        if h5_item.ndim == 0:
+            if h5_item.chunks is not None:
+                raise Exception(
+                    f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
+                )
+
+        inline_array = self._get_inline_array(key_parent, h5_item)
+        if inline_array.is_inline:
+            key_name = inline_array.chunk_fname
+            inline_data = inline_array.chunk_bytes
+            add_ref(f"{key_parent}/{key_name}", inline_data)
+            return
+
+        # If this is a scalar, then the data should have been inline
+        if h5_item.ndim == 0:
+            raise Exception(f"No inline data for scalar dataset {key_parent}")
+
+        if h5_item.chunks is not None:
+            # Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info
+            # does not provide a way to hook in a progress bar
+            # We use max number of chunks instead of actual number of chunks because get_num_chunks is slow
+            # for remote datasets.
+            num_chunks = _get_max_num_chunks(h5_item)  # NOTE: unallocated chunks are counted
+            pbar = tqdm(
+                total=num_chunks,
+                desc=f"Writing chunk info for {key_parent}",
+                leave=True,
+                delay=2  # do not show progress bar until 2 seconds have passed
+            )
+
+            chunk_size = h5_item.chunks
+
+            def store_chunk_info(chunk_info):
+                # Get the byte range in the file for each chunk.
+                chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
+                byte_offset = chunk_info.byte_offset
+                byte_count = chunk_info.size
+                key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
+                add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
+                pbar.update()
+
+            _apply_to_all_chunk_info(h5_item, store_chunk_info)
+            pbar.close()
+        else:
+            # Get the byte range in the file for the contiguous dataset
+            byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
+            key_name = ".".join("0" for _ in range(h5_item.ndim))
+            add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
+
     def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
         # First check the memory cache
         if parent_key in self._external_array_links:
@@ -503,8 +561,6 @@ def to_reference_file_system(self) -> dict:
             raise Exception("You must specify a url to create a reference file system")
         ret = {"refs": {}, "version": 1}
 
-        # TODO: use templates to decrease the size of the JSON
-
        def _add_ref(key: str, content: Union[bytes, None]):
            if content is None:
                raise Exception(f"Unable to get content for key {key}")
@@ -528,6 +584,11 @@ def _add_ref(key: str, content: Union[bytes, None]):
                     "ascii"
                 )
 
+        def _add_ref_chunk(key: str, data: Tuple[str, int, int]):
+            assert data[1] is not None
+            assert data[2] is not None
+            ret["refs"][key] = list(data)  # downstream expects a list like on read from a JSON file
+
         def _process_group(key, item: h5py.Group):
             if isinstance(item, h5py.Group):
                 # Add the .zattrs and .zgroup files for the group
@@ -562,38 +623,10 @@ def _process_dataset(key):
             zarray_bytes = self.get(f"{key}/.zarray")
             assert zarray_bytes is not None
             _add_ref(f"{key}/.zarray", zarray_bytes)
-            zarray_dict = json.loads(zarray_bytes.decode("utf-8"))
 
             if external_array_link is None:
                 # Only add chunk references for datasets without an external array link
-                shape = zarray_dict["shape"]
-                chunks = zarray_dict.get("chunks", None)
-                chunk_coords_shape = [
-                    # the shape could be zero -- for example dandiset 000559 - acquisition/depth_video/data has shape [0, 0, 0]
-                    (shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0
-                    for i in range(len(shape))
-                ]
-                # For example, chunk_names could be ['0', '1', '2', ...]
-                # or ['0.0', '0.1', '0.2', ...]
-                chunk_names = _get_chunk_names_for_dataset(
-                    chunk_coords_shape
-                )
-                for chunk_name in chunk_names:
-                    byte_offset, byte_count, inline_data = (
-                        self._get_chunk_file_bytes_data(key, chunk_name)
-                    )
-                    if inline_data is not None:
-                        # The data is inline for this chunk
-                        _add_ref(f"{key}/{chunk_name}", inline_data)
-                    else:
-                        # In this case we reference a chunk of data in a separate file
-                        assert byte_offset is not None
-                        assert byte_count is not None
-                        ret["refs"][f"{key}/{chunk_name}"] = [
-                            self._url,
-                            byte_offset,
-                            byte_count,
-                        ]
+                self._add_chunk_info_to_refs(key, _add_ref, _add_ref_chunk)
 
         # Process the groups recursively starting with the root group
         _process_group("", self._h5f)
diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py
index 6deb5c0..0badbae 100644
--- a/lindi/LindiH5ZarrStore/_util.py
+++ b/lindi/LindiH5ZarrStore/_util.py
@@ -1,7 +1,9 @@
-from typing import IO, List
+from typing import IO, List, Callable
 import json
 import numpy as np
 import h5py
+import math
+import warnings
 
 
 def _read_bytes(file: IO, offset: int, count: int):
@@ -10,6 +12,49 @@ def _read_bytes(file: IO, offset: int, count: int):
     return file.read(count)
 
 
+def _get_max_num_chunks(h5_dataset: h5py.Dataset):
+    """Get the maximum number of chunks in an h5py dataset.
+
+    This is similar to h5_dataset.id.get_num_chunks() but significantly faster. It does not account for
+    whether some chunks are allocated.
+    """
+    chunk_size = h5_dataset.chunks
+    assert chunk_size is not None
+    return math.prod([math.ceil(a / b) for a, b in zip(h5_dataset.shape, chunk_size)])
+
+
+def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable):
+    """Apply the callback function to each chunk of an h5py dataset.
+    The chunks are iterated in order such that the last dimension changes the fastest,
+    e.g., chunk coordinates could be:
+    [0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
+
+    This method tries to use the `chunk_iter` method if it is available. The `chunk_iter` method requires
+    HDF5 1.12.3 and above. If it is not available, this method falls back to the `get_chunk_info` method,
+    which is significantly slower and not recommended if the dataset has many chunks.
+
+    `chunk_iter` takes 1-5 seconds for all chunks for a dataset with 1e6 chunks.
+    `get_chunk_info` takes about 0.2 seconds per chunk for a dataset with 1e6 chunks.
+
+    NOTE: This method might be very slow if the dataset is stored remotely.
+    """
+    assert h5_dataset.chunks is not None
+    dsid = h5_dataset.id
+    try:
+        dsid.chunk_iter(callback)
+    except AttributeError:
+        # chunk_iter is not available
+        num_chunks = dsid.get_num_chunks()  # NOTE: this can be slow for remote datasets with many chunks
+        if num_chunks > 100:
+            warnings.warn(
+                f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. "
+                f"Consider upgrading to HDF5 1.12.3 or above for faster performance."
+            )
+        for index in range(num_chunks):
+            chunk_info = dsid.get_chunk_info(index)
+            callback(chunk_info)
+
+
 def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
     """Get the byte range in the file for a chunk of an h5py dataset.
 
@@ -36,7 +81,8 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tupl
 def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple:
     """Get the byte range in the file for a chunk of an h5py dataset.
 
-    This involves some low-level functions from the h5py library.
+    This involves some low-level functions from the h5py library. Use _apply_to_all_chunk_info instead of
+    calling this repeatedly for many chunks of the same dataset.
     """
     # got hints from kerchunk source code
     dsid = h5_dataset.id
@@ -66,6 +112,7 @@ def _join(a: str, b: str) -> str:
         return f"{a}/{b}"
 
 
+# NOTE: this is no longer used
 def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]:
     """Get the chunk names for a dataset with the given chunk coords shape.
 
diff --git a/pyproject.toml b/pyproject.toml
index 755b773..309b050 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,6 +15,7 @@ numcodecs = "^0.12.1"
 zarr = "^2.16.1"
 h5py = "^3.10.0"
 requests = "^2.31.0"
+tqdm = "^4.66.4"
 
 [tool.poetry.group.dev.dependencies]
 pynwb = "^2.6.0"
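For reviewers, here is a minimal usage sketch (not part of the patch) of the two helpers added to `lindi/LindiH5ZarrStore/_util.py`. It assumes a local chunked HDF5 file; the file name and dataset path are placeholders. The callback receives the h5py chunk-info object yielded by `chunk_iter`/`get_chunk_info`, which is how `_add_chunk_info_to_refs` derives the zarr chunk key and byte range.

```python
# Illustrative sketch only; "example.h5" and "some/dataset" are hypothetical.
import h5py

from lindi.LindiH5ZarrStore._util import _apply_to_all_chunk_info, _get_max_num_chunks

with h5py.File("example.h5", "r") as f:
    dset = f["some/dataset"]
    assert dset.chunks is not None  # the helpers only apply to chunked datasets

    print("max possible chunks:", _get_max_num_chunks(dset))  # counts unallocated chunks too

    chunk_size = dset.chunks
    byte_ranges = {}

    def on_chunk(chunk_info):
        # chunk_info carries chunk_offset, byte_offset, and size attributes
        key = ".".join(str(a // b) for a, b in zip(chunk_info.chunk_offset, chunk_size))
        byte_ranges[key] = (chunk_info.byte_offset, chunk_info.size)

    _apply_to_all_chunk_info(dset, on_chunk)
    print(f"collected byte ranges for {len(byte_ranges)} allocated chunks")
```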
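And a sketch (with made-up dataset path, URL, offsets, and sizes) of the kerchunk-style entries that `_add_ref_chunk` now writes into the reference file system: each chunk key maps to a `[url, byte_offset, byte_count]` list, while inline chunks and zarr metadata keys still go through `_add_ref`.

```python
# Hypothetical shape of the to_reference_file_system() output; all values are invented.
rfs = {
    "version": 1,
    "refs": {
        "acquisition/ts/data/.zarray": "{ ... }",  # metadata added via _add_ref
        "acquisition/ts/data/0.0": ["https://example.org/file.nwb", 2048, 4096],
        "acquisition/ts/data/0.1": ["https://example.org/file.nwb", 6144, 4096],
    },
}
```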