From 620b4365cb9354c750ec1ec61569db6a11f5b12c Mon Sep 17 00:00:00 2001
From: rly
Date: Mon, 13 May 2024 19:15:46 -0700
Subject: [PATCH] Use chunk_iter when adding chunk refs

---
 lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 172 ++++++++++++++++++-----
 lindi/LindiH5ZarrStore/_util.py            |  49 +++++--
 pyproject.toml                             |   1 +
 3 files changed, 184 insertions(+), 38 deletions(-)

diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
index b12f592..0ee1d15 100644
--- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
+++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -1,12 +1,15 @@
 import json
 import base64
-from typing import Union, List, IO, Any, Dict
+from typing import Callable, Tuple, Union, List, IO, Any, Dict
 import numpy as np
 import zarr
 from zarr.storage import Store, MemoryStore
 import h5py
+from tqdm import tqdm
 from ._util import (
     _read_bytes,
+    _get_all_chunk_info,
+    _get_chunk_index,
     _get_chunk_byte_range,
     _get_byte_range_for_contiguous_dataset,
     _join,
@@ -324,7 +327,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str):
         if self._file is None:
             raise Exception("Store is closed")
-        byte_offset, byte_count, inline_data = self._get_chunk_file_bytes_data(
+        byte_offset, byte_count, inline_data = self._get_single_chunk_file_bytes_data(
             key_parent, key_name
         )
         if inline_data is not None:
             return inline_data
@@ -351,7 +354,11 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str):
         )
         return buf
 
-    def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
+    def _get_single_chunk_file_bytes_data(
+        self,
+        key_parent: str,
+        key_name: str
+    ):
         if self._h5f is None:
             raise Exception("Store is closed")
         h5_item = self._h5f.get('/' + key_parent, None)
@@ -372,7 +379,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
             )
             if key_name != "0":
                 raise Exception(
-                    f"Chunk name {key_name} does not match dataset dimensions"
+                    f"Chunk name {key_name} must be '0' for scalar dataset {key_parent}"
                 )
 
         # In the case of shape 0, we raise an exception because we shouldn't be here
@@ -387,40 +394,151 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
                 raise Exception(
                     f"Chunk name {key_name} does not match dataset dimensions for inline array {key_parent}"
                 )
-            return None, None, inline_array.chunk_bytes
+            inline_data = inline_array.chunk_bytes
+            return None, None, inline_data
 
         # If this is a scalar, then the data should have been inline
         if h5_item.ndim == 0:
             raise Exception(f"No inline data for scalar dataset {key_parent}")
 
-        # Get the chunk coords from the file name
-        chunk_name_parts = key_name.split(".")
-        if len(chunk_name_parts) != h5_item.ndim:
-            raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
-        chunk_coords = tuple(int(x) for x in chunk_name_parts)
-        for i, c in enumerate(chunk_coords):
-            if c < 0 or c >= h5_item.shape[i]:
-                raise Exception(
-                    f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
-                )
         if h5_item.chunks is not None:
             # Get the byte range in the file for the chunk.
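+            # Zarr chunk keys are dot-separated chunk-grid coordinates (e.g.
+            # "1.0.2" for a 3-d dataset); they are parsed and validated inside
+            # the try block so that failures are reported with full context.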
             try:
+                # Get the chunk coords from the file name
+                chunk_name_parts = key_name.split(".")
+                if len(chunk_name_parts) != h5_item.ndim:
+                    raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
+                chunk_coords = tuple(int(x) for x in chunk_name_parts)
+                for i, c in enumerate(chunk_coords):
+                    if c < 0 or c >= h5_item.shape[i]:
+                        raise Exception(
+                            f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
+                        )
                 byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
             except Exception as e:
                 raise Exception(
                     f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
                 )
+            return byte_offset, byte_count, None
+
         else:
             # In this case (contiguous dataset), we need to check that the chunk
             # coordinates are (0, 0, 0, ...)
+            chunk_coords = tuple(int(x) for x in key_name.split("."))
             if chunk_coords != (0,) * h5_item.ndim:
                 raise Exception(
                     f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
                 )
             # Get the byte range in the file for the contiguous dataset
             byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
-        return byte_offset, byte_count, None
+            return byte_offset, byte_count, None
+
+    def _add_chunk_info_to_refs(
+        self,
+        key_parent: str,
+        key_names: List[str],
+        add_ref: Callable,
+        add_ref_chunk: Callable
+    ):
+        """Add refs for the chunks of the dataset at key_parent.
+
+        Inline data is passed to add_ref; references to byte ranges within a
+        file are passed to add_ref_chunk as (url, byte_offset, byte_count).
+        """
+        if self._h5f is None:
+            raise Exception("Store is closed")
+        h5_item = self._h5f.get('/' + key_parent, None)
+
+        # For the case of a scalar dataset, we need to check a few things
+        if h5_item.ndim == 0:
+            if h5_item.chunks is not None:
+                raise Exception(
+                    f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
+                )
+            if len(key_names) != 1 or key_names[0] != "0":
+                raise Exception(
+                    f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}"
+                )
+
+        inline_array = self._get_inline_array(key_parent, h5_item)
+        if inline_array.is_inline:
+            if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname:
+                raise Exception(
+                    f"Chunk name {key_names[0]} does not match dataset dimensions for inline array {key_parent}"
+                )
+            inline_data = inline_array.chunk_bytes
+            add_ref(f"{key_parent}/{key_names[0]}", inline_data)
+            return
+
+        # If this is a scalar, then the data should have been inline
+        if h5_item.ndim == 0:
+            raise Exception(f"No inline data for scalar dataset {key_parent}")
+
+        if h5_item.chunks is not None:
+            # Get the byte range in the file for each chunk.
+            # Get a list of all the chunk info.
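+            # _get_all_chunk_info uses dsid.chunk_iter to collect the byte
+            # ranges of all chunks in a single pass over the dataset's chunk
+            # index, which is much faster than querying the file once per
+            # chunk (as _get_chunk_byte_range does) when there are many chunks.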
+            chunk_info = _get_all_chunk_info(h5_item)  # returns None if chunk_iter is unavailable
+            for chunk_index, key_name in tqdm(
+                enumerate(key_names),
+                total=len(key_names),
+                desc=f"Writing chunk info for {key_parent}",
+                leave=True,
+                delay=2  # do not show progress bar until 2 seconds have passed
+            ):
+                try:
+                    # TODO remove this code through the assert after verifying order of key_names
+                    # Get the chunk coords from the file name
+                    chunk_name_parts = key_name.split(".")
+                    if len(chunk_name_parts) != h5_item.ndim:
+                        raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
+                    chunk_coords = tuple(int(x) for x in chunk_name_parts)
+                    for i, c in enumerate(chunk_coords):
+                        if c < 0 or c >= h5_item.shape[i]:
+                            raise Exception(
+                                f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
+                            )
+                    assert chunk_index == _get_chunk_index(h5_item, chunk_coords)
+
+                    if chunk_info is not None:
+                        byte_offset = chunk_info[chunk_index].byte_offset
+                        byte_count = chunk_info[chunk_index].size
+                    else:
+                        # Fall back to a per-chunk lookup when chunk_iter is not available
+                        byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
+                except Exception as e:
+                    raise Exception(
+                        f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
+                    )
+
+                # In this case we reference a chunk of data in a separate file
+                add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
+
+        else:
+            # In this case (contiguous dataset), we need to check that the chunk
+            # coordinates are (0, 0, 0, ...)
+            if len(key_names) != 1:
+                raise Exception(
+                    f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}"
+                )
+            key_name = key_names[0]
+            chunk_coords = tuple(int(x) for x in key_name.split("."))
+            if chunk_coords != (0,) * h5_item.ndim:
+                raise Exception(
+                    f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
+                )
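+            # A contiguous (non-chunked) HDF5 dataset occupies one byte range
+            # in the file, so it is exposed as a single zarr chunk that spans
+            # the entire array.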
+            # Get the byte range in the file for the contiguous dataset
+            byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
+            add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
 
     def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
         # First check the memory cache
@@ -528,6 +646,11 @@ def _add_ref(key: str, content: Union[bytes, None]):
                     "ascii"
                 )
 
+        def _add_ref_chunk(key: str, data: Tuple[str, int, int]):
+            assert data[1] is not None
+            assert data[2] is not None
+            ret["refs"][key] = list(data)  # TODO make downstream accept tuple
+
         def _process_group(key, item: h5py.Group):
             if isinstance(item, h5py.Group):
                 # Add the .zattrs and .zgroup files for the group
@@ -578,22 +701,7 @@ def _process_dataset(key):
                     chunk_names = _get_chunk_names_for_dataset(
                         chunk_coords_shape
                     )
-                    for chunk_name in chunk_names:
-                        byte_offset, byte_count, inline_data = (
-                            self._get_chunk_file_bytes_data(key, chunk_name)
-                        )
-                        if inline_data is not None:
-                            # The data is inline for this chunk
-                            _add_ref(f"{key}/{chunk_name}", inline_data)
-                        else:
-                            # In this case we reference a chunk of data in a separate file
-                            assert byte_offset is not None
-                            assert byte_count is not None
-                            ret["refs"][f"{key}/{chunk_name}"] = [
-                                self._url,
-                                byte_offset,
-                                byte_count,
-                            ]
+                    self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk)
 
         # Process the groups recursively starting with the root group
         _process_group("", self._h5f)
diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py
index 6deb5c0..b3b43ee 100644
--- a/lindi/LindiH5ZarrStore/_util.py
+++ b/lindi/LindiH5ZarrStore/_util.py
@@ -1,4 +1,4 @@
-from typing import IO, List
+from typing import IO, List, Union
 import json
 import numpy as np
 import h5py
@@ -10,11 +10,37 @@ def _read_bytes(file: IO, offset: int, count: int):
     return file.read(count)
 
 
-def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
-    """Get the byte range in the file for a chunk of an h5py dataset.
+def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]:
+    """Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects.
+    The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be:
+    [0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
 
-    This involves some low-level functions from the h5py library. First we need
-    to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index.
+    Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk.
+
+    Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None.
+
+    This takes 1-5 seconds for a dataset with 1e6 chunks.
+
+    This might be very slow if the dataset is stored remotely.
+    """
+    stinfo = list()
+    dsid = h5_dataset.id
+    try:
+        dsid.chunk_iter(stinfo.append)
+    except AttributeError:
+        # chunk_iter is not available; it requires h5py >= 3.8 built against HDF5 >= 1.12.3
+        return None
+    return stinfo
+
+
+def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int:
+    """Get the chunk index for a chunk of an h5py dataset.
+
+    This involves some low-level functions from the h5py library.
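+
+    For example, a dataset with shape (100, 200) and chunk shape (10, 10) has
+    a 10 x 20 grid of chunks, so chunk_coords (2, 3) maps to chunk index
+    2 * 20 + 3 = 43 (row-major order, with the last dimension varying fastest).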
""" shape = h5_dataset.shape chunk_shape = h5_dataset.chunks @@ -30,13 +52,23 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tupl chunk_index = 0 for i in range(ndim): chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:])) + return chunk_index + +def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: + """Get the byte range in the file for a chunk of an h5py dataset. + + This involves some low-level functions from the h5py library. First we need + to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. + """ + chunk_index = _get_chunk_index(h5_dataset, chunk_coords) return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index) def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple: """Get the byte range in the file for a chunk of an h5py dataset. - This involves some low-level functions from the h5py library. + This involves some low-level functions from the h5py library. Use _get_all_chunk_info instead of + calling this repeatedly for many chunks of the same dataset. """ # got hints from kerchunk source code dsid = h5_dataset.id diff --git a/pyproject.toml b/pyproject.toml index 755b773..309b050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ numcodecs = "^0.12.1" zarr = "^2.16.1" h5py = "^3.10.0" requests = "^2.31.0" +tqdm = "^4.66.4" [tool.poetry.group.dev.dependencies] pynwb = "^2.6.0"