Update index handling in tar operations
magland committed Aug 7, 2024
1 parent 673a343 commit cf4ddb1
Showing 2 changed files with 49 additions and 31 deletions.
4 changes: 3 additions & 1 deletion lindi/LindiH5pyFile/LindiH5pyFile.py
@@ -407,14 +407,16 @@ def close(self):
self._is_open = False

def flush(self):
if not self._is_open:
return
if self._mode != 'r' and self._source_url_or_path is not None:
is_url = self._source_url_or_path.startswith("http://") or self._source_url_or_path.startswith("https://")
if is_url:
raise Exception("Cannot write to URL")
rfs = self.to_reference_file_system()
if self._source_tar_file:
self._source_tar_file.write_rfs(rfs)
self._source_tar_file._update_index() # very important
self._source_tar_file._update_index_in_file() # very important
else:
_write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)

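For context, a minimal usage sketch of the flush path above. It assumes lindi's `from_lindi_file` constructor and its `mode` argument behave as in the project README; the snippet is illustrative and not part of this commit:

```python
import lindi  # assumes the lindi package is installed

# Open a local tar-backed LINDI file for writing; a remote URL would raise on flush.
with lindi.LindiH5pyFile.from_lindi_file("example.lindi.tar", mode="r+") as f:
    f.attrs["note"] = "updated"  # any modification to be persisted
    f.flush()  # writes the reference file system and refreshes .tar_index.json
# close() flushes once more; thanks to the new _is_open guard, a further
# explicit flush() on the closed object returns early instead of failing.
```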
76 changes: 46 additions & 30 deletions lindi/tar/lindi_tar.py
@@ -28,10 +28,35 @@ def __init__(self, tar_path_or_url: str):
for file in self._index['files']:
self._index_lookup[file['n']] = file

# Verify that the index file correctly has a reference to itself
index_info_2 = self._index_lookup.get(".tar_index.json", None)
if index_info_2 is None:
raise ValueError("File .tar_index.json not found in index")
for k in ['n', 'o', 'd', 's']:
if k not in index_info_2:
raise ValueError(f"File .tar_index.json does not have key {k}")
if k not in index_info:
raise ValueError(f"File .tar_index.json does not have key {k}")
if index_info_2[k] != index_info[k]:
raise ValueError(f"File .tar_index.json has unexpected value for key {k}")

# Verify that the index file correctly has a reference to the entry file
entry_info = self._index_lookup.get(".tar_entry.json", None)
if entry_info is None:
raise ValueError("File .tar_entry.json not found in index")
if entry_info['n'] != ".tar_entry.json":
raise ValueError("File .tar_entry.json has unexpected name")
if entry_info['o'] != 0:
raise ValueError("File .tar_entry.json has unexpected offset")
if entry_info['d'] != 512:
raise ValueError("File .tar_entry.json has unexpected data offset")
if entry_info['s'] != TAR_ENTRY_JSON_SIZE:
raise ValueError("File .tar_entry.json has unexpected size")

self._file = open(self._tar_path_or_url, "r+b") if not self._is_remote else None

def close(self):
self._update_index()
self._update_index_in_file()
if self._file is not None:
self._file.close()
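The checks above pin down the on-disk layout: `.tar_entry.json` is the first tar member (header at byte 0, data at byte 512, fixed size `TAR_ENTRY_JSON_SIZE`), and it points at `.tar_index.json`, whose `files` entries carry `n` (name), `o` (header offset), `d` (data offset), and `s` (size). A minimal reader sketch under those assumptions; the `TAR_ENTRY_JSON_SIZE` value used here is a placeholder, not the library's actual constant:

```python
import json

TAR_ENTRY_JSON_SIZE = 1024  # placeholder; the real constant lives in lindi_tar.py

def read_lindi_tar_index(tar_path: str) -> dict:
    """Read the embedded index of a local LINDI tar without the tarfile module."""
    with open(tar_path, "rb") as f:
        # .tar_entry.json: header at offset 0, data at offset 512, space-padded JSON
        f.seek(512)
        entry = json.loads(f.read(TAR_ENTRY_JSON_SIZE).decode("utf-8"))
        index_info = entry["index"]   # {'n', 'o', 'd', 's'} for .tar_index.json
        f.seek(index_info["d"])       # 'd' is the data offset within the tar
        return json.loads(f.read(index_info["s"]).decode("utf-8"))
```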

@@ -57,7 +82,7 @@ def overwrite_file_content(self, file_name: str, data: bytes):
self._file.seek(info['d'])
self._file.write(data)

def trash_file(self, file_name: str, do_write_index=True):
def trash_file(self, file_name: str):
if self._is_remote:
raise ValueError("Cannot trash a file in a remote tar file")
if self._file is None:
@@ -68,7 +93,13 @@ def trash_file(self, file_name: str, do_write_index=True):
zeros = b"-" * info['s']
self._file.seek(info['d'])
self._file.write(zeros)
self._change_name_of_file(file_name, f'.trash/{file_name}.{_create_random_string()}', do_write_index=do_write_index)
self._change_name_of_file(
file_name,
f'.trash/{file_name}.{_create_random_string()}'
)
self._index['files'] = [file for file in self._index['files'] if file['n'] != file_name]
del self._index_lookup[file_name]
self._index_has_changed = True
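Trashing works this way because a tar member cannot be removed from the middle of an archive without rewriting everything after it: the payload is overwritten with filler bytes and the member is renamed under `.trash/` with a random suffix so a later file of the same name cannot collide with it. The real `_create_random_string` helper is not shown in this diff; a plausible stand-in:

```python
import random
import string

def create_random_string(length: int = 10) -> str:
    # Random suffix for trashed member names, e.g. ".trash/foo.json.k3v9x2q7mp"
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=length))
```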

def write_rfs(self, rfs: dict):
rfs_json = json.dumps(rfs, indent=2, sort_keys=True)
@@ -103,7 +134,7 @@ def get_file_byte_range(self, file_name: str) -> tuple:
def has_file_with_name(self, file_name: str) -> bool:
return self.get_file_info(file_name) is not None

def _change_name_of_file(self, file_name: str, new_file_name: str, do_write_index=True):
def _change_name_of_file(self, file_name: str, new_file_name: str):
if self._is_remote:
raise ValueError("Cannot change the name of a file in a remote tar file")
if self._file is None:
@@ -123,11 +154,6 @@ def _change_name_of_file(self, file_name: str, new_file_name: str, do_write_index=True):
self._file.write(b"\x00" * (file_name_prefix_byte_range[1] - file_name_prefix_byte_range[0]))

_fix_checksum_in_header(self._file, header_start_byte)
file_in_index = self._index_lookup.get(file_name, None)
if file_in_index is None:
raise ValueError(f"File {file_name} not found in index")
file_in_index['n'] = new_file_name
self._index_has_changed = True
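`_fix_checksum_in_header` is defined elsewhere in `lindi_tar.py` and not shown in this diff. For reference, recomputing a POSIX ustar header checksum after editing the name field in place typically looks like the sketch below: the 8-byte `chksum` field at offset 148 is summed as if it held ASCII spaces, and the result is stored as six octal digits followed by a NUL and a space:

```python
def fix_checksum_in_header(f, header_start_byte: int) -> None:
    # Read the 512-byte header block for the member whose name was rewritten.
    f.seek(header_start_byte)
    header = bytearray(f.read(512))
    # The checksum is computed with the chksum field itself treated as spaces.
    header[148:156] = b" " * 8
    checksum = sum(header)
    # Canonical encoding: six octal digits, NUL, space.
    header[148:156] = b"%06o\0 " % checksum
    f.seek(header_start_byte)
    f.write(bytes(header))
```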

def write_file(self, file_name: str, data: bytes):
self.write_files({file_name: data})
@@ -177,7 +203,7 @@ def write_files(self, files: dict):
def read_file(self, file_name: str) -> bytes:
info = self.get_file_info(file_name)
if info is None:
raise FileNotFoundError(f"File {file_name} not found")
raise FileNotFoundError(f"File {file_name} not found for {self._tar_path_or_url}")
start_byte = info['d']
size = info['s']
return _load_bytes_from_local_or_remote_file(self._tar_path_or_url, start_byte, start_byte + size)
@@ -245,7 +271,7 @@ def create(fname: str, *, rfs: dict):
tf.write_rfs(rfs)
tf.close()

def _update_index(self):
def _update_index_in_file(self):
if not self._index_has_changed:
return
if self._is_remote:
@@ -260,26 +286,19 @@ def _update_index(self):
self.overwrite_file_content(".tar_index.json", new_index_json)
else:
# we must create a new index file
self.trash_file(".tar_index.json", do_write_index=False)
self.trash_file(".tar_index.json")

# after we trash the file, the index has changed once again
new_index_json = json.dumps(self._index, indent=2, sort_keys=True)

new_index_json = _pad_bytes_to_leave_room_for_growth(new_index_json, INITIAL_TAR_INDEX_JSON_SIZE)
new_index_json_size = len(new_index_json)
self.write_file(".tar_index.json", new_index_json)

# now we need to update the entry file
# tar_index_info = self.get_file_info(".tar_index.json")
# if tar_index_info is None:
# raise ValueError("tar_index_info is None")
# new_entry_json = json.dumps({
# 'index': {
# 'n': tar_index_info.name,
# 'o': tar_index_info.offset,
# 'd': tar_index_info.offset_data,
# 's': tar_index_info.size
# }
# }, indent=2, sort_keys=True)
# now the index has changed once again, but we assume it doesn't exceed the size
new_index_json = json.dumps(self._index, indent=2, sort_keys=True)
new_index_json = new_index_json.encode() + b" " * (new_index_json_size - len(new_index_json))
self.overwrite_file_content(".tar_index.json", new_index_json)

tar_index_info = next(file for file in self._index['files'] if file['n'] == ".tar_index.json")
new_entry_json = json.dumps({
'index': {
@@ -290,11 +309,8 @@ def _update_index(self):
}
}, indent=2, sort_keys=True)
new_entry_json = new_entry_json.encode() + b" " * (TAR_ENTRY_JSON_SIZE - len(new_entry_json))
with open(self._tar_path_or_url, "r+b") as f:
# we assume the first file is the entry file, and we assume the header is 512 bytes
# this is to avoid calling the potentially expensive getmember() method
f.seek(512)
f.write(new_entry_json)
self._file.seek(512)
self._file.write(new_entry_json)
self._file.flush()
self._index_has_changed = False

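`_update_index_in_file` leans on two tricks visible above: the index JSON is padded by `_pad_bytes_to_leave_room_for_growth` so a slightly larger version can later be overwritten in place without moving the tar member, and the fixed-size `.tar_entry.json` at data offset 512 is rewritten to point at wherever the index currently lives. A standalone sketch of the padding idea; the growth policy shown is an assumption, not the library's actual behavior:

```python
def pad_bytes_to_leave_room_for_growth(content: str, initial_size: int) -> bytes:
    # Space-pad the JSON so later, slightly larger versions can be written in
    # place as long as they stay under the padded size.
    data = content.encode("utf-8")
    target_size = max(initial_size, 2 * len(data))  # assumed growth policy
    return data + b" " * (target_size - len(data))


def overwrite_padded_json(f, data_offset: int, padded_size: int, new_json: str) -> None:
    new_data = new_json.encode("utf-8")
    if len(new_data) > padded_size:
        # Padded region exhausted; the caller must trash the old member and
        # append a freshly padded one (the 'else' branch in the diff above).
        raise ValueError("Index grew beyond its padded size")
    # Space-pad to exactly the old size so the tar member length is unchanged.
    f.seek(data_offset)
    f.write(new_data + b" " * (padded_size - len(new_data)))
```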
