Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix linking for FSSpec #138

Merged
merged 14 commits into from
Nov 17, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# HDMF-ZARR Changelog

## 0.5.0 (Upcoming)

### Enhancements
* Fix linking for FSSpec and support passing of `storage_options` required reading data from S3 #138. @alejoe91 [#120](https://github.com/hdmf-dev/hdmf-zarr/pull/138)

## 0.4.0 (October 3, 2023)

### Enhancements
Expand Down
2 changes: 2 additions & 0 deletions requirements-opt.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
tqdm==4.65.0
fsspec==2023.10.0
s3fs==2023.10.0
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
'url': 'https://github.com/hdmf-dev/hdmf-zarr',
'license': "BSD",
'install_requires': reqs,
'extras_require': {"tqdm": ["tqdm>=4.41.0"]},
'extras_require': {"tqdm": ["tqdm>=4.41.0"],
"fsspec": ["fsspec"],
"s3fs": ["s3fs"]},
'packages': pkgs,
'package_dir': {'': 'src'},
'package_data': {},
Expand Down
43 changes: 30 additions & 13 deletions src/hdmf_zarr/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ZarrIO(HDMFIO):
@staticmethod
def can_read(path):
try:
# TODO: how to use storage_options? Maybe easier to just check for ".zarr" suffix
zarr.open(path, mode="r")
return True
except Exception:
Expand All @@ -94,11 +95,14 @@ def can_read(path):
{'name': 'object_codec_class', 'type': None,
'doc': 'Set the numcodec object codec class to be used to encode objects.'
'Use numcodecs.pickles.Pickle by default.',
'default': None},
{'name': 'storage_options', 'type': dict,
'doc': 'Zarr storage options to read remote folders',
'default': None})
def __init__(self, **kwargs):
self.logger = logging.getLogger('%s.%s' % (self.__class__.__module__, self.__class__.__qualname__))
path, manager, mode, synchronizer, object_codec_class = popargs(
'path', 'manager', 'mode', 'synchronizer', 'object_codec_class', kwargs)
path, manager, mode, synchronizer, object_codec_class, storage_options = popargs(
'path', 'manager', 'mode', 'synchronizer', 'object_codec_class', 'storage_options', kwargs)
if manager is None:
manager = BuildManager(TypeMap(NamespaceCatalog()))
if isinstance(synchronizer, bool):
Expand All @@ -112,6 +116,7 @@ def __init__(self, **kwargs):
self.__mode = mode
self.__path = path
self.__file = None
self.__storage_options = storage_options
self.__built = dict()
self._written_builders = WriteStatusTracker() # track which builders were written (or read) by this IO object
self.__dci_queue = None # Will be initialized on call to io.write
Expand All @@ -133,7 +138,7 @@ def file(self):

@property
def path(self):
"""The path to the Zarr file as set by the use"""
"""The path to the Zarr file as set by the user"""
return self.__path

@property
Expand All @@ -154,13 +159,22 @@ def open(self):
if self.__file is None:
self.__file = zarr.open(store=self.path,
mode=self.__mode,
synchronizer=self.__synchronizer)
synchronizer=self.__synchronizer,
storage_options=self.__storage_options)

def close(self):
"""Close the Zarr file"""
self.__file = None
return

def is_remote(self):
"""Return True if the file is remote, False otherwise"""
from zarr.storage import FSStore
if isinstance(self.file.store, FSStore):
return True
else:
return False

