From 85d9bee6aa6bbcb33b17c75271ed6fe1ccd29143 Mon Sep 17 00:00:00 2001
From: Jeremy Magland
Date: Mon, 5 Aug 2024 16:07:42 -0400
Subject: [PATCH] Batch set items

---
Review note: amended relative to the original patch. In
LindiTarStore.setitems() the dict of pending blobs is now created once,
before the loop (it used to be reset for every item, so only the last
blob of a batch reached the tar file while references were registered
for all of them); the inline branch no longer returns from inside the
loop (which dropped the remaining items of the batch); and the tar write
is skipped when every item was stored inline. The hunk line counts and
the diffstat were recomputed for LindiTarStore.py; the post-image blob id
on its index line predates this amendment. Line breaks and indentation
were reconstructed from a whitespace-collapsed copy of the patch.

 lindi/tar/LindiTarStore.py | 70 +++++++++++++++++++++++---------------
 lindi/tar/lindi_tar.py     | 48 ++++++++++++++++------------
 2 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/lindi/tar/LindiTarStore.py b/lindi/tar/LindiTarStore.py
index cd8e48d..d64b4d2 100644
--- a/lindi/tar/LindiTarStore.py
+++ b/lindi/tar/LindiTarStore.py
@@ -13,34 +13,48 @@ def __getitem__(self, key: str):
         return self._base_store.__getitem__(key)
 
     def __setitem__(self, key: str, value: bytes):
-        key_parts = key.split("/")
-        key_base_name = key_parts[-1]
-        if key_base_name.startswith('.') or key_base_name.endswith('.json'):  # always inline .zattrs, .zgroup, .zarray, zarr.json
-            inline = True
-        else:
-            # presumably it is a chunk of an array
-            if isinstance(value, np.ndarray):
-                value = value.tobytes()
-            if not isinstance(value, bytes):
-                print(f"key: {key}, value type: {type(value)}")
-                raise ValueError("Value must be bytes")
-            size = len(value)
-            inline = size < 1000  # this should be a configurable threshold
-        if inline:
-            # If inline, save in memory
-            return self._base_store.__setitem__(key, value)
-        else:
-            # If not inline, save it as a new file in the tar file
-            key_without_initial_slash = key if not key.startswith("/") else key[1:]
-            fname_in_tar = f'blobs/{key_without_initial_slash}'
-            if self._tar_file.has_file_with_name(fname_in_tar):
-                v = 2
-                while self._tar_file.has_file_with_name(f'{fname_in_tar}.v{v}'):
-                    v += 1
-                fname_in_tar = f'{fname_in_tar}.v{v}'
-            self._tar_file.write_file(fname_in_tar, value)
-
-            self._set_ref_reference(key_without_initial_slash, f'./{fname_in_tar}', 0, len(value))
+        self.setitems({key: value})
+
+    def setitems(self, items_dict: dict):
+        # Collect the non-inline blobs of the whole batch here so that they are
+        # written to the tar file in a single write_files() call after the loop
+        files_to_write_to_tar = {}
+
+        for key, value in items_dict.items():
+            key_parts = key.split("/")
+            key_base_name = key_parts[-1]
+
+            if key_base_name.startswith('.') or key_base_name.endswith('.json'):  # always inline .zattrs, .zgroup, .zarray, zarr.json
+                inline = True
+            else:
+                # presumably it is a chunk of an array
+                if isinstance(value, np.ndarray):
+                    value = value.tobytes()
+                if not isinstance(value, bytes):
+                    print(f"key: {key}, value type: {type(value)}")
+                    raise ValueError("Value must be bytes")
+                size = len(value)
+                inline = size < 1000  # this should be a configurable threshold
+            if inline:
+                # If inline, save in memory (do not return here: the remaining
+                # items of the batch still need to be processed)
+                self._base_store.__setitem__(key, value)
+            else:
+                # If not inline, save it as a new file in the tar file
+                key_without_initial_slash = key if not key.startswith("/") else key[1:]
+                fname_in_tar = f'blobs/{key_without_initial_slash}'
+                if self._tar_file.has_file_with_name(fname_in_tar):
+                    v = 2
+                    while self._tar_file.has_file_with_name(f'{fname_in_tar}.v{v}'):
+                        v += 1
+                    fname_in_tar = f'{fname_in_tar}.v{v}'
+                files_to_write_to_tar[fname_in_tar] = value
+
+                self._set_ref_reference(key_without_initial_slash, f'./{fname_in_tar}', 0, len(value))
+
+        # Only touch the tar file when there is something to write, so that inline-only writes never reach write_files()
+        if files_to_write_to_tar:
+            self._tar_file.write_files(files_to_write_to_tar)
 
     def __delitem__(self, key: str):
         # We don't actually delete the file from the tar, but maybe it would be
diff --git a/lindi/tar/lindi_tar.py b/lindi/tar/lindi_tar.py
index d55e765..8ab2ef0 100644
--- a/lindi/tar/lindi_tar.py
+++ b/lindi/tar/lindi_tar.py
@@ -130,6 +130,9 @@ def _change_name_of_file(self, file_name: str, new_file_name: str, do_write_inde
             self._index_has_changed = True
 
     def write_file(self, file_name: str, data: bytes):
+        self.write_files({file_name: data})
+
+    def write_files(self, files: dict):
         if self._is_remote:
             raise ValueError("Cannot write a file in a remote tar file")
         if self._file is None:
@@ -141,31 +144,36 @@ def write_file(self, file_name: str, data: bytes):
         self._file.seek(-1024, 2)
 
         file_pos = self._file.tell()
 
-        x = {
-            'n': file_name,
-            'o': file_pos,
-            'd': file_pos + 512,  # we assume the header is 512 bytes
-            's': len(data)
-        }
-
-        # write the tar header
-        tar_header = create_tar_header(file_name, len(data))
-        self._file.write(tar_header)
-        # write the data
-        self._file.write(data)
-        # pad up to blocks of 512
-        if len(data) % 512 != 0:
-            padding = b"\x00" * (512 - len(data) % 512)
-            self._file.write(padding)
+        for file_name, data in files.items():
+            x = {
+                'n': file_name,
+                'o': file_pos,
+                'd': file_pos + 512,  # we assume the header is 512 bytes
+                's': len(data)
+            }
+
+            # write the tar header
+            tar_header = create_tar_header(file_name, len(data))
+
+            # pad up to blocks of 512
+            if len(data) % 512 != 0:
+                padding_len = 512 - len(data) % 512
+            else:
+                padding_len = 0
+
+            self._file.write(tar_header)
+            self._file.write(data)
+            self._file.write(b"\x00" * padding_len)
+            file_pos += 512 + len(data) + padding_len
+
+            self._index['files'].append(x)
+            self._index_lookup[file_name] = x
+            self._index_has_changed = True
 
         # write the 1024 bytes marking the end of the file
         self._file.write(b"\x00" * 1024)
 
-        self._index['files'].append(x)
-        self._index_lookup[file_name] = x
-        self._index_has_changed = True
-
     def read_file(self, file_name: str) -> bytes:
         info = self.get_file_info(file_name)
         if info is None: