Skip to content

Commit

Permalink
Merge pull request #57 from NeurodataWithoutBorders/write-json
Browse files Browse the repository at this point in the history
write to local lindi json file (3)
  • Loading branch information
magland authored May 9, 2024
2 parents f8c9cc0 + 0ea4fb0 commit aa8ab4c
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 144 deletions.
216 changes: 170 additions & 46 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Union, Literal
from typing import Union, Literal, Callable
import os
import json
import tempfile
import urllib.request
Expand All @@ -12,59 +13,67 @@
from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

from ..LocalCache.LocalCache import LocalCache

from ..LindiH5ZarrStore._util import _write_rfs_to_file


LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]

# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL
# (or local path)
UploadFileFunc = Callable[[str], str]


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r", _local_cache: Union[LocalCache, None] = None):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None):
"""
Do not use this constructor directly. Instead, use:
from_reference_file_system, from_zarr_store, from_zarr_group,
or from_h5py_file
Do not use this constructor directly. Instead, use: from_lindi_file,
from_h5py_file, from_reference_file_system, from_zarr_store, or
from_zarr_group.
"""
self._zarr_group = _zarr_group
self._zarr_store = _zarr_store
self._mode: Literal['r', 'r+'] = _mode
self._mode: LindiFileMode = _mode
self._the_group = LindiH5pyGroup(_zarr_group, self)
self._local_cache = _local_cache
self._local_file_path = _local_file_path

# see comment in LindiH5pyGroup
self._id = f'{id(self._zarr_group)}/'

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
"""
Create a LindiH5pyFile from a URL or path to a .lindi.json file.
For a description of parameters, see from_reference_file_system().
"""
return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache)
return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)

@staticmethod
def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None, zarr_store_opts: Union[LindiH5ZarrStoreOpts, None] = None):
def from_hdf5_file(url_or_path: str, *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, zarr_store_opts: Union[LindiH5ZarrStoreOpts, None] = None):
"""
Create a LindiH5pyFile from a URL or path to an HDF5 file.
Parameters
----------
url_or_path : str
The URL or path to the remote or local HDF5 file.
mode : Literal["r", "r+"], optional
The mode to open the file object in. Right now only "r" is
supported, by default "r".
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in. See api docs for
h5py.File for more information on the modes, by default "r".
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data chunks, by default None.
zarr_store_opts : Union[LindiH5ZarrStoreOpts, None], optional
The options to use for the zarr store, by default None.
"""
from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # avoid circular import
if mode == 'r+':
raise Exception("Opening hdf5 file in r+ mode is not supported")
if mode != "r":
raise Exception("Opening hdf5 file in write mode is not supported")
zarr_store = LindiH5ZarrStore.from_file(url_or_path, local_cache=local_cache, opts=zarr_store_opts, url=url_or_path)
return LindiH5pyFile.from_zarr_store(
zarr_store=zarr_store,
Expand All @@ -73,7 +82,7 @@ def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_ca
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -83,19 +92,18 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r"
The reference file system. This can be a dictionary or a URL or path
to a .lindi.json file. If None, an empty reference file system will
be created.
mode : Literal["r", "r+"], optional
The mode to open the file object in, by default "r". If the mode is
"r", the file object will be read-only. If the mode is "r+", the
file will be read-write. However, if the rfs is a string (URL or
path), the file itself will not be modified on changes, but the
internal in-memory representation will be modified. Use
to_reference_file_system() to export the updated reference file
system to the same file or a new file.
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in, by default "r".
staging_area : Union[StagingArea, None], optional
The staging area to use for writing data, preparing for upload.
This is only used in write mode, by default None.
The staging area to use for writing data, preparing for upload. This
is only used in write mode, by default None.
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data, by default None.
local_file_path : Union[str, None], optional
If rfs is not a string or is a remote url, this is the path to the
local file for the purpose of writing to it. It is required in this
case if mode is not "r". If rfs is a string and not a remote url, it
must be equal to local_file_path if provided.
"""
if rfs is None:
rfs = {
Expand All @@ -107,38 +115,73 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r"
}

if isinstance(rfs, str):
if rfs.startswith("http://") or rfs.startswith("https://"):
rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://")
if local_file_path is not None and not rfs_is_url and rfs != local_file_path:
raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}")
if rfs_is_url:
with tempfile.TemporaryDirectory() as tmpdir:
filename = f"{tmpdir}/temp.lindi.json"
_download_file(rfs, filename)
with open(filename, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
else:
empty_rfs = {
"refs": {
'.zgroup': {
'zarr_format': 2
}
},
}
if mode == "r":
# Readonly, file must exist (default)
if not os.path.exists(rfs):
raise Exception(f"File does not exist: {rfs}")
elif mode == "r+":
# Read/write, file must exist
if not os.path.exists(rfs):
raise Exception(f"File does not exist: {rfs}")
elif mode == "w":
# Create file, truncate if exists
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
elif mode in ["w-", "x"]:
# Create file, fail if exists
if os.path.exists(rfs):
raise Exception(f"File already exists: {rfs}")
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
elif mode == "a":
# Read/write if exists, create otherwise
if os.path.exists(rfs):
with open(rfs, "r") as f:
data = json.load(f)
else:
raise Exception(f"Unhandled mode: {mode}")
with open(rfs, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
elif isinstance(rfs, dict):
# This store does not need to be closed
store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache)
if staging_area:
store = LindiStagingStore(base_store=store, staging_area=staging_area)
return LindiH5pyFile.from_zarr_store(store, mode=mode)
return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")

@staticmethod
def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None):
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
"""
Create a LindiH5pyFile from a zarr store.
Parameters
----------
zarr_store : ZarrStore
The zarr store.
mode : Literal["r", "r+"], optional
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in, by default "r". If the mode is
"r", the file object will be read-only. For write mode to work, the
zarr store will need to be writeable as well.
Expand All @@ -147,18 +190,18 @@ def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local
# does not need to be closed
zarr_group = zarr.open(store=zarr_store, mode=mode)
assert isinstance(zarr_group, zarr.Group)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)