@classmethod
@docval({'name': 'namespace_catalog',
'type': (NamespaceCatalog, TypeMap),
Expand All @@ -173,7 +187,8 @@ def load_namespaces(cls, namespace_catalog, path, namespaces=None):
'''
Load cached namespaces from a file.
'''
f = zarr.open(path, 'r')
# TODO: how to use storage_options here?
f = zarr.open(path, mode='r')
if SPEC_LOC_ATTR not in f.attrs:
msg = "No cached namespaces found in %s" % path
warnings.warn(msg)
Expand Down Expand Up @@ -624,19 +639,21 @@ def resolve_ref(self, zarr_ref):
else:
source_file = str(zarr_ref['source'])
# Resolve the path relative to the current file
source_file = os.path.abspath(os.path.join(self.source, source_file))
if not self.is_remote():
source_file = os.path.abspath(os.path.join(self.source, source_file))
else:
# get rid of extra "/" and "./" in the path root and source_file
root_path = str(self.path).rstrip("/")
source_path = str(source_file).lstrip(".")
source_file = root_path + source_path
oruebel marked this conversation as resolved.
Show resolved Hide resolved

object_path = zarr_ref.get('path', None)
# full_path = None
# if os.path.isdir(source_file):
# if object_path is not None:
# full_path = os.path.join(source_file, object_path.lstrip('/'))
# else:
# full_path = source_file
if object_path:
target_name = os.path.basename(object_path)
else:
target_name = ROOT_NAME
target_zarr_obj = zarr.open(source_file, mode='r')

target_zarr_obj = zarr.open(source_file, mode='r', storage_options=self.__storage_options)
if object_path is not None:
try:
target_zarr_obj = target_zarr_obj[object_path]
Expand Down
7 changes: 4 additions & 3 deletions src/hdmf_zarr/nwb.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class NWBZarrIO(ZarrIO):
'doc': 'a path to a namespace, a TypeMap, or a list consisting paths to namespaces and TypeMaps',
'default': None})
def __init__(self, **kwargs):
path, mode, manager, extensions, load_namespaces, synchronizer = \
path, mode, manager, extensions, load_namespaces, synchronizer, storage_options = \
popargs('path', 'mode', 'manager', 'extensions',
'load_namespaces', 'synchronizer', kwargs)
'load_namespaces', 'synchronizer', 'storage_options', kwargs)
if load_namespaces:
if manager is not None:
warn("loading namespaces from file - ignoring 'manager'")
Expand All @@ -52,7 +52,8 @@ def __init__(self, **kwargs):
super(NWBZarrIO, self).__init__(path,
manager=manager,
mode=mode,
synchronizer=synchronizer)
synchronizer=synchronizer,
storage_options=storage_options)

@docval({'name': 'src_io', 'type': HDMFIO, 'doc': 'the HDMFIO object for reading the data to export'},
{'name': 'nwbfile', 'type': 'NWBFile',
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/test_fsspec_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import unittest
from hdmf_zarr import NWBZarrIO
from .utils import check_s3fs_ffspec_installed

HAVE_FSSPEC = check_s3fs_ffspec_installed()


class TestFSSpecStreaming(unittest.TestCase):
@unittest.skipIf(not HAVE_FSSPEC, "fsspec not installed")
def test_fsspec_streaming(self):
# PLACEHOLDER test file from Allen Insitute for Neural Dynamics
# TODO: store a small test file and use it to speed up testing
remote_path = (
"s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
"ecephys_625749_2022-08-03_15-15-06_nwb/"
"ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
)

with NWBZarrIO(remote_path, mode="r", storage_options=dict(anon=True)) as io:
nwbfile = io.read()

self.assertEqual(nwbfile.identifier, "ecephys_625749_2022-08-03_15-15-06")
self.assertEqual(len(nwbfile.devices), 2)
self.assertEqual(len(nwbfile.electrode_groups), 2)
self.assertEqual(len(nwbfile.electrodes), 1152)
self.assertEqual(nwbfile.institution, "AIND")
10 changes: 10 additions & 0 deletions tests/unit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def get_temp_filepath():
return temp_file.name


def check_s3fs_ffspec_installed():
"""Check if s3fs and ffspec are installed required for streaming access from S3"""
try:
import s3fs # noqa F401
import fsspec # noqa F401
return True
except ImportError:
return False


############################################
# Foo example data containers and specs
###########################################
Expand Down
Loading