diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py
index 0d8272f..aa891a1 100644
--- a/lindi/LindiH5pyFile/LindiH5pyFile.py
+++ b/lindi/LindiH5pyFile/LindiH5pyFile.py
@@ -1,4 +1,5 @@
-from typing import Union, Literal
+from typing import Union, Literal, Callable
+import os
 import json
 import tempfile
 import urllib.request
@@ -12,7 +13,7 @@
 from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore
 
 from ..LindiStagingStore.StagingArea import StagingArea
-from ..LindiStagingStore.LindiStagingStore import LindiStagingStore
+from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
 
 from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
 from ..LocalCache.LocalCache import LocalCache
@@ -20,33 +21,41 @@
 from ..LindiH5ZarrStore._util import _write_rfs_to_file
 
 
+LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]
+
+# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL
+# (or local path)
+UploadFileFunc = Callable[[str], str]
+
+
 class LindiH5pyFile(h5py.File):
-    def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r", _local_cache: Union[LocalCache, None] = None):
+    def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None):
         """
-        Do not use this constructor directly. Instead, use:
-        from_reference_file_system, from_zarr_store, from_zarr_group,
-        or from_h5py_file
+        Do not use this constructor directly. Instead, use: from_lindi_file,
+        from_hdf5_file, from_reference_file_system, from_zarr_store, or
+        from_zarr_group.
         """
         self._zarr_group = _zarr_group
         self._zarr_store = _zarr_store
-        self._mode: Literal['r', 'r+'] = _mode
+        self._mode: LindiFileMode = _mode
         self._the_group = LindiH5pyGroup(_zarr_group, self)
         self._local_cache = _local_cache
+        self._local_file_path = _local_file_path
 
         # see comment in LindiH5pyGroup
         self._id = f'{id(self._zarr_group)}/'
 
     @staticmethod
-    def from_lindi_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
+    def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
         """
         Create a LindiH5pyFile from a URL or path to a .lindi.json file.
 
         For a description of parameters, see from_reference_file_system().
         """
-        return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache)
+        return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
 
     @staticmethod
-    def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None, zarr_store_opts: Union[LindiH5ZarrStoreOpts, None] = None):
+    def from_hdf5_file(url_or_path: str, *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, zarr_store_opts: Union[LindiH5ZarrStoreOpts, None] = None):
         """
         Create a LindiH5pyFile from a URL or path to an HDF5 file.
 
@@ -54,17 +63,17 @@ def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_ca
         ----------
         url_or_path : str
             The URL or path to the remote or local HDF5 file.
-        mode : Literal["r", "r+"], optional
-            The mode to open the file object in. Right now only "r" is
-            supported, by default "r".
+        mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
+            The mode to open the file object in, by default "r" (see the h5py.File
+            docs for the meaning of each mode). Only "r" is currently supported here.
         local_cache : Union[LocalCache, None], optional
             The local cache to use for caching data chunks, by default None.
         zarr_store_opts : Union[LindiH5ZarrStoreOpts, None], optional
             The options to use for the zarr store, by default None.
         """
         from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore  # avoid circular import
-        if mode == 'r+':
-            raise Exception("Opening hdf5 file in r+ mode is not supported")
+        if mode != "r":
+            raise Exception("Opening hdf5 file in write mode is not supported")
         zarr_store = LindiH5ZarrStore.from_file(url_or_path, local_cache=local_cache, opts=zarr_store_opts, url=url_or_path)
         return LindiH5pyFile.from_zarr_store(
             zarr_store=zarr_store,
@@ -73,7 +82,7 @@ def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_ca
     )
 
     @staticmethod
