From 620b4365cb9354c750ec1ec61569db6a11f5b12c Mon Sep 17 00:00:00 2001 From: rly Date: Mon, 13 May 2024 19:15:46 -0700 Subject: [PATCH 01/12] Use chunk_iter when adding chunk refs --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 154 ++++++++++++++++----- lindi/LindiH5ZarrStore/_util.py | 44 +++++- pyproject.toml | 1 + 3 files changed, 160 insertions(+), 39 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index b12f592..0ee1d15 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -1,12 +1,15 @@ import json import base64 -from typing import Union, List, IO, Any, Dict +from typing import Tuple, Union, List, IO, Any, Dict import numpy as np import zarr from zarr.storage import Store, MemoryStore import h5py +from tqdm import tqdm from ._util import ( _read_bytes, + _get_all_chunk_info, + _get_chunk_index, _get_chunk_byte_range, _get_byte_range_for_contiguous_dataset, _join, @@ -324,7 +327,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): if self._file is None: raise Exception("Store is closed") byte_offset, byte_count, inline_data = self._get_chunk_file_bytes_data( - key_parent, key_name + key_parent, [key_name] ) if inline_data is not None: return inline_data @@ -351,7 +354,11 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): ) return buf - def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): + def _get_single_chunk_file_bytes_data( + self, + key_parent: str, + key_name: str + ): if self._h5f is None: raise Exception("Store is closed") h5_item = self._h5f.get('/' + key_parent, None) @@ -372,7 +379,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): ) if key_name != "0": raise Exception( - f"Chunk name {key_name} does not match dataset dimensions" + f"Chunk name {key_name} must be '0' for scalar dataset {key_parent}" ) # In the case of shape 0, we raise an exception because we shouldn't be here @@ -387,40 +394,131 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): raise Exception( f"Chunk name {key_name} does not match dataset dimensions for inline array {key_parent}" ) - return None, None, inline_array.chunk_bytes + inline_data = inline_array.chunk_bytes + return None, None, inline_data # If this is a scalar, then the data should have been inline if h5_item.ndim == 0: raise Exception(f"No inline data for scalar dataset {key_parent}") - # Get the chunk coords from the file name - chunk_name_parts = key_name.split(".") - if len(chunk_name_parts) != h5_item.ndim: - raise Exception(f"Chunk name {key_name} does not match dataset dimensions") - chunk_coords = tuple(int(x) for x in chunk_name_parts) - for i, c in enumerate(chunk_coords): - if c < 0 or c >= h5_item.shape[i]: - raise Exception( - f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" - ) if h5_item.chunks is not None: - # Get the byte range in the file for the chunk. + # Get the byte range in the file for each chunk. 
try: + # Get the chunk coords from the file name + chunk_name_parts = key_name.split(".") + if len(chunk_name_parts) != h5_item.ndim: + raise Exception(f"Chunk name {key_name} does not match dataset dimensions") + chunk_coords = tuple(int(x) for x in chunk_name_parts) + for i, c in enumerate(chunk_coords): + if c < 0 or c >= h5_item.shape[i]: + raise Exception( + f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" + ) byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) except Exception as e: raise Exception( f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" ) + return byte_offset, byte_count, None + else: # In this case (contiguous dataset), we need to check that the chunk # coordinates are (0, 0, 0, ...) + chunk_coords = tuple(int(x) for x in key_name.split(".")) if chunk_coords != (0,) * h5_item.ndim: raise Exception( f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}" ) # Get the byte range in the file for the contiguous dataset byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) - return byte_offset, byte_count, None + return byte_offset, byte_count, None + + def _add_chunk_info_to_refs( + self, + key_parent: str, + key_names: List[str], + add_ref: callable, + add_ref_chunk: callable + ): + h5_item = self._h5f.get('/' + key_parent, None) + + # For the case of a scalar dataset, we need to check a few things + if h5_item.ndim == 0: + if h5_item.chunks is not None: + raise Exception( + f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}" + ) + if len(key_names) != 1 or key_names[0] != "0": + raise Exception( + f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}" + ) + + inline_array = self._get_inline_array(key_parent, h5_item) + if inline_array.is_inline: + if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname: + raise Exception( + f"Chunk name {key_name[0]} does not match dataset dimensions for inline array {key_parent}" + ) + inline_data = inline_array.chunk_bytes + add_ref(f"{key_parent}/{key_names[0]}", inline_data) + return + + # If this is a scalar, then the data should have been inline + if h5_item.ndim == 0: + raise Exception(f"No inline data for scalar dataset {key_parent}") + + if h5_item.chunks is not None: + # Get the byte range in the file for each chunk. + # Get a list of all the chunk info. 
+ chunk_info = _get_all_chunk_info(h5_item) + for chunk_index, key_name in tqdm( + enumerate(key_names), + total=len(key_names), + desc=f"Writing chunk info for {key_parent}", + leave=True, + delay=2 # do not show progress bar until 2 seconds have passed + ): + try: + # TODO remove this code through the assert after verifying order of key_names + # Get the chunk coords from the file name + chunk_name_parts = key_name.split(".") + if len(chunk_name_parts) != h5_item.ndim: + raise Exception(f"Chunk name {key_name} does not match dataset dimensions") + chunk_coords = tuple(int(x) for x in chunk_name_parts) + for i, c in enumerate(chunk_coords): + if c < 0 or c >= h5_item.shape[i]: + raise Exception( + f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" + ) + assert chunk_index == _get_chunk_index(h5_item, chunk_coords) + + byte_offset = chunk_info[chunk_index].byte_offset + byte_count = chunk_info[chunk_index].size + except Exception as e: + raise Exception( + f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" + ) + + # In this case we reference a chunk of data in a separate file + add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) + + else: + # In this case (contiguous dataset), we need to check that the chunk + # coordinates are (0, 0, 0, ...) + if len(key_names) != 1: + raise Exception( + f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}" + ) + key_name = key_names[0] + chunk_coords = tuple(int(x) for x in key_name.split(".")) + if chunk_coords != (0,) * h5_item.ndim: + raise Exception( + f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) 
for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}" + ) + # Get the byte range in the file for the contiguous dataset + byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) + add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) + return byte_offset, byte_count, None def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset): # First check the memory cache @@ -528,6 +626,11 @@ def _add_ref(key: str, content: Union[bytes, None]): "ascii" ) + def _add_ref_chunk(key: str, data: Tuple[str, int, int]): + assert data[1] is not None + assert data[2] is not None + ret["refs"][key] = list(data) # TODO make downstream accept tuple + def _process_group(key, item: h5py.Group): if isinstance(item, h5py.Group): # Add the .zattrs and .zgroup files for the group @@ -578,22 +681,7 @@ def _process_dataset(key): chunk_names = _get_chunk_names_for_dataset( chunk_coords_shape ) - for chunk_name in chunk_names: - byte_offset, byte_count, inline_data = ( - self._get_chunk_file_bytes_data(key, chunk_name) - ) - if inline_data is not None: - # The data is inline for this chunk - _add_ref(f"{key}/{chunk_name}", inline_data) - else: - # In this case we reference a chunk of data in a separate file - assert byte_offset is not None - assert byte_count is not None - ret["refs"][f"{key}/{chunk_name}"] = [ - self._url, - byte_offset, - byte_count, - ] + self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk) # Process the groups recursively starting with the root group _process_group("", self._h5f) diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 6deb5c0..b3b43ee 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -1,4 +1,4 @@ -from typing import IO, List +from typing import IO, List, Union import json import numpy as np import h5py @@ -10,11 +10,33 @@ def _read_bytes(file: IO, offset: int, count: int): return file.read(count) -def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: - """Get the byte range in the file for a chunk of an h5py dataset. +def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]: + """Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects. + The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be: + [0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ... - This involves some low-level functions from the h5py library. First we need - to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. + Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk. + + Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None. + + This takes 1-5 seconds for a dataset with 1e6 chunks. + + This might be very slow if the dataset is stored remotely. + """ + stinfo = list() + dsid = h5_dataset.id + try: + dsid.chunk_iter(stinfo.append) + except AttributeError: + # chunk_iter is not available + return None + return stinfo + + +def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: + """Get the chunk index for a chunk of an h5py dataset. + + This involves some low-level functions from the h5py library. 
""" shape = h5_dataset.shape chunk_shape = h5_dataset.chunks @@ -30,13 +52,23 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tupl chunk_index = 0 for i in range(ndim): chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:])) + return chunk_index + +def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: + """Get the byte range in the file for a chunk of an h5py dataset. + + This involves some low-level functions from the h5py library. First we need + to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. + """ + chunk_index = _get_chunk_index(h5_dataset, chunk_coords) return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index) def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple: """Get the byte range in the file for a chunk of an h5py dataset. - This involves some low-level functions from the h5py library. + This involves some low-level functions from the h5py library. Use _get_all_chunk_info instead of + calling this repeatedly for many chunks of the same dataset. """ # got hints from kerchunk source code dsid = h5_dataset.id diff --git a/pyproject.toml b/pyproject.toml index 755b773..309b050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ numcodecs = "^0.12.1" zarr = "^2.16.1" h5py = "^3.10.0" requests = "^2.31.0" +tqdm = "^4.66.4" [tool.poetry.group.dev.dependencies] pynwb = "^2.6.0" From a39b7f8524454ff4a82103ccb6bc637e15f26797 Mon Sep 17 00:00:00 2001 From: rly Date: Mon, 13 May 2024 19:29:53 -0700 Subject: [PATCH 02/12] Fix for case when chunk_iter is not available --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 0ee1d15..f4bdab6 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -492,8 +492,13 @@ def _add_chunk_info_to_refs( ) assert chunk_index == _get_chunk_index(h5_item, chunk_coords) - byte_offset = chunk_info[chunk_index].byte_offset - byte_count = chunk_info[chunk_index].size + # use chunk_info if available on this system because it is more efficient, + # otherwise use the slower _get_chunk_byte_range + if chunk_info is not None: + byte_offset = chunk_info[chunk_index].byte_offset + byte_count = chunk_info[chunk_index].size + else: + byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) except Exception as e: raise Exception( f"Error getting byte range for chunk {key_parent}/{key_name}. 
Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" From 2defaf895370851ac40ef700ed53f4c72a614f82 Mon Sep 17 00:00:00 2001 From: rly Date: Mon, 13 May 2024 19:44:40 -0700 Subject: [PATCH 03/12] Refactor and fix --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index f4bdab6..0395779 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -354,7 +354,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): ) return buf - def _get_single_chunk_file_bytes_data( + def _get_chunk_file_bytes_data( self, key_parent: str, key_name: str @@ -394,32 +394,32 @@ def _get_single_chunk_file_bytes_data( raise Exception( f"Chunk name {key_name} does not match dataset dimensions for inline array {key_parent}" ) - inline_data = inline_array.chunk_bytes - return None, None, inline_data + return None, None, inline_array.chunk_bytes # If this is a scalar, then the data should have been inline if h5_item.ndim == 0: raise Exception(f"No inline data for scalar dataset {key_parent}") + # Get the chunk coords from the file name + chunk_name_parts = key_name.split(".") + if len(chunk_name_parts) != h5_item.ndim: + raise Exception(f"Chunk name {key_name} does not match dataset dimensions") + chunk_coords = tuple(int(x) for x in chunk_name_parts) + for i, c in enumerate(chunk_coords): + if c < 0 or c >= h5_item.shape[i]: + raise Exception( + f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" + ) + if h5_item.chunks is not None: - # Get the byte range in the file for each chunk. + # Get the byte range in the file for the chunk. try: # Get the chunk coords from the file name - chunk_name_parts = key_name.split(".") - if len(chunk_name_parts) != h5_item.ndim: - raise Exception(f"Chunk name {key_name} does not match dataset dimensions") - chunk_coords = tuple(int(x) for x in chunk_name_parts) - for i, c in enumerate(chunk_coords): - if c < 0 or c >= h5_item.shape[i]: - raise Exception( - f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" - ) byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) except Exception as e: raise Exception( f"Error getting byte range for chunk {key_parent}/{key_name}. 
Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" ) - return byte_offset, byte_count, None else: # In this case (contiguous dataset), we need to check that the chunk @@ -431,7 +431,7 @@ def _get_single_chunk_file_bytes_data( ) # Get the byte range in the file for the contiguous dataset byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) - return byte_offset, byte_count, None + return byte_offset, byte_count, None def _add_chunk_info_to_refs( self, From adb67ff4d964b462c1598975d4e4ca545350044b Mon Sep 17 00:00:00 2001 From: rly Date: Mon, 13 May 2024 19:45:51 -0700 Subject: [PATCH 04/12] Fix --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 0395779..899b9ec 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -327,7 +327,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): if self._file is None: raise Exception("Store is closed") byte_offset, byte_count, inline_data = self._get_chunk_file_bytes_data( - key_parent, [key_name] + key_parent, key_name ) if inline_data is not None: return inline_data @@ -354,11 +354,7 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): ) return buf - def _get_chunk_file_bytes_data( - self, - key_parent: str, - key_name: str - ): + def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): if self._h5f is None: raise Exception("Store is closed") h5_item = self._h5f.get('/' + key_parent, None) From ce75b32b736a3847afe7dc8d531b209f0ddf3cae Mon Sep 17 00:00:00 2001 From: rly Date: Mon, 13 May 2024 19:47:01 -0700 Subject: [PATCH 05/12] Cleanup --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 899b9ec..14a4e3f 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -406,21 +406,17 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): raise Exception( f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" ) - if h5_item.chunks is not None: # Get the byte range in the file for the chunk. try: - # Get the chunk coords from the file name byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) except Exception as e: raise Exception( f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" ) - else: # In this case (contiguous dataset), we need to check that the chunk # coordinates are (0, 0, 0, ...) - chunk_coords = tuple(int(x) for x in key_name.split(".")) if chunk_coords != (0,) * h5_item.ndim: raise Exception( f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}" @@ -502,7 +498,6 @@ def _add_chunk_info_to_refs( # In this case we reference a chunk of data in a separate file add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) - else: # In this case (contiguous dataset), we need to check that the chunk # coordinates are (0, 0, 0, ...) 
From 5dbf83baf26baaa3cf84b73f0bf7b35d09c108a8 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Tue, 14 May 2024 06:35:56 -0400 Subject: [PATCH 06/12] linter fixes --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 12 ++++++++---- lindi/LindiH5ZarrStore/_util.py | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 14a4e3f..d83332e 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -1,6 +1,6 @@ import json import base64 -from typing import Tuple, Union, List, IO, Any, Dict +from typing import Tuple, Union, List, IO, Any, Dict, Callable import numpy as np import zarr from zarr.storage import Store, MemoryStore @@ -429,10 +429,13 @@ def _add_chunk_info_to_refs( self, key_parent: str, key_names: List[str], - add_ref: callable, - add_ref_chunk: callable + add_ref: Callable, + add_ref_chunk: Callable ): + if self._h5f is None: + raise Exception("Store is closed") h5_item = self._h5f.get('/' + key_parent, None) + assert isinstance(h5_item, h5py.Dataset) # For the case of a scalar dataset, we need to check a few things if h5_item.ndim == 0: @@ -449,7 +452,7 @@ def _add_chunk_info_to_refs( if inline_array.is_inline: if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname: raise Exception( - f"Chunk name {key_name[0]} does not match dataset dimensions for inline array {key_parent}" + f"Chunk name {key_names[0]} does not match dataset dimensions for inline array {key_parent}" ) inline_data = inline_array.chunk_bytes add_ref(f"{key_parent}/{key_names[0]}", inline_data) @@ -470,6 +473,7 @@ def _add_chunk_info_to_refs( leave=True, delay=2 # do not show progress bar until 2 seconds have passed ): + chunk_coords = None # so that chunk_coords is not unbound on exception try: # TODO remove this code through the assert after verifying order of key_names # Get the chunk coords from the file name diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index b3b43ee..b310084 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -54,6 +54,7 @@ def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:])) return chunk_index + def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: """Get the byte range in the file for a chunk of an h5py dataset. 
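Note (illustrative, not part of the patches): _get_chunk_index linearizes chunk coordinates in row-major order, i.e. the last dimension varies fastest, which is the same order in which the chunk names are generated and in which chunk_iter reports chunks. For example, assuming a dataset with shape (100, 256) and chunks (10, 64), the chunk grid is 10 x 4, so chunk coordinates (2, 3) map to index 2 * 4 + 3 = 11; the assert added to _add_chunk_info_to_refs verifies exactly this correspondence before the byte ranges are looked up.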
From 3652ebbfc342eece0fcf3d625aa85b8e57b1a1b3 Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 06:39:29 -0700 Subject: [PATCH 07/12] cleanup comments --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index d83332e..7c3a65f 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -601,8 +601,6 @@ def to_reference_file_system(self) -> dict: raise Exception("You must specify a url to create a reference file system") ret = {"refs": {}, "version": 1} - # TODO: use templates to decrease the size of the JSON - def _add_ref(key: str, content: Union[bytes, None]): if content is None: raise Exception(f"Unable to get content for key {key}") @@ -629,7 +627,7 @@ def _add_ref(key: str, content: Union[bytes, None]): def _add_ref_chunk(key: str, data: Tuple[str, int, int]): assert data[1] is not None assert data[2] is not None - ret["refs"][key] = list(data) # TODO make downstream accept tuple + ret["refs"][key] = list(data) # downstream expects a list like on read from a JSON file def _process_group(key, item: h5py.Group): if isinstance(item, h5py.Group): From 2a31250989f921fbc33660158739f6e7b025b347 Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 08:31:49 -0700 Subject: [PATCH 08/12] Simplify method for adding all chunk info --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 103 +++++---------------- lindi/LindiH5ZarrStore/_util.py | 54 +++++------ 2 files changed, 52 insertions(+), 105 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 7c3a65f..4814c86 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -8,12 +8,10 @@ from tqdm import tqdm from ._util import ( _read_bytes, - _get_all_chunk_info, - _get_chunk_index, + _apply_to_all_chunk_info, _get_chunk_byte_range, _get_byte_range_for_contiguous_dataset, _join, - _get_chunk_names_for_dataset, _write_rfs_to_file, ) from ..conversion.attr_conversion import h5_to_zarr_attr @@ -425,13 +423,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) return byte_offset, byte_count, None - def _add_chunk_info_to_refs( - self, - key_parent: str, - key_names: List[str], - add_ref: Callable, - add_ref_chunk: Callable - ): + def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_chunk: Callable): if self._h5f is None: raise Exception("Store is closed") h5_item = self._h5f.get('/' + key_parent, None) @@ -443,19 +435,12 @@ def _add_chunk_info_to_refs( raise Exception( f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}" ) - if len(key_names) != 1 or key_names[0] != "0": - raise Exception( - f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}" - ) inline_array = self._get_inline_array(key_parent, h5_item) if inline_array.is_inline: - if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname: - raise Exception( - f"Chunk name {key_names[0]} does not match dataset dimensions for inline array {key_parent}" - ) + key_name = inline_array.chunk_fname inline_data = inline_array.chunk_bytes - add_ref(f"{key_parent}/{key_names[0]}", inline_data) + add_ref(f"{key_parent}/{key_name}", inline_data) return # If this is a scalar, then the data should have been 
inline @@ -463,62 +448,34 @@ def _add_chunk_info_to_refs( raise Exception(f"No inline data for scalar dataset {key_parent}") if h5_item.chunks is not None: - # Get the byte range in the file for each chunk. - # Get a list of all the chunk info. - chunk_info = _get_all_chunk_info(h5_item) - for chunk_index, key_name in tqdm( - enumerate(key_names), - total=len(key_names), + # Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info + # does not provide a way to hook in a progress bar + dsid = h5_item.id + num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks + pbar = tqdm( + total=num_chunks, desc=f"Writing chunk info for {key_parent}", leave=True, delay=2 # do not show progress bar until 2 seconds have passed - ): - chunk_coords = None # so that chunk_coords is not unbound on exception - try: - # TODO remove this code through the assert after verifying order of key_names - # Get the chunk coords from the file name - chunk_name_parts = key_name.split(".") - if len(chunk_name_parts) != h5_item.ndim: - raise Exception(f"Chunk name {key_name} does not match dataset dimensions") - chunk_coords = tuple(int(x) for x in chunk_name_parts) - for i, c in enumerate(chunk_coords): - if c < 0 or c >= h5_item.shape[i]: - raise Exception( - f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}" - ) - assert chunk_index == _get_chunk_index(h5_item, chunk_coords) - - # use chunk_info if available on this system because it is more efficient, - # otherwise use the slower _get_chunk_byte_range - if chunk_info is not None: - byte_offset = chunk_info[chunk_index].byte_offset - byte_count = chunk_info[chunk_index].size - else: - byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) - except Exception as e: - raise Exception( - f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" - ) + ) - # In this case we reference a chunk of data in a separate file + chunk_size = h5_item.chunks + def store_chunk_info(chunk_info): + # Get the byte range in the file for each chunk. + chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset + byte_offset = chunk_info.byte_offset + byte_count = chunk_info.size + key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)]) add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) + pbar.update() + + _apply_to_all_chunk_info(h5_item, store_chunk_info) + pbar.close() else: - # In this case (contiguous dataset), we need to check that the chunk - # coordinates are (0, 0, 0, ...) - if len(key_names) != 1: - raise Exception( - f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}" - ) - key_name = key_names[0] - chunk_coords = tuple(int(x) for x in key_name.split(".")) - if chunk_coords != (0,) * h5_item.ndim: - raise Exception( - f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) 
for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}" - ) # Get the byte range in the file for the contiguous dataset byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) + key_name = ".".join("0" for _ in range(h5_item.ndim)) add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) - return byte_offset, byte_count, None def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset): # First check the memory cache @@ -667,19 +624,7 @@ def _process_dataset(key): if external_array_link is None: # Only add chunk references for datasets without an external array link - shape = zarray_dict["shape"] - chunks = zarray_dict.get("chunks", None) - chunk_coords_shape = [ - # the shape could be zero -- for example dandiset 000559 - acquisition/depth_video/data has shape [0, 0, 0] - (shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0 - for i in range(len(shape)) - ] - # For example, chunk_names could be ['0', '1', '2', ...] - # or ['0.0', '0.1', '0.2', ...] - chunk_names = _get_chunk_names_for_dataset( - chunk_coords_shape - ) - self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk) + self._add_chunk_info_to_refs(key, _add_ref, _add_ref_chunk) # Process the groups recursively starting with the root group _process_group("", self._h5f) diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index b310084..719a3cf 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -1,7 +1,8 @@ -from typing import IO, List, Union +from typing import IO, List, Callable import json import numpy as np import h5py +import warnings def _read_bytes(file: IO, offset: int, count: int): @@ -10,33 +11,43 @@ def _read_bytes(file: IO, offset: int, count: int): return file.read(count) -def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]: - """Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects. - The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be: +def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable): + """Apply the callback function to each chunk of an h5py dataset. + The chunks are iterated in order such that the last dimension changes the fastest, + e.g., chunk coordinates could be: [0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ... - Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk. + This method tries to use the `chunk_iter` method if it is available. The `chunk_iter` method requires + HDF5 1.12.3 and above. If it is not available, this method falls back to the `get_chunk_info` method, + which is significantly slower and not recommended if the dataset has many chunks. - Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None. + `chunk_iter` takes 1-5 seconds for all chunks for a dataset with 1e6 chunks. + `get_chunk_info` takes about 0.2 seconds per chunk for a dataset with 1e6 chunks. - This takes 1-5 seconds for a dataset with 1e6 chunks. - - This might be very slow if the dataset is stored remotely. + NOTE: This method might be very slow if the dataset is stored remotely. 
""" - stinfo = list() + assert h5_dataset.chunks is not None dsid = h5_dataset.id try: - dsid.chunk_iter(stinfo.append) + dsid.chunk_iter(callback) except AttributeError: # chunk_iter is not available - return None - return stinfo + num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks + if num_chunks > 100: + warnings.warn( + f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. " + f"Consider upgrading to HDF5 1.12.3 or above for faster performance." + ) + for index in range(num_chunks): + chunk_info = dsid.get_chunk_info(index) + callback(chunk_info) -def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: - """Get the chunk index for a chunk of an h5py dataset. +def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: + """Get the byte range in the file for a chunk of an h5py dataset. - This involves some low-level functions from the h5py library. + This involves some low-level functions from the h5py library. First we need + to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. """ shape = h5_dataset.shape chunk_shape = h5_dataset.chunks @@ -52,16 +63,6 @@ def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: chunk_index = 0 for i in range(ndim): chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:])) - return chunk_index - - -def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: - """Get the byte range in the file for a chunk of an h5py dataset. - - This involves some low-level functions from the h5py library. First we need - to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. - """ - chunk_index = _get_chunk_index(h5_dataset, chunk_coords) return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index) @@ -99,6 +100,7 @@ def _join(a: str, b: str) -> str: return f"{a}/{b}" +# NOTE: this is no longer used def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]: """Get the chunk names for a dataset with the given chunk coords shape. From c9c8955792ed7b302293b7a72980e36802a9c97e Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 08:33:03 -0700 Subject: [PATCH 09/12] Fix lint --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 4814c86..2d6fbff 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -460,6 +460,7 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch ) chunk_size = h5_item.chunks + def store_chunk_info(chunk_info): # Get the byte range in the file for each chunk. chunk_offset: Tuple[int, ...] 
= chunk_info.chunk_offset @@ -620,7 +621,6 @@ def _process_dataset(key): zarray_bytes = self.get(f"{key}/.zarray") assert zarray_bytes is not None _add_ref(f"{key}/.zarray", zarray_bytes) - zarray_dict = json.loads(zarray_bytes.decode("utf-8")) if external_array_link is None: # Only add chunk references for datasets without an external array link From 53663be2e5d5f750c954f35b358dfb05d7c4284a Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 08:57:55 -0700 Subject: [PATCH 10/12] Use _get_max_num_chunks --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 4 ++-- lindi/LindiH5ZarrStore/_util.py | 14 +++++++++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 2d6fbff..21ffd61 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -8,6 +8,7 @@ from tqdm import tqdm from ._util import ( _read_bytes, + _get_max_num_chunks, _apply_to_all_chunk_info, _get_chunk_byte_range, _get_byte_range_for_contiguous_dataset, @@ -450,8 +451,7 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch if h5_item.chunks is not None: # Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info # does not provide a way to hook in a progress bar - dsid = h5_item.id - num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks + num_chunks = _get_max_num_chunks(h5_item) # NOTE: unallocated chunks are counted pbar = tqdm( total=num_chunks, desc=f"Writing chunk info for {key_parent}", diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 719a3cf..90b4b8b 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -2,6 +2,7 @@ import json import numpy as np import h5py +import math import warnings @@ -11,6 +12,17 @@ def _read_bytes(file: IO, offset: int, count: int): return file.read(count) +def _get_max_num_chunks(h5_dataset: h5py.Dataset): + """Get the maximum number of chunks in an h5py dataset. + + This is similar to h5_dataset.id.get_num_chunks() but significantly faster. It does not account for + whether some chunks are allocated. + """ + chunk_size = h5_dataset.chunks + assert chunk_size is not None + return math.prod([math.ceil(a / b) for a, b in zip(h5_dataset.shape, chunk_size)]) + + def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable): """Apply the callback function to each chunk of an h5py dataset. The chunks are iterated in order such that the last dimension changes the fastest, @@ -32,7 +44,7 @@ def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable): dsid.chunk_iter(callback) except AttributeError: # chunk_iter is not available - num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks + num_chunks = _get_max_num_chunks(dsid) if num_chunks > 100: warnings.warn( f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. 
" From 1eced73122f93d4a894eecdf1dfd70e76c0ca84c Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 11:17:33 -0700 Subject: [PATCH 11/12] Ues get_num_chunks when iterating chunks manually --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 2 ++ lindi/LindiH5ZarrStore/_util.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 21ffd61..b188e65 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -451,6 +451,8 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch if h5_item.chunks is not None: # Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info # does not provide a way to hook in a progress bar + # We use max number of chunks instead of actual number of chunks because get_num_chunks is slow + # for remote datasets. num_chunks = _get_max_num_chunks(h5_item) # NOTE: unallocated chunks are counted pbar = tqdm( total=num_chunks, diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 90b4b8b..685dbe9 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -44,7 +44,7 @@ def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable): dsid.chunk_iter(callback) except AttributeError: # chunk_iter is not available - num_chunks = _get_max_num_chunks(dsid) + num_chunks = dsid.get_num_chunks() # NOTE: this can be slow for remote datasets with many chunks if num_chunks > 100: warnings.warn( f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. " From 2897504c1d1d5ac70bc64682692df387ba15b8c2 Mon Sep 17 00:00:00 2001 From: rly Date: Tue, 14 May 2024 11:21:46 -0700 Subject: [PATCH 12/12] FIx comment --- lindi/LindiH5ZarrStore/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 685dbe9..0badbae 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -81,7 +81,7 @@ def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tupl def _get_chunk_byte_range_for_chunk_index(h5_dataset: h5py.Dataset, chunk_index: int) -> tuple: """Get the byte range in the file for a chunk of an h5py dataset. - This involves some low-level functions from the h5py library. Use _get_all_chunk_info instead of + This involves some low-level functions from the h5py library. Use _apply_to_all_chunk_info instead of calling this repeatedly for many chunks of the same dataset. """ # got hints from kerchunk source code