Commit
Parallel zarr with Queue instance attributes (#118)
* Added on-node parallel write support for the ``ZarrIO``.
* add optional requirements and some test integration for that
* readthedocs optional requirements
* update coverage CI with optional requirements
* update external links CI with optional requirements
* modular scope on parallel helpers

Co-authored-by: Ryan Ly <[email protected]>
Co-authored-by: Oliver Ruebel <[email protected]>
3 people authored Oct 1, 2023
1 parent c262481 commit 9f6c386
Showing 13 changed files with 713 additions and 98 deletions.
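
As a rough usage sketch (not part of the diff that follows), the new keyword arguments added by this commit are passed directly to ``ZarrIO.write``; the ``container`` and ``manager`` objects and the output path are placeholders, and parallelization only applies to datasets wrapped in hdmf's ``GenericDataChunkIterator``:

    from hdmf_zarr.backend import ZarrIO

    # `container` and `manager` stand in for an hdmf Container and BuildManager built elsewhere.
    with ZarrIO("example.zarr", manager=manager, mode="w") as io:
        io.write(
            container,
            number_of_jobs=4,                 # new: number of worker processes for parallel write
            max_threads_per_process=2,        # new: optional cap on threads per worker (None = no limit)
            multiprocessing_context="spawn",  # new: "fork" is only available on UNIX systems
        )
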
2 changes: 1 addition & 1 deletion .github/workflows/check_external_links.yml
@@ -27,7 +27,7 @@ jobs:
- name: Install Sphinx dependencies and package
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements-doc.txt -r requirements.txt
python -m pip install -r requirements-doc.txt -r requirements.txt -r requirements-opt.txt
python -m pip install .
- name: Check Sphinx external links
run: sphinx-build -b linkcheck ./docs/source ./test_build
2 changes: 1 addition & 1 deletion .github/workflows/run_coverage.yml
@@ -48,7 +48,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements-dev.txt -r requirements.txt
python -m pip install -r requirements-dev.txt -r requirements.txt -r requirements-opt.txt
- name: Install package
run: |
1 change: 1 addition & 0 deletions .readthedocs.yaml
@@ -26,6 +26,7 @@ python:
install:
- requirements: requirements-doc.txt
- requirements: requirements.txt
- requirements: requirements-opt.txt
- path: . # path to the package relative to the root

# Optionally include all submodules
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,9 @@
* Fixed error in deploy workflow. @mavaylon1 [#109](https://github.com/hdmf-dev/hdmf-zarr/pull/109)
* Fixed build error for ReadtheDocs by degrading numpy for python 3.7 support. @mavaylon1 [#115](https://github.com/hdmf-dev/hdmf-zarr/pull/115)

### New Features
* Added parallel write support for the ``ZarrIO``. @CodyCBakerPhD [#118](https://github.com/hdmf-dev/hdmf-zarr/pull/118)


## 0.3.0 (July 21, 2023)

2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1,4 +1,4 @@
include LICENSE.txt versioneer.py src/hdmf_zarr/_version.py src/hdmf_zarr/_due.py
include requirements.txt requirements-dev.txt requirements-doc.txt
include requirements.txt requirements-dev.txt requirements-doc.txt requirements-opt.txt
include test.py tox.ini
graft tests
1 change: 1 addition & 0 deletions requirements-min.txt
@@ -4,3 +4,4 @@ numcodecs==0.9.1
pynwb==2.5.0
setuptools
importlib_resources;python_version<'3.9' # Remove when python 3.9 becomes the new minimum
threadpoolctl==3.1.0
1 change: 1 addition & 0 deletions requirements-opt.txt
@@ -0,0 +1 @@
tqdm==4.65.0
3 changes: 2 additions & 1 deletion requirements.txt
@@ -2,5 +2,6 @@
hdmf==3.9.0
zarr==2.11.0
pynwb==2.5.0
numpy==1.24
numpy==1.24.0
numcodecs==0.11.0
threadpoolctl==3.2.0
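
``threadpoolctl`` becomes a runtime requirement, presumably so the new ``max_threads_per_process`` option can cap native (BLAS/OpenMP) thread pools inside each worker; a minimal sketch of that pattern, independent of this commit's internal helper names:

    import numpy as np
    from threadpoolctl import threadpool_limits

    # Limit native thread pools so that number_of_jobs * threads_per_process
    # does not oversubscribe the machine during a parallel write.
    with threadpool_limits(limits=2):
        a = np.random.rand(512, 512)
        _ = a @ a  # runs with at most 2 native threads
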
2 changes: 2 additions & 0 deletions setup.py
@@ -24,6 +24,7 @@
'numcodecs==0.11.0',
'pynwb>=2.5.0',
'setuptools',
'threadpoolctl>=3.1.0',
]

print(reqs)
@@ -40,6 +41,7 @@
'url': 'https://github.com/hdmf-dev/hdmf-zarr',
'license': "BSD",
'install_requires': reqs,
'extras_require': {"tqdm": ["tqdm>=4.41.0"]},
'packages': pkgs,
'package_dir': {'': 'src'},
'package_data': {},
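
The new ``extras_require`` entry keeps the progress-bar dependency optional (e.g. ``pip install "hdmf-zarr[tqdm]"``); a hedged sketch of the usual guard pattern downstream code can use, with a hypothetical flag name:

    # Hypothetical guard: degrade gracefully when the optional tqdm extra is absent.
    try:
        from tqdm import tqdm
        HAVE_TQDM = True
    except ImportError:
        HAVE_TQDM = False
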
220 changes: 157 additions & 63 deletions src/hdmf_zarr/backend.py
@@ -114,7 +114,7 @@ def __init__(self, **kwargs):
self.__file = None
self.__built = dict()
self._written_builders = WriteStatusTracker() # track which builders were written (or read) by this IO object
self.__dci_queue = ZarrIODataChunkIteratorQueue() # a queue of DataChunkIterators that need to be exhausted
self.__dci_queue = None # Will be initialized on call to io.write
# Codec class to be used. Alternates, e.g., =numcodecs.JSON
self.__codec_cls = numcodecs.pickles.Pickle if object_codec_class is None else object_codec_class
source_path = self.__path
@@ -188,17 +188,54 @@ def load_namespaces(cls, namespace_catalog, path, namespaces=None):
reader = ZarrSpecReader(ns_group)
namespace_catalog.load_namespaces('namespace', reader=reader)

@docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'},
{'name': 'cache_spec', 'type': bool, 'doc': 'cache specification to file', 'default': True},
{'name': 'link_data', 'type': bool,
'doc': 'If not specified otherwise link (True) or copy (False) Datasets', 'default': True},
{'name': 'exhaust_dci', 'type': bool,
'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
'default': True},)
@docval(
{'name': 'container', 'type': Container, 'doc': 'the Container object to write'},
{'name': 'cache_spec', 'type': bool, 'doc': 'cache specification to file', 'default': True},
{'name': 'link_data', 'type': bool,
'doc': 'If not specified otherwise link (True) or copy (False) Datasets', 'default': True},
{'name': 'exhaust_dci', 'type': bool,
'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
'default': True},
{
"name": "number_of_jobs",
"type": int,
"doc": (
"Number of jobs to use in parallel during write "
"(only works with GenericDataChunkIterator-wrapped datasets)."
),
"default": 1,
},
{
"name": "max_threads_per_process",
"type": int,
"doc": (
"Limits the number of threads used by each process. The default is None (no limits)."
),
"default": None,
},
{
"name": "multiprocessing_context",
"type": str,
"doc": (
"Context for multiprocessing. It can be None (default), 'fork' or 'spawn'. "
"Note that 'fork' is only available on UNIX systems (not Windows)."
),
"default": None,
},
)
def write(self, **kwargs):
"""Overwrite the write method to add support for caching the specification"""
cache_spec = popargs('cache_spec', kwargs)
"""Overwrite the write method to add support for caching the specification and parallelization."""
cache_spec, number_of_jobs, max_threads_per_process, multiprocessing_context = popargs(
"cache_spec", "number_of_jobs", "max_threads_per_process", "multiprocessing_context", kwargs
)

self.__dci_queue = ZarrIODataChunkIteratorQueue(
number_of_jobs=number_of_jobs,
max_threads_per_process=max_threads_per_process,
multiprocessing_context=multiprocessing_context,
)

super(ZarrIO, self).write(**kwargs)
if cache_spec:
self.__cache_spec()
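
Because the parallel path only applies to datasets wrapped in hdmf's ``GenericDataChunkIterator`` (which is abstract), each dataset has to be exposed through a concrete subclass; the wrapper below is a minimal hypothetical example for an in-memory array, with illustrative class and argument names:

    import numpy as np
    from hdmf.data_utils import GenericDataChunkIterator

    class InMemoryArrayIterator(GenericDataChunkIterator):
        """Hypothetical wrapper exposing a numpy array through the GenericDataChunkIterator API."""

        def __init__(self, array, **kwargs):
            self._array = array
            super().__init__(**kwargs)

        def _get_data(self, selection):
            return self._array[selection]

        def _get_maxshape(self):
            return self._array.shape

        def _get_dtype(self):
            return self._array.dtype

    # display_progress=True would use the optional tqdm dependency added by this commit.
    wrapped = InMemoryArrayIterator(array=np.random.rand(1000, 384))

Wrapping the relevant dataset fields of the container with such an iterator before calling ``write(..., number_of_jobs=...)`` is what lets the queue dispatch chunk writes across worker processes.
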
@@ -225,8 +262,36 @@ def __cache_spec(self):
writer = ZarrSpecWriter(ns_group)
ns_builder.export('namespace', writer=writer)

@docval(*get_docval(HDMFIO.export),
{'name': 'cache_spec', 'type': bool, 'doc': 'whether to cache the specification to file', 'default': True})
@docval(
*get_docval(HDMFIO.export),
{'name': 'cache_spec', 'type': bool, 'doc': 'whether to cache the specification to file', 'default': True},
{
"name": "number_of_jobs",
"type": int,
"doc": (
"Number of jobs to use in parallel during write "
"(only works with GenericDataChunkIterator-wrapped datasets)."
),
"default": 1,
},
{
"name": "max_threads_per_process",
"type": int,
"doc": (
"Limits the number of threads used by each process. The default is None (no limits)."
),
"default": None,
},
{
"name": "multiprocessing_context",
"type": str,
"doc": (
"Context for multiprocessing. It can be None (default), 'fork' or 'spawn'. "
"Note that 'fork' is only available on UNIX systems (not Windows)."
),
"default": None,
},
)
def export(self, **kwargs):
"""Export data read from a file from any backend to Zarr.
See :py:meth:`hdmf.backends.io.HDMFIO.export` for more details.
@@ -237,6 +302,15 @@ def export(self, **kwargs):

src_io = getargs('src_io', kwargs)
write_args, cache_spec = popargs('write_args', 'cache_spec', kwargs)
number_of_jobs, max_threads_per_process, multiprocessing_context = popargs(
"number_of_jobs", "max_threads_per_process", "multiprocessing_context", kwargs
)

self.__dci_queue = ZarrIODataChunkIteratorQueue(
number_of_jobs=number_of_jobs,
max_threads_per_process=max_threads_per_process,
multiprocessing_context=multiprocessing_context,
)

if not isinstance(src_io, ZarrIO) and write_args.get('link_data', True):
raise UnsupportedOperation("Cannot export from non-Zarr backend %s to Zarr with write argument "
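
For ``export``, the same three options are popped and used to build the queue before delegating to ``HDMFIO.export``. A hedged sketch of exporting an HDF5-backed file to Zarr with parallel writes follows; the paths, the ``manager``, and the choice of source IO class are illustrative, and ``write_args={'link_data': False}`` is required because the source backend is not Zarr:

    from hdmf.backends.hdf5 import HDF5IO
    from hdmf_zarr.backend import ZarrIO

    with HDF5IO("source.h5", mode="r", manager=manager) as src_io:
        with ZarrIO("exported.zarr", mode="w") as export_io:
            export_io.export(
                src_io=src_io,
                write_args={"link_data": False},  # required when the source backend is not Zarr
                number_of_jobs=4,
            )
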
@@ -286,36 +360,53 @@ def get_builder_disk_path(self, **kwargs):
builder_path = os.path.join(basepath, self.__get_path(builder).lstrip("/"))
return builder_path

@docval({'name': 'builder', 'type': GroupBuilder, 'doc': 'the GroupBuilder object representing the NWBFile'},
{'name': 'link_data', 'type': bool,
'doc': 'If not specified otherwise link (True) or copy (False) Zarr Datasets', 'default': True},
{'name': 'exhaust_dci', 'type': bool,
'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
'default': True},
{'name': 'export_source', 'type': str,
'doc': 'The source of the builders when exporting', 'default': None})
@docval(
{'name': 'builder', 'type': GroupBuilder, 'doc': 'the GroupBuilder object representing the NWBFile'},
{
'name': 'link_data',
'type': bool,
'doc': 'If not specified otherwise link (True) or copy (False) Zarr Datasets',
'default': True
},
{
'name': 'exhaust_dci',
'type': bool,
'doc': (
'Exhaust DataChunkIterators one at a time. If False, add '
'them to the internal queue self.__dci_queue and exhaust them concurrently at the end'
),
'default': True,
},
{
'name': 'export_source',
'type': str,
'doc': 'The source of the builders when exporting',
'default': None,
},
)
def write_builder(self, **kwargs):
"""Write a builder to disk"""
f_builder, link_data, exhaust_dci, export_source = getargs('builder',
'link_data',
'exhaust_dci',
'export_source',
kwargs)
"""Write a builder to disk."""
f_builder, link_data, exhaust_dci, export_source = getargs(
'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
)
for name, gbldr in f_builder.groups.items():
self.write_group(parent=self.__file,
builder=gbldr,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source)
self.write_group(
parent=self.__file,
builder=gbldr,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source,
)
for name, dbldr in f_builder.datasets.items():
self.write_dataset(parent=self.__file,
builder=dbldr,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source)
self.write_dataset(
parent=self.__file,
builder=dbldr,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source,
)
self.write_attributes(self.__file, f_builder.attributes) # the same as set_attributes in HDMF
self.__dci_queue.exhaust_queue() # Write all DataChunkIterators that have been queued
self.__dci_queue.exhaust_queue() # Write any remaining DataChunkIterators that have been queued
self._written_builders.set_written(f_builder)
self.logger.debug("Done writing %s '%s' to path '%s'" %
(f_builder.__class__.__qualname__, f_builder.name, self.source))
@@ -333,12 +424,10 @@ def write_builder(self, **kwargs):
returns='the Group that was created', rtype='Group')
def write_group(self, **kwargs):
"""Write a GroupBuider to file"""
parent, builder, link_data, exhaust_dci, export_source = getargs('parent',
'builder',
'link_data',
'exhaust_dci',
'export_source',
kwargs)
parent, builder, link_data, exhaust_dci, export_source = getargs(
'parent', 'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
)

if self.get_written(builder):
group = parent[builder.name]
else:
@@ -347,19 +436,23 @@ def write_group(self, **kwargs):
subgroups = builder.groups
if subgroups:
for subgroup_name, sub_builder in subgroups.items():
self.write_group(parent=group,
builder=sub_builder,
link_data=link_data,
exhaust_dci=exhaust_dci)
self.write_group(
parent=group,
builder=sub_builder,
link_data=link_data,
exhaust_dci=exhaust_dci,
)

datasets = builder.datasets
if datasets:
for dset_name, sub_builder in datasets.items():
self.write_dataset(parent=group,
builder=sub_builder,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source)
self.write_dataset(
parent=group,
builder=sub_builder,
link_data=link_data,
exhaust_dci=exhaust_dci,
export_source=export_source,
)

# write all links
links = builder.links
@@ -379,10 +472,9 @@ def write_group(self, **kwargs):
{'name': 'export_source', 'type': str,
'doc': 'The source of the builders when exporting', 'default': None})
def write_attributes(self, **kwargs):
"""
Set (i.e., write) the attributes on a given Zarr Group or Array
"""
"""Set (i.e., write) the attributes on a given Zarr Group or Array."""
obj, attributes, export_source = getargs('obj', 'attributes', 'export_source', kwargs)

for key, value in attributes.items():
# Case 1: list, set, tuple type attributes
if isinstance(value, (set, list, tuple)) or (isinstance(value, np.ndarray) and np.ndim(value) != 0):
@@ -723,13 +815,15 @@ def __setup_chunked_dataset__(cls, parent, name, data, options=None):
'doc': 'The source of the builders when exporting', 'default': None},
returns='the Zarr array that was created', rtype=Array)
def write_dataset(self, **kwargs): # noqa: C901
parent, builder, link_data, exhaust_dci, export_source = getargs('parent',
'builder',
'link_data',
'exhaust_dci',
'export_source',
kwargs)
parent, builder, link_data, exhaust_dci, export_source = getargs(
'parent', 'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
)

force_data = getargs('force_data', kwargs)

if exhaust_dci and self.__dci_queue is None:
self.__dci_queue = ZarrIODataChunkIteratorQueue()

if self.get_written(builder):
return None
name = builder.name
(Diffs for the remaining changed files are not shown.)
