Skip to content

Commit

Permalink
Batch set items
Browse files Browse the repository at this point in the history
  • Loading branch information
magland committed Aug 5, 2024
1 parent 075419b commit 85d9bee
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 48 deletions.
65 changes: 37 additions & 28 deletions lindi/tar/LindiTarStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,43 @@ def __getitem__(self, key: str):
return self._base_store.__getitem__(key)

def __setitem__(self, key: str, value: bytes):
    """Store a single item under ``key``.

    All of the storage logic lives in ``setitems`` so that the single-item
    and batch code paths cannot drift apart. Keeping a second copy of that
    logic here would store large values twice (once directly, once via
    ``setitems`` under a versioned blob name).

    Args:
        key: Store key of the item.
        value: Payload to store; handed to ``setitems`` unchanged.
    """
    self.setitems({key: value})

def setitems(self, items_dict: dict):
    """Store several items, batching all large blobs into one tar write.

    Metadata keys (base name starting with ``.`` or ending in ``.json``) and
    values shorter than the inline threshold are stored directly in the base
    store. Every other value is queued as a ``blobs/<key>`` file; the queued
    files are written with a single ``write_files`` call, after which a
    reference is registered for each of them.

    Args:
        items_dict: Mapping of store key to value. Values for non-metadata
            keys must be ``bytes`` or ``np.ndarray`` (converted with
            ``tobytes()``).

    Raises:
        ValueError: If a non-metadata value is neither ``bytes`` nor
            ``np.ndarray``.
    """
    inline_threshold = 1000  # bytes; this should be a configurable threshold

    # Accumulated across the WHOLE batch so that a single tar write covers
    # every blob (re-creating this per item would drop all but the last).
    files_to_write_to_tar = {}
    pending_references = []

    for key, value in items_dict.items():
        key_base_name = key.split("/")[-1]

        if key_base_name.startswith('.') or key_base_name.endswith('.json'):
            # always inline .zattrs, .zgroup, .zarray, zarr.json
            inline = True
        else:
            # presumably it is a chunk of an array
            if isinstance(value, np.ndarray):
                value = value.tobytes()
            if not isinstance(value, bytes):
                raise ValueError(
                    f"Value must be bytes (key: {key}, value type: {type(value)})"
                )
            inline = len(value) < inline_threshold

        if inline:
            # Keep going: returning here would silently skip the remaining
            # items of the batch and the tar write below.
            self._base_store.__setitem__(key, value)
            continue

        # Not inline: queue it as a new file in the tar file.
        key_without_initial_slash = key[1:] if key.startswith("/") else key
        fname_in_tar = f'blobs/{key_without_initial_slash}'
        if (
            self._tar_file.has_file_with_name(fname_in_tar)
            or fname_in_tar in files_to_write_to_tar
        ):
            # Existing tar entries are never overwritten; pick the first free
            # versioned name, also considering blobs queued in this batch.
            v = 2
            while (
                self._tar_file.has_file_with_name(f'{fname_in_tar}.v{v}')
                or f'{fname_in_tar}.v{v}' in files_to_write_to_tar
            ):
                v += 1
            fname_in_tar = f'{fname_in_tar}.v{v}'

        files_to_write_to_tar[fname_in_tar] = value
        pending_references.append(
            (key_without_initial_slash, f'./{fname_in_tar}', 0, len(value))
        )

    # Skip the tar entirely when everything was inline, so metadata-only
    # updates never touch the tar file.
    if files_to_write_to_tar:
        self._tar_file.write_files(files_to_write_to_tar)

    # Register references only after the data is in the tar, so a failed
    # write cannot leave references pointing at missing blobs.
    for reference in pending_references:
        self._set_ref_reference(*reference)

def __delitem__(self, key: str):
# We don't actually delete the file from the tar, but maybe it would be
Expand Down
48 changes: 28 additions & 20 deletions lindi/tar/lindi_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def _change_name_of_file(self, file_name: str, new_file_name: str, do_write_inde
self._index_has_changed = True

def write_file(self, file_name: str, data: bytes):
    """Append one file to the tar by delegating to ``write_files``.

    Args:
        file_name: Name of the entry inside the tar file.
        data: Raw contents of the entry.
    """
    single_entry = {file_name: data}
    self.write_files(single_entry)

def write_files(self, files: dict):
if self._is_remote:
raise ValueError("Cannot write a file in a remote tar file")
if self._file is None:
Expand All @@ -141,31 +144,36 @@ def write_file(self, file_name: str, data: bytes):
self._file.seek(-1024, 2)

file_pos = self._file.tell()
x = {
'n': file_name,
'o': file_pos,
'd': file_pos + 512, # we assume the header is 512 bytes
's': len(data)
}

# write the tar header
tar_header = create_tar_header(file_name, len(data))
self._file.write(tar_header)
# write the data
self._file.write(data)

# pad up to blocks of 512
if len(data) % 512 != 0:
padding = b"\x00" * (512 - len(data) % 512)
self._file.write(padding)
for file_name, data in files.items():
x = {
'n': file_name,
'o': file_pos,
'd': file_pos + 512, # we assume the header is 512 bytes
's': len(data)
}

# write the tar header
tar_header = create_tar_header(file_name, len(data))

# pad up to blocks of 512
if len(data) % 512 != 0:
padding_len = 512 - len(data) % 512
else:
padding_len = 0

self._file.write(tar_header)
self._file.write(data)
self._file.write(b"\x00" * padding_len)
file_pos += 512 + len(data) + padding_len

self._index['files'].append(x)
self._index_lookup[file_name] = x
self._index_has_changed = True

# write the 1024 bytes marking the end of the file
self._file.write(b"\x00" * 1024)

self._index['files'].append(x)
self._index_lookup[file_name] = x
self._index_has_changed = True

def read_file(self, file_name: str) -> bytes:
info = self.get_file_info(file_name)
if info is None:
Expand Down

0 comments on commit 85d9bee

Please sign in to comment.