@staticmethod
def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None):
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
"""
Create a LindiH5pyFile from a zarr group.
Parameters
----------
zarr_group : zarr.Group
The zarr group.
mode : Literal["r", "r+"], optional
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in, by default "r". If the mode is
"r", the file object will be read-only. For write mode to work, the
zarr store will need to be writeable as well.
Expand All @@ -168,19 +211,18 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _
See from_zarr_store().
"""
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache)
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)

def to_reference_file_system(self):
"""
Export the internal in-memory representation to a reference file system.
In order to use this, the file object needs to have been created using
from_reference_file_system() or from_lindi_file().
"""
from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # avoid circular import
if self._zarr_store is None:
raise Exception("Cannot convert to reference file system without zarr store")
zarr_store = self._zarr_store
if isinstance(zarr_store, LindiStagingStore):
zarr_store.consolidate_chunks()
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiH5ZarrStore):
return zarr_store.to_reference_file_system()
Expand All @@ -192,6 +234,58 @@ def to_reference_file_system(self):
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy)
return rfs_copy

def upload(
    self,
    *,
    on_upload_blob: UploadFileFunc,
    on_upload_main: UploadFileFunc
):
    """
    Upload locally referenced blobs and then the reference file system itself.

    The reference file system is exported with to_reference_file_system().
    Every chunk reference whose resolved location is not an http(s) URL is
    treated as a local blob file: it is handed to on_upload_blob and the
    reference is rewritten to point at whatever that callback returns.
    The updated reference file system is then written to a temporary
    .lindi.json file, which is handed to on_upload_main.

    Parameters
    ----------
    on_upload_blob : UploadFileFunc
        A function that takes a string path to a blob file, uploads or
        copies it somewhere, and returns a string URL (or local path).
    on_upload_main : UploadFileFunc
        A function that takes a string path to the main .json file, stores
        it somewhere, and returns a string URL (or local path).

    Returns
    -------
    str
        The value returned by on_upload_main, i.e. the URL (or local path)
        of the uploaded reference file system .json file.
    """
    rfs = self.to_reference_file_system()
    refs = rfs['refs']
    templates = rfs.get('templates', {})

    def _is_chunk_ref(ref):
        # Chunk references are 3-element lists whose first item is the
        # (possibly templated) location of the data.
        return isinstance(ref, list) and len(ref) == 3

    # Collect every distinct non-http(s) location referenced by a chunk
    blobs_to_upload = set()
    for ref in refs.values():
        if not _is_chunk_ref(ref):
            continue
        url = _apply_templates(ref[0], templates)
        if not url.startswith(("http://", "https://")):
            blobs_to_upload.add(url)

    # Upload each of the local blobs and get a mapping from the original
    # file paths to the locations returned by the upload function
    blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob)

    # Point each chunk reference at its uploaded location. References whose
    # resolved location is not in the mapping are left untouched.
    for ref in refs.values():
        if not _is_chunk_ref(ref):
            continue
        uploaded_url = blob_mapping.get(_apply_templates(ref[0], templates), None)
        if uploaded_url is not None:
            ref[0] = uploaded_url

    # Write the updated LINDI file to a temp directory and upload it. The
    # upload must happen inside the with-block, before the directory is removed.
    with tempfile.TemporaryDirectory() as tmpdir:
        rfs_fname = f"{tmpdir}/rfs.lindi.json"
        LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
        _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname)
        return on_upload_main(rfs_fname)

