Parallel zarr with Queue instance attributes #118

Merged 33 commits on Oct 1, 2023. Changes shown from all commits.

Commits (33)
5c12e6c: adding WIP (CodyCBakerPhD, Jul 29, 2023)
635b3f1: working! (CodyCBakerPhD, Jul 29, 2023)
cde517e: move threadpool to zarr (CodyCBakerPhD, Aug 4, 2023)
e025bda: cleanup; propagating remaining args (CodyCBakerPhD, Aug 8, 2023)
a88e03b: Merge branch 'dev' into add_parallel_zarr (CodyCBakerPhD, Aug 22, 2023)
b239e10: first attempt at a test (CodyCBakerPhD, Aug 22, 2023)
d90b3e7: make pytest from notebook cell (CodyCBakerPhD, Aug 22, 2023)
56a9e7c: add notes for other tests (CodyCBakerPhD, Aug 22, 2023)
ff1868f: Merge branch 'dev' into add_parallel_zarr (CodyCBakerPhD, Aug 22, 2023)
9ff7a17: more tests; more debugs (CodyCBakerPhD, Aug 23, 2023)
2f37253: more tests; more debugs (CodyCBakerPhD, Aug 23, 2023)
35242d0: debug roundtrip; some flake8 (CodyCBakerPhD, Aug 30, 2023)
dff285a: add optional requirements and some test integration for that (CodyCBakerPhD, Aug 30, 2023)
6e6a6ed: readthedocs optional requirements (CodyCBakerPhD, Aug 30, 2023)
1e3e8f1: update coverage CI with optional requirements (CodyCBakerPhD, Aug 30, 2023)
141e2fe: update external links CI with optional requirements (CodyCBakerPhD, Aug 30, 2023)
15cfa8d: modular scope on parallel helpers (CodyCBakerPhD, Aug 30, 2023)
516ccd8: Merge branch 'add_parallel_zarr' of https://github.com/catalystneuro/… (CodyCBakerPhD, Aug 30, 2023)
7d807ec: make arguments attributes of Queue class; add test for propagation (CodyCBakerPhD, Aug 31, 2023)
6945cc2: add changelog (CodyCBakerPhD, Aug 31, 2023)
f172ceb: ryans suggestions from other branch (CodyCBakerPhD, Aug 31, 2023)
cca279d: fix all flake8 (CodyCBakerPhD, Aug 31, 2023)
ebc7805: fix typo in threadpool install (CodyCBakerPhD, Aug 31, 2023)
1c04acc: Update .readthedocs.yaml (rly, Aug 31, 2023)
f0b48a5: Update requirements-min.txt (rly, Aug 31, 2023)
3f035b0: Update setup.py (rly, Aug 31, 2023)
d283963: Update tox.ini (rly, Aug 31, 2023)
17a53cc: Update backend.py (rly, Aug 31, 2023)
4699a29: Update backend.py (rly, Aug 31, 2023)
aefdd2a: fix unit run of write_dataset (CodyCBakerPhD, Aug 31, 2023)
04919f8: Merge branch 'dev' into parallel_zarr_as_queue_attributes (CodyCBakerPhD, Sep 29, 2023)
261e095: restore 3.8 compatible nested with statements (CodyCBakerPhD, Sep 30, 2023)
9d84044: Revert changes to requirements.txt (rly, Sep 30, 2023)
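Taken together, these commits add three new arguments (`number_of_jobs`, `max_threads_per_process`, `multiprocessing_context`) to `ZarrIO.write` and `ZarrIO.export`, carried as instance attributes on the `ZarrIODataChunkIteratorQueue`. A minimal usage sketch follows; the keyword arguments come from the new docval in this diff, while `build_manager` and `container` stand in for a real HDMF type map and container and are assumptions of this example. The file-by-file diff follows the sketch.

```python
# Hedged sketch of the parallel write API added in this PR.
from hdmf_zarr import ZarrIO

with ZarrIO("example.zarr", manager=build_manager, mode="w") as io:
    io.write(
        container,
        number_of_jobs=4,                 # worker processes; parallelism applies only to
                                          # GenericDataChunkIterator-wrapped datasets
        max_threads_per_process=2,        # cap threads per worker process (None = no limit)
        multiprocessing_context="spawn",  # or "fork" (UNIX-only); default is None
    )
```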
2 changes: 1 addition & 1 deletion .github/workflows/check_external_links.yml
@@ -27,7 +27,7 @@ jobs:
       - name: Install Sphinx dependencies and package
         run: |
           python -m pip install --upgrade pip
-          python -m pip install -r requirements-doc.txt -r requirements.txt
+          python -m pip install -r requirements-doc.txt -r requirements.txt -r requirements-opt.txt
           python -m pip install .
       - name: Check Sphinx external links
         run: sphinx-build -b linkcheck ./docs/source ./test_build
2 changes: 1 addition & 1 deletion .github/workflows/run_coverage.yml
@@ -48,7 +48,7 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
-          python -m pip install -r requirements-dev.txt -r requirements.txt
+          python -m pip install -r requirements-dev.txt -r requirements.txt -r requirements-opt.txt

       - name: Install package
         run: |
1 change: 1 addition & 0 deletions .readthedocs.yaml
@@ -26,6 +26,7 @@ python:
   install:
     - requirements: requirements-doc.txt
     - requirements: requirements.txt
+    - requirements: requirements-opt.txt
     - path: .  # path to the package relative to the root

 # Optionally include all submodules
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,9 @@
 * Fixed error in deploy workflow. @mavaylon1 [#109](https://github.com/hdmf-dev/hdmf-zarr/pull/109)
 * Fixed build error for ReadtheDocs by degrading numpy for python 3.7 support. @mavaylon1 [#115](https://github.com/hdmf-dev/hdmf-zarr/pull/115)

+### New Features
+* Added parallel write support for the ``ZarrIO``. @CodyCBakerPhD [#118](https://github.com/hdmf-dev/hdmf-zarr/pull/118)
+

 ## 0.3.0 (July 21, 2023)
2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1,4 +1,4 @@
 include LICENSE.txt versioneer.py src/hdmf_zarr/_version.py src/hdmf_zarr/_due.py
-include requirements.txt requirements-dev.txt requirements-doc.txt
+include requirements.txt requirements-dev.txt requirements-doc.txt requirements-opt.txt
 include test.py tox.ini
 graft tests
1 change: 1 addition & 0 deletions requirements-min.txt
@@ -4,3 +4,4 @@ numcodecs==0.9.1
 pynwb==2.5.0
 setuptools
 importlib_resources;python_version<'3.9'  # Remove when python 3.9 becomes the new minimum
+threadpoolctl==3.1.0
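The new `threadpoolctl` pin is presumably what backs the `max_threads_per_process` argument added in this PR. A minimal sketch of the `threadpool_limits` context manager it provides; that hdmf-zarr's parallel helpers apply it exactly this way is an assumption, and the worker function is illustrative:

```python
# Cap native thread pools (BLAS/OpenMP) inside a worker process with threadpoolctl.
import numpy as np
from threadpoolctl import threadpool_limits

def write_block(block: np.ndarray) -> float:
    # With number_of_jobs worker processes, capping threads per process
    # avoids oversubscribing the CPU.
    with threadpool_limits(limits=2):
        return float(block.sum())
```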
1 change: 1 addition & 0 deletions requirements-opt.txt
@@ -0,0 +1 @@
+tqdm==4.65.0
3 changes: 2 additions & 1 deletion requirements.txt
@@ -2,5 +2,6 @@
 hdmf==3.9.0
 zarr==2.11.0
 pynwb==2.5.0
-numpy==1.24
+numpy==1.24.0
 numcodecs==0.11.0
+threadpoolctl==3.2.0
2 changes: 2 additions & 0 deletions setup.py
@@ -24,6 +24,7 @@
     'numcodecs==0.11.0',
     'pynwb>=2.5.0',
     'setuptools',
+    'threadpoolctl>=3.1.0',
 ]

 print(reqs)
@@ -40,6 +41,7 @@
     'url': 'https://github.com/hdmf-dev/hdmf-zarr',
     'license': "BSD",
     'install_requires': reqs,
+    'extras_require': {"tqdm": ["tqdm>=4.41.0"]},
     'packages': pkgs,
     'package_dir': {'': 'src'},
     'package_data': {},
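With the `extras_require` entry above, the optional progress-bar dependency can be installed via the package extra, for example `pip install "hdmf-zarr[tqdm]"`; presumably this enables progress display for queued parallel writes.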
220 changes: 157 additions & 63 deletions src/hdmf_zarr/backend.py
@@ -114,7 +114,7 @@ def __init__(self, **kwargs):
         self.__file = None
         self.__built = dict()
         self._written_builders = WriteStatusTracker()  # track which builders were written (or read) by this IO object
-        self.__dci_queue = ZarrIODataChunkIteratorQueue()  # a queue of DataChunkIterators that need to be exhausted
+        self.__dci_queue = None  # Will be initialized on call to io.write
         # Codec class to be used. Alternates, e.g., =numcodecs.JSON
         self.__codec_cls = numcodecs.pickles.Pickle if object_codec_class is None else object_codec_class
         source_path = self.__path
@@ -188,17 +188,54 @@ def load_namespaces(cls, namespace_catalog, path, namespaces=None):
             reader = ZarrSpecReader(ns_group)
             namespace_catalog.load_namespaces('namespace', reader=reader)

-    @docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'},
-            {'name': 'cache_spec', 'type': bool, 'doc': 'cache specification to file', 'default': True},
-            {'name': 'link_data', 'type': bool,
-             'doc': 'If not specified otherwise link (True) or copy (False) Datasets', 'default': True},
-            {'name': 'exhaust_dci', 'type': bool,
-             'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
-                    'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
-             'default': True},)
+    @docval(
+        {'name': 'container', 'type': Container, 'doc': 'the Container object to write'},
+        {'name': 'cache_spec', 'type': bool, 'doc': 'cache specification to file', 'default': True},
+        {'name': 'link_data', 'type': bool,
+         'doc': 'If not specified otherwise link (True) or copy (False) Datasets', 'default': True},
+        {'name': 'exhaust_dci', 'type': bool,
+         'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
+                'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
+         'default': True},
+        {
+            "name": "number_of_jobs",
+            "type": int,
+            "doc": (
+                "Number of jobs to use in parallel during write "
+                "(only works with GenericDataChunkIterator-wrapped datasets)."
+            ),
+            "default": 1,
+        },
+        {
+            "name": "max_threads_per_process",
+            "type": int,
+            "doc": (
+                "Limits the number of threads used by each process. The default is None (no limits)."
+            ),
+            "default": None,
+        },
+        {
+            "name": "multiprocessing_context",
+            "type": str,
+            "doc": (
+                "Context for multiprocessing. It can be None (default), 'fork' or 'spawn'. "
+                "Note that 'fork' is only available on UNIX systems (not Windows)."
+            ),
+            "default": None,
+        },
+    )
     def write(self, **kwargs):
-        """Overwrite the write method to add support for caching the specification"""
-        cache_spec = popargs('cache_spec', kwargs)
+        """Overwrite the write method to add support for caching the specification and parallelization."""
+        cache_spec, number_of_jobs, max_threads_per_process, multiprocessing_context = popargs(
+            "cache_spec", "number_of_jobs", "max_threads_per_process", "multiprocessing_context", kwargs
+        )
+
+        self.__dci_queue = ZarrIODataChunkIteratorQueue(
+            number_of_jobs=number_of_jobs,
+            max_threads_per_process=max_threads_per_process,
+            multiprocessing_context=multiprocessing_context,
+        )
+
         super(ZarrIO, self).write(**kwargs)
         if cache_spec:
             self.__cache_spec()
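As the `number_of_jobs` doc above notes, parallel write applies only to datasets wrapped in a `GenericDataChunkIterator`. A hedged sketch of such a wrapper: the abstract method names follow `hdmf.data_utils.GenericDataChunkIterator`, while the in-memory data source is purely illustrative (real uses would wrap a lazy source).

```python
import numpy as np
from hdmf.data_utils import GenericDataChunkIterator

class ArrayChunkIterator(GenericDataChunkIterator):
    """Illustrative iterator over an in-memory array."""

    def __init__(self, array: np.ndarray, **kwargs):
        self._array = array
        super().__init__(**kwargs)

    def _get_data(self, selection):
        # Return the requested block of data.
        return self._array[selection]

    def _get_maxshape(self):
        return self._array.shape

    def _get_dtype(self):
        return self._array.dtype
```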
@@ -225,8 +262,36 @@ def __cache_spec(self):
             writer = ZarrSpecWriter(ns_group)
             ns_builder.export('namespace', writer=writer)

-    @docval(*get_docval(HDMFIO.export),
-            {'name': 'cache_spec', 'type': bool, 'doc': 'whether to cache the specification to file', 'default': True})
+    @docval(
+        *get_docval(HDMFIO.export),
+        {'name': 'cache_spec', 'type': bool, 'doc': 'whether to cache the specification to file', 'default': True},
+        {
+            "name": "number_of_jobs",
+            "type": int,
+            "doc": (
+                "Number of jobs to use in parallel during write "
+                "(only works with GenericDataChunkIterator-wrapped datasets)."
+            ),
+            "default": 1,
+        },
+        {
+            "name": "max_threads_per_process",
+            "type": int,
+            "doc": (
+                "Limits the number of threads used by each process. The default is None (no limits)."
+            ),
+            "default": None,
+        },
+        {
+            "name": "multiprocessing_context",
+            "type": str,
+            "doc": (
+                "Context for multiprocessing. It can be None (default), 'fork' or 'spawn'. "
+                "Note that 'fork' is only available on UNIX systems (not Windows)."
+            ),
+            "default": None,
+        },
+    )
     def export(self, **kwargs):
         """Export data read from a file from any backend to Zarr.
         See :py:meth:`hdmf.backends.io.HDMFIO.export` for more details.
@@ -237,6 +302,15 @@ def export(self, **kwargs):

         src_io = getargs('src_io', kwargs)
         write_args, cache_spec = popargs('write_args', 'cache_spec', kwargs)
+        number_of_jobs, max_threads_per_process, multiprocessing_context = popargs(
+            "number_of_jobs", "max_threads_per_process", "multiprocessing_context", kwargs
+        )
+
+        self.__dci_queue = ZarrIODataChunkIteratorQueue(
+            number_of_jobs=number_of_jobs,
+            max_threads_per_process=max_threads_per_process,
+            multiprocessing_context=multiprocessing_context,
+        )

         if not isinstance(src_io, ZarrIO) and write_args.get('link_data', True):
             raise UnsupportedOperation("Cannot export from non-Zarr backend %s to Zarr with write argument "
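A hedged sketch of exporting with the new arguments; only the keyword arguments come from this diff, while the paths and `build_manager` are illustrative. The nested `with` statements mirror the 3.8-compatible style restored in commit 261e095.

```python
# Export from an existing Zarr store to a new one, draining queued
# iterators with four worker processes.
with ZarrIO("source.zarr", manager=build_manager, mode="r") as read_io:
    with ZarrIO("copy.zarr", mode="w") as export_io:
        export_io.export(src_io=read_io, number_of_jobs=4)
```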
@@ -286,36 +360,53 @@ def get_builder_disk_path(self, **kwargs):
         builder_path = os.path.join(basepath, self.__get_path(builder).lstrip("/"))
         return builder_path

-    @docval({'name': 'builder', 'type': GroupBuilder, 'doc': 'the GroupBuilder object representing the NWBFile'},
-            {'name': 'link_data', 'type': bool,
-             'doc': 'If not specified otherwise link (True) or copy (False) Zarr Datasets', 'default': True},
-            {'name': 'exhaust_dci', 'type': bool,
-             'doc': 'exhaust DataChunkIterators one at a time. If False, add ' +
-                    'them to the internal queue self.__dci_queue and exhaust them concurrently at the end',
-             'default': True},
-            {'name': 'export_source', 'type': str,
-             'doc': 'The source of the builders when exporting', 'default': None})
+    @docval(
+        {'name': 'builder', 'type': GroupBuilder, 'doc': 'the GroupBuilder object representing the NWBFile'},
+        {
+            'name': 'link_data',
+            'type': bool,
+            'doc': 'If not specified otherwise link (True) or copy (False) Zarr Datasets',
+            'default': True
+        },
+        {
+            'name': 'exhaust_dci',
+            'type': bool,
+            'doc': (
+                'Exhaust DataChunkIterators one at a time. If False, add '
+                'them to the internal queue self.__dci_queue and exhaust them concurrently at the end'
+            ),
+            'default': True,
+        },
+        {
+            'name': 'export_source',
+            'type': str,
+            'doc': 'The source of the builders when exporting',
+            'default': None,
+        },
+    )
     def write_builder(self, **kwargs):
-        """Write a builder to disk"""
-        f_builder, link_data, exhaust_dci, export_source = getargs('builder',
-                                                                   'link_data',
-                                                                   'exhaust_dci',
-                                                                   'export_source',
-                                                                   kwargs)
+        """Write a builder to disk."""
+        f_builder, link_data, exhaust_dci, export_source = getargs(
+            'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
+        )
         for name, gbldr in f_builder.groups.items():
-            self.write_group(parent=self.__file,
-                             builder=gbldr,
-                             link_data=link_data,
-                             exhaust_dci=exhaust_dci,
-                             export_source=export_source)
+            self.write_group(
+                parent=self.__file,
+                builder=gbldr,
+                link_data=link_data,
+                exhaust_dci=exhaust_dci,
+                export_source=export_source,
+            )
         for name, dbldr in f_builder.datasets.items():
-            self.write_dataset(parent=self.__file,
-                               builder=dbldr,
-                               link_data=link_data,
-                               exhaust_dci=exhaust_dci,
-                               export_source=export_source)
+            self.write_dataset(
+                parent=self.__file,
+                builder=dbldr,
+                link_data=link_data,
+                exhaust_dci=exhaust_dci,
+                export_source=export_source,
+            )
         self.write_attributes(self.__file, f_builder.attributes)  # the same as set_attributes in HDMF
-        self.__dci_queue.exhaust_queue()  # Write all DataChunkIterators that have been queued
+        self.__dci_queue.exhaust_queue()  # Write any remaining DataChunkIterators that have been queued
         self._written_builders.set_written(f_builder)
         self.logger.debug("Done writing %s '%s' to path '%s'" %
                           (f_builder.__class__.__qualname__, f_builder.name, self.source))
@@ -333,12 +424,10 @@ def write_builder(self, **kwargs):
             returns='the Group that was created', rtype='Group')
     def write_group(self, **kwargs):
         """Write a GroupBuider to file"""
-        parent, builder, link_data, exhaust_dci, export_source = getargs('parent',
-                                                                         'builder',
-                                                                         'link_data',
-                                                                         'exhaust_dci',
-                                                                         'export_source',
-                                                                         kwargs)
+        parent, builder, link_data, exhaust_dci, export_source = getargs(
+            'parent', 'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
+        )
+
         if self.get_written(builder):
             group = parent[builder.name]
         else:
@@ -347,19 +436,23 @@ def write_group(self, **kwargs):
         subgroups = builder.groups
         if subgroups:
             for subgroup_name, sub_builder in subgroups.items():
-                self.write_group(parent=group,
-                                 builder=sub_builder,
-                                 link_data=link_data,
-                                 exhaust_dci=exhaust_dci)
+                self.write_group(
+                    parent=group,
+                    builder=sub_builder,
+                    link_data=link_data,
+                    exhaust_dci=exhaust_dci,
+                )

         datasets = builder.datasets
         if datasets:
             for dset_name, sub_builder in datasets.items():
-                self.write_dataset(parent=group,
-                                   builder=sub_builder,
-                                   link_data=link_data,
-                                   exhaust_dci=exhaust_dci,
-                                   export_source=export_source)
+                self.write_dataset(
+                    parent=group,
+                    builder=sub_builder,
+                    link_data=link_data,
+                    exhaust_dci=exhaust_dci,
+                    export_source=export_source,
+                )

         # write all links (haven implemented)
         links = builder.links
@@ -379,10 +472,9 @@ def write_group(self, **kwargs):
             {'name': 'export_source', 'type': str,
              'doc': 'The source of the builders when exporting', 'default': None})
     def write_attributes(self, **kwargs):
-        """
-        Set (i.e., write) the attributes on a given Zarr Group or Array
-        """
+        """Set (i.e., write) the attributes on a given Zarr Group or Array."""
         obj, attributes, export_source = getargs('obj', 'attributes', 'export_source', kwargs)
+
         for key, value in attributes.items():
             # Case 1: list, set, tuple type attributes
             if isinstance(value, (set, list, tuple)) or (isinstance(value, np.ndarray) and np.ndim(value) != 0):
@@ -723,13 +815,15 @@ def __setup_chunked_dataset__(cls, parent, name, data, options=None):
              'doc': 'The source of the builders when exporting', 'default': None},
             returns='the Zarr array that was created', rtype=Array)
     def write_dataset(self, **kwargs):  # noqa: C901
-        parent, builder, link_data, exhaust_dci, export_source = getargs('parent',
-                                                                         'builder',
-                                                                         'link_data',
-                                                                         'exhaust_dci',
-                                                                         'export_source',
-                                                                         kwargs)
+        parent, builder, link_data, exhaust_dci, export_source = getargs(
+            'parent', 'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
+        )
+
         force_data = getargs('force_data', kwargs)
+
+        if exhaust_dci and self.__dci_queue is None:
+            self.__dci_queue = ZarrIODataChunkIteratorQueue()
+
         if self.get_written(builder):
             return None
         name = builder.name
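Because `__dci_queue` is now created in `write` and `export` rather than in `__init__`, the guard above gives `write_dataset` a default (serial) queue when it is called directly without going through `write`; this appears to be what the commit "fix unit run of write_dataset" addresses.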