Skip to content

Commit

Permalink
Improve lindi tar functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
magland committed Aug 7, 2024
1 parent bbbf24e commit effbb1e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
32 changes: 23 additions & 9 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Do not use this constructor directly. Instead, use: from_lindi_file,
from_h5py_file, from_reference_file_system, from_zarr_store, or
Expand All @@ -45,10 +45,13 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
self._local_cache = _local_cache
self._source_url_or_path = _source_url_or_path
self._source_tar_file = _source_tar_file
self._close_source_tar_file_on_close = _close_source_tar_file_on_close

# see comment in LindiH5pyGroup
self._id = f'{id(self._zarr_group)}/'

self._is_open = True

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
"""
Expand Down Expand Up @@ -105,7 +108,7 @@ def from_hdf5_file(
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -126,6 +129,8 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
Internal use only
_source_tar_file : Union[LindiTarFile, None], optional
Internal use only
_close_source_tar_file_on_close : bool, optional
Internal use only
"""
if rfs is None:
rfs = {
Expand All @@ -149,7 +154,8 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_tar_file=tar_file
_source_tar_file=tar_file,
_close_source_tar_file_on_close=_close_source_tar_file_on_close
)
else:
# local file
Expand Down Expand Up @@ -188,7 +194,8 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_tar_file=tar_file
_source_tar_file=tar_file,
_close_source_tar_file_on_close=True
)
elif isinstance(rfs, dict):
# This store does not need to be closed
Expand All @@ -210,13 +217,14 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
mode=mode,
local_cache=local_cache,
_source_url_or_path=_source_url_or_path,
_source_tar_file=_source_tar_file
_source_tar_file=_source_tar_file,
_close_source_tar_file_on_close=_close_source_tar_file_on_close
)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")

@staticmethod
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Create a LindiH5pyFile from a zarr store.
Expand All @@ -233,10 +241,10 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach
# does not need to be closed
zarr_group = zarr.open(store=zarr_store, mode=mode)
assert isinstance(zarr_group, zarr.Group)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file, _close_source_tar_file_on_close=_close_source_tar_file_on_close)

@staticmethod
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Create a LindiH5pyFile from a zarr group.
Expand All @@ -254,7 +262,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_
See from_zarr_store().
"""
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file, _close_source_tar_file_on_close=_close_source_tar_file_on_close)

def to_reference_file_system(self):
"""
Expand Down Expand Up @@ -390,7 +398,13 @@ def swmr_mode(self, value): # type: ignore
raise Exception("Getting swmr_mode is not allowed")

def close(self):
    """
    Flush pending changes and release the source tar file if it is owned.

    Calling close() on a file that is already closed prints a warning and
    returns without doing anything else. Otherwise flush() is called
    first; afterwards the file is marked as closed and the source tar
    file is closed when _close_source_tar_file_on_close is set and a tar
    file is attached.

    The cleanup runs even when flush() raises, so a failed flush does not
    leak the tar file. The exception from flush() still propagates to the
    caller.
    """
    if not self._is_open:
        print('Warning: LINDI file already closed.')
        return
    try:
        self.flush()
    finally:
        # Mark as closed before touching the tar file so that an error
        # while closing it cannot leave this object looking open.
        self._is_open = False
        if self._close_source_tar_file_on_close and self._source_tar_file:
            self._source_tar_file.close()

def flush(self):
if self._mode != 'r' and self._source_url_or_path is not None:
Expand Down
3 changes: 1 addition & 2 deletions lindi/tar/lindi_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ def create(fname: str, *, rfs: dict):
f.write(header)
f.write(initial_index_json)

with open(fname, "ab") as f:
f.write(b"\x00" * 1024)
f.write(b"\x00" * 1024)

# write the rfs file
tf = LindiTarFile(fname)
Expand Down

0 comments on commit effbb1e

Please sign in to comment.