-    def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
+    def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
         """
         Create a LindiH5pyFile from a reference file system.
 
@@ -83,19 +92,18 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r"
             The reference file system. This can be a dictionary or a URL or
             path to a .lindi.json file. If None, an empty reference file
             system will be created.
-        mode : Literal["r", "r+"], optional
-            The mode to open the file object in, by default "r". If the mode is
-            "r", the file object will be read-only. If the mode is "r+", the
-            file will be read-write. However, if the rfs is a string (URL or
-            path), the file itself will not be modified on changes, but the
-            internal in-memory representation will be modified. Use
-            to_reference_file_system() to export the updated reference file
-            system to the same file or a new file.
+        mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
+            The mode to open the file object in, by default "r".
         staging_area : Union[StagingArea, None], optional
-            The staging area to use for writing data, preparing for upload.
-            This is only used in write mode, by default None.
+            The staging area to use for writing data, preparing for upload. This
+            is only used in write mode, by default None.
         local_cache : Union[LocalCache, None], optional
             The local cache to use for caching data, by default None.
+        local_file_path : Union[str, None], optional
+            If rfs is a dict or a remote URL, this is the path of the local file
+            that flush() and close() will write to; it is required in that case
+            when mode is not "r". If rfs is already a local path and
+            local_file_path is provided, the two must be equal.
""" if rfs is None: rfs = { @@ -107,30 +115,65 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r" } if isinstance(rfs, str): - if rfs.startswith("http://") or rfs.startswith("https://"): + rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://") + if local_file_path is not None and not rfs_is_url and rfs != local_file_path: + raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}") + if rfs_is_url: with tempfile.TemporaryDirectory() as tmpdir: filename = f"{tmpdir}/temp.lindi.json" _download_file(rfs, filename) with open(filename, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) else: + empty_rfs = { + "refs": { + '.zgroup': { + 'zarr_format': 2 + } + }, + } + if mode == "r": + # Readonly, file must exist (default) + if not os.path.exists(rfs): + raise Exception(f"File does not exist: {rfs}") + elif mode == "r+": + # Read/write, file must exist + if not os.path.exists(rfs): + raise Exception(f"File does not exist: {rfs}") + elif mode == "w": + # Create file, truncate if exists + with open(rfs, "w") as f: + json.dump(empty_rfs, f) + elif mode in ["w-", "x"]: + # Create file, fail if exists + if os.path.exists(rfs): + raise Exception(f"File already exists: {rfs}") + with open(rfs, "w") as f: + json.dump(empty_rfs, f) + elif mode == "a": + # Read/write if exists, create otherwise + if os.path.exists(rfs): + with open(rfs, "r") as f: + data = json.load(f) + else: + raise Exception(f"Unhandled mode: {mode}") with open(rfs, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) elif isinstance(rfs, dict): # This store does not need to be closed store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache) if staging_area: store = LindiStagingStore(base_store=store, staging_area=staging_area) - return LindiH5pyFile.from_zarr_store(store, mode=mode) + return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path) else: raise Exception(f"Unhandled type for rfs: {type(rfs)}") @staticmethod - def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None): + def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): """ Create a LindiH5pyFile from a zarr store. @@ -138,7 +181,7 @@ def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local ---------- zarr_store : ZarrStore The zarr store. - mode : Literal["r", "r+"], optional + mode : Literal["r", "r+", "w", "w-", "x", "a"], optional The mode to open the file object in, by default "r". If the mode is "r", the file object will be read-only. For write mode to work, the zarr store will need to be writeable as well. 
@@ -147,10 +190,10 @@ def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local
         # does not need to be closed
         zarr_group = zarr.open(store=zarr_store, mode=mode)
         assert isinstance(zarr_group, zarr.Group)
-        return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache)
+        return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)
 
     @staticmethod
-    def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None):
+    def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
         """
         Create a LindiH5pyFile from a zarr group.
 
@@ -158,7 +201,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _
         ----------
         zarr_group : zarr.Group
             The zarr group.
-        mode : Literal["r", "r+"], optional
+        mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
             The mode to open the file object in, by default "r". If the mode is
             "r", the file object will be read-only. For write mode to work, the
             zarr store will need to be writeable as well.
@@ -168,19 +211,18 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _
 
         See from_zarr_store().
         """
-        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache)
+        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)
 
     def to_reference_file_system(self):
         """
         Export the internal in-memory representation to a reference file system.
-        In order to use this, the file object needs to have been created using
-        from_reference_file_system() or from_lindi_file().
         """
         from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore  # avoid circular import
         if self._zarr_store is None:
             raise Exception("Cannot convert to reference file system without zarr store")
         zarr_store = self._zarr_store
         if isinstance(zarr_store, LindiStagingStore):
+            zarr_store.consolidate_chunks()
             zarr_store = zarr_store._base_store
         if isinstance(zarr_store, LindiH5ZarrStore):
             return zarr_store.to_reference_file_system()
@@ -192,6 +234,58 @@ def to_reference_file_system(self):
         LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy)
         return rfs_copy
 