def write_lindi_file(self, filename: str):
"""
Write the reference file system to a .lindi.json file.
Expand Down Expand Up @@ -220,7 +314,7 @@ def driver(self):

@property
def mode(self):
return self._mode
return 'r' if self._mode == 'r' else 'r+'

@property
def libver(self):
Expand All @@ -238,11 +332,12 @@ def swmr_mode(self, value): # type: ignore
raise Exception("Getting swmr_mode is not allowed")

def close(self):
# Nothing was opened, so nothing needs to be closed
pass
self.flush()

def flush(self):
pass
if self._mode != 'r' and self._local_file_path is not None:
rfs = self.to_reference_file_system()
_write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path)

def __enter__(self): # type: ignore
return self
Expand Down Expand Up @@ -357,24 +452,24 @@ def ref(self):
##############################
# write
def create_group(self, name, track_order=None):
if self._mode not in ['r+']:
if self._mode == 'r':
raise Exception("Cannot create group in read-only mode")
if track_order is not None:
raise Exception("track_order is not supported (I don't know what it is)")
return self._the_group.create_group(name)

def require_group(self, name):
if self._mode not in ['r+']:
if self._mode == 'r':
raise Exception("Cannot require group in read-only mode")
return self._the_group.require_group(name)

def create_dataset(self, name, shape=None, dtype=None, data=None, **kwds):
if self._mode not in ['r+']:
if self._mode == 'r':
raise Exception("Cannot create dataset in read-only mode")
return self._the_group.create_dataset(name, shape=shape, dtype=dtype, data=data, **kwds)

def require_dataset(self, name, shape, dtype, exact=False, **kwds):
if self._mode not in ['r+']:
if self._mode == 'r':
raise Exception("Cannot require dataset in read-only mode")
return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds)

Expand Down Expand Up @@ -453,3 +548,32 @@ def _deep_copy(obj):
return tuple(_deep_copy(v) for v in obj)
else:
return obj


def _upload_blobs(
    blobs: set,
    *,
    on_upload_blob: UploadFileFunc
) -> dict:
    """
    Pass every blob path in `blobs` to `on_upload_blob`, printing one progress
    line (index, path and human-readable size) before each call.

    Returns a dict mapping each original file path to the value returned by
    `on_upload_blob` for that path.
    """
    total = len(blobs)
    uploaded = {}
    for position, local_path in enumerate(blobs, start=1):
        # The size is read from disk only for the progress message
        num_bytes = os.path.getsize(local_path)
        print(f'Uploading blob {position} of {total} {local_path} ({_format_size_bytes(num_bytes)})')
        uploaded[local_path] = on_upload_blob(local_path)
    return uploaded


def _format_size_bytes(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"
Loading

0 comments on commit aa8ab4c

Please sign in to comment.