+    def upload(
+        self,
+        *,
+        on_upload_blob: UploadFileFunc,
+        on_upload_main: UploadFileFunc
+    ):
+        """
+        Consolidate the chunks in the staging area, upload them to a storage
+        system, update the references in the base store accordingly, and then
+        upload the updated reference file system .json file.
+
+        Parameters
+        ----------
+        on_upload_blob : UploadFileFunc
+            A function that takes a string path to a blob file, uploads or copies it
+            somewhere, and returns a string URL (or local path).
+        on_upload_main : UploadFileFunc
+            A function that takes a string path to the main .json file, stores
+            it somewhere, and returns a string URL (or local path).
+
+        Returns
+        -------
+        str
+            The URL (or local path) of the uploaded reference file system .json
+            file.
+ """ + rfs = self.to_reference_file_system() + blobs_to_upload = set() + # Get the set of all local URLs in rfs['refs'] + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url = _apply_templates(v[0], rfs.get('templates', {})) + if not url.startswith("http://") and not url.startswith("https://"): + local_path = url + blobs_to_upload.add(local_path) + # Upload each of the local blobs using the given upload function and get a mapping from + # the original file paths to the URLs of the uploaded files + blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob) + # Replace the local URLs in rfs['refs'] with URLs of the uploaded files + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url1 = _apply_templates(v[0], rfs.get('templates', {})) + url2 = blob_mapping.get(url1, None) + if url2 is not None: + v[0] = url2 + # Write the updated LINDI file to a temp directory and upload it + with tempfile.TemporaryDirectory() as tmpdir: + rfs_fname = f"{tmpdir}/rfs.lindi.json" + LindiReferenceFileSystemStore.use_templates_in_rfs(rfs) + _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname) + return on_upload_main(rfs_fname) + def write_lindi_file(self, filename: str): """ Write the reference file system to a .lindi.json file. @@ -220,7 +314,7 @@ def driver(self): @property def mode(self): - return self._mode + return 'r' if self._mode == 'r' else 'r+' @property def libver(self): @@ -238,11 +332,12 @@ def swmr_mode(self, value): # type: ignore raise Exception("Getting swmr_mode is not allowed") def close(self): - # Nothing was opened, so nothing needs to be closed - pass + self.flush() def flush(self): - pass + if self._mode != 'r' and self._local_file_path is not None: + rfs = self.to_reference_file_system() + _write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path) def __enter__(self): # type: ignore return self @@ -357,24 +452,24 @@ def ref(self): ############################## # write def create_group(self, name, track_order=None): - if self._mode not in ['r+']: + if self._mode == 'r': raise Exception("Cannot create group in read-only mode") if track_order is not None: raise Exception("track_order is not supported (I don't know what it is)") return self._the_group.create_group(name) def require_group(self, name): - if self._mode not in ['r+']: + if self._mode == 'r': raise Exception("Cannot require group in read-only mode") return self._the_group.require_group(name) def create_dataset(self, name, shape=None, dtype=None, data=None, **kwds): - if self._mode not in ['r+']: + if self._mode == 'r': raise Exception("Cannot create dataset in read-only mode") return self._the_group.create_dataset(name, shape=shape, dtype=dtype, data=data, **kwds) def require_dataset(self, name, shape, dtype, exact=False, **kwds): - if self._mode not in ['r+']: + if self._mode == 'r': raise Exception("Cannot require dataset in read-only mode") return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds) @@ -453,3 +548,32 @@ def _deep_copy(obj): return tuple(_deep_copy(v) for v in obj) else: return obj + + +def _upload_blobs( + blobs: set, + *, + on_upload_blob: UploadFileFunc +) -> dict: + """ + Upload all the blobs in a set to a storage system and return a mapping from + the original file paths to the URLs of the uploaded files. 
+ """ + blob_mapping = {} + for i, blob in enumerate(blobs): + size = os.path.getsize(blob) + print(f'Uploading blob {i + 1} of {len(blobs)} {blob} ({_format_size_bytes(size)})') + blob_url = on_upload_blob(blob) + blob_mapping[blob] = blob_url + return blob_mapping + + +def _format_size_bytes(size_bytes: int) -> str: + if size_bytes < 1024: + return f"{size_bytes} bytes" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / 1024 / 1024:.1f} MB" + else: + return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB" diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py index 9225e00..825e162 100644 --- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py +++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py @@ -213,7 +213,11 @@ def use_templates_in_rfs(rfs: dict) -> None: urls_with_many_occurrences = sorted([url for url, count in url_counts.items() if count >= 5]) new_templates = rfs.get('templates', {}) template_names_for_urls: Dict[str, str] = {} + for template_name, url in new_templates.items(): + template_names_for_urls[url] = template_name for url in urls_with_many_occurrences: + if url in template_names_for_urls: + continue i = 1 while f'u{i}' in new_templates: i += 1 diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index b95498c..019d982 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -1,16 +1,7 @@ -from typing import Callable -import json -import tempfile import os from zarr.storage import Store as ZarrStore from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore from .StagingArea import StagingArea, _random_str -from ..LindiH5ZarrStore._util import _write_rfs_to_file - - -# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL -# (or local path) -UploadFileFunc = Callable[[str], str] class LindiStagingStore(ZarrStore): @@ -94,61 +85,9 @@ def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): size ] - def upload( - self, - *, - on_upload_blob: UploadFileFunc, - on_upload_main: UploadFileFunc, - consolidate_chunks: bool = True - ): - """ - Consolidate the chunks in the staging area, upload them to a storage - system, updating the references in the base store, and then upload the - updated reference file system .json file. - - Parameters - ---------- - on_upload_blob : StoreFileFunc - A function that takes a string path to a blob file, uploads or copies it - somewhere, and returns a string URL (or local path). - on_upload_main : StoreFileFunc - A function that takes a string path to the main .json file, stores - it somewhere, and returns a string URL (or local path). - consolidate_chunks : bool - If True (the default), consolidate the chunks in the staging area - before uploading. - - Returns - ------- - str - The URL (or local path) of the uploaded reference file system .json - file. 
- """ - if consolidate_chunks: - self.consolidate_chunks() - rfs = self._base_store.rfs - rfs = json.loads(json.dumps(rfs)) # deep copy - LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs) - blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_upload_blob=on_upload_blob) - for k, v in rfs['refs'].items(): - if isinstance(v, list) and len(v) == 3: - url1 = v[0] - if url1.startswith(self._staging_area.directory + '/'): - url2 = blob_mapping.get(url1, None) - if url2 is None: - raise ValueError(f"Could not find url in blob mapping: {url1}") - rfs['refs'][k][0] = url2 - with tempfile.TemporaryDirectory() as tmpdir: - rfs_fname = f"{tmpdir}/rfs.lindi.json" - LindiReferenceFileSystemStore.use_templates_in_rfs(rfs) - _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname) - return on_upload_main(rfs_fname) - def consolidate_chunks(self): """ Consolidate the chunks in the staging area. - - This method is called by `upload` if `consolidate_chunks` is True. """ rfs = self._base_store.rfs refs_keys_by_reference_parent_path = {} @@ -273,40 +212,6 @@ def _chunk_key(fname: str) -> tuple: return sorted(files, key=_chunk_key) -def _upload_directory_of_blobs( - staging_dir: str, - on_upload_blob: UploadFileFunc -) -> dict: - """ - Upload all the files in a directory to a storage system and return a mapping - from the original file paths to the URLs of the uploaded files. - """ - all_files = [] - for root, dirs, files in os.walk(staging_dir): - for fname in files: - full_fname = f"{root}/{fname}" - all_files.append(full_fname) - blob_mapping = {} - for i, full_fname in enumerate(all_files): - relative_fname = full_fname[len(staging_dir):] - size_bytes = os.path.getsize(full_fname) - print(f'Uploading blob {i + 1} of {len(all_files)} {relative_fname} ({_format_size_bytes(size_bytes)})') - blob_url = on_upload_blob(full_fname) - blob_mapping[full_fname] = blob_url - return blob_mapping - - -def _format_size_bytes(size_bytes: int) -> str: - if size_bytes < 1024: - return f"{size_bytes} bytes" - elif size_bytes < 1024 * 1024: - return f"{size_bytes / 1024:.1f} KB" - elif size_bytes < 1024 * 1024 * 1024: - return f"{size_bytes / 1024 / 1024:.1f} MB" - else: - return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB" - - def _read_chunk_data(filename: str, offset: int, size: int) -> bytes: with open(filename, "rb") as f: f.seek(offset) diff --git a/tests/test_staging_area.py b/tests/test_staging_area.py index c2e2847..fbe4758 100644 --- a/tests/test_staging_area.py +++ b/tests/test_staging_area.py @@ -36,10 +36,9 @@ def on_upload_main(fname: str): return output_fname assert client.staging_store - client.staging_store.upload( + client.upload( on_upload_blob=on_upload_blob, - on_upload_main=on_upload_main, - consolidate_chunks=True + on_upload_main=on_upload_main ) client3 = lindi.LindiH5pyFile.from_lindi_file(output_fname, mode='r')