Add Zarr benchmarks #51

Merged: 16 commits, Apr 28, 2024
1 change: 1 addition & 0 deletions environments/nwb_benchmarks.yaml
@@ -19,4 +19,5 @@ dependencies:
- lindi
- hdmf @ git+https://github.com/hdmf-dev/hdmf.git@9b3282aa4999d2922f003f1b79ec7458ea3ddc5e # required until PyNWB propagates changes
#- hdmf @ git+https://github.com/hdmf-dev/hdmf.git@expose_aws_region # required until region fix is released
- hdmf-zarr
- -e ..
@@ -15,6 +15,8 @@
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
read_zarr,
read_zarr_nwbfile,
)

parameter_cases = dict(
@@ -31,6 +33,16 @@
ClassicRos3TestCase=dict(s3_url="https://dandiarchive.s3.amazonaws.com/ros3test.nwb"),
)

zarr_parameter_cases = dict(
AIBSTestCase=dict(
s3_url=(
"s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
"ecephys_625749_2022-08-03_15-15-06_nwb/"
"ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
),
),
)
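How `BaseBenchmark` consumes these case dicts is not shown in this diff; as a rough, hypothetical sketch of the flattening that asv-style parametrization needs (asv reads `params` and `param_names` class attributes), something like the following would be derived:

# Hypothetical sketch only; the real translation lives in BaseBenchmark,
# which is not part of this diff.
param_names = list(next(iter(zarr_parameter_cases.values())).keys())  # -> ["s3_url"]
params = [[case[name] for case in zarr_parameter_cases.values()] for name in param_names]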


class FsspecNoCacheDirectFileReadBenchmark(BaseBenchmark):
parameter_cases = parameter_cases
@@ -138,3 +150,39 @@ def track_network_activity_during_read(self, s3_url: str):
self.nwbfile, self.io, retries = read_hdf5_nwbfile_ros3(s3_url=s3_url)
network_tracker.asv_network_statistics.update(retries=retries)
return network_tracker.asv_network_statistics


class ZarrDirectFileReadBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.zarr_file = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=False)
return network_tracker.asv_network_statistics


class ZarrForceNoConsolidatedDirectFileReadBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.zarr_file = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=True)
return network_tracker.asv_network_statistics


class ZarrNWBFileReadBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r")
return network_tracker.asv_network_statistics


class ZarrForceNoConsolidatedNWBFileReadBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r-")
return network_tracker.asv_network_statistics
42 changes: 42 additions & 0 deletions src/nwb_benchmarks/benchmarks/network_tracking_remote_slicing.py
@@ -13,6 +13,7 @@
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_zarr_nwbfile,
robust_ros3_read,
)

@@ -28,6 +29,19 @@
)


zarr_parameter_cases = dict(
AIBSTestCase=dict(
s3_url=(
"s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
"ecephys_625749_2022-08-03_15-15-06_nwb/"
"ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
),
object_name="ElectricalSeriesProbe A-LFP",
slice_range=(slice(0, 30_000), slice(0, 384)),
)
)


class FsspecNoCacheContinuousSliceBenchmark(BaseBenchmark):
parameter_cases = parameter_cases

@@ -107,3 +121,31 @@ def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self._temp, retries = robust_ros3_read(command=self.data_to_slice.__getitem__, command_args=(slice_range,))
network_tracker.asv_network_statistics.update(retries=retries)
return network_tracker.asv_network_statistics


class ZarrContinuousSliceBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics


class ZarrForceNoConsolidatedContinuousSliceBenchmark(BaseBenchmark):
parameter_cases = zarr_parameter_cases

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r-")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics
49 changes: 49 additions & 0 deletions src/nwb_benchmarks/benchmarks/time_remote_file_reading.py
@@ -13,6 +13,8 @@
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
read_zarr,
read_zarr_nwbfile,
)

parameter_cases = dict(
@@ -30,6 +32,17 @@
)


zarr_parameter_cases = dict(
AIBSTestCase=dict(
s3_url=(
"s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
"ecephys_625749_2022-08-03_15-15-06_nwb/"
"ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
),
),
)


class DirectFileReadBenchmark(BaseBenchmark):
"""
Time the direct read of the HDF5-backend files with `h5py` using each streaming method.
@@ -98,3 +111,39 @@ def time_read_hdf5_nwbfile_remfile_with_cache(self, s3_url: str):

def time_read_hdf5_nwbfile_ros3(self, s3_url: str):
self.nwbfile, self.io, _ = read_hdf5_nwbfile_ros3(s3_url=s3_url, retry=False)


class DirectZarrFileReadBenchmark(BaseBenchmark):
"""
Time the direct read of the Zarr-backend files with `zarr`, with and without consolidated metadata.

Note: in all cases, store the in-memory objects to avoid timing garbage collection steps.
"""

rounds = 1
repeat = 3
parameter_cases = zarr_parameter_cases

    def time_read_zarr(self, s3_url: str):
        self.zarr_file = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=False)

    def time_read_zarr_force_no_consolidated(self, s3_url: str):
        self.zarr_file = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=True)


class NWBZarrFileReadBenchmark(BaseBenchmark):
"""
Time the read of the Zarr-backend NWB files with `pynwb`, with and without consolidated metadata.

Note: in all cases, store the in-memory objects to avoid timing garbage collection steps.
"""

rounds = 1
repeat = 3
parameter_cases = zarr_parameter_cases

def time_read_zarr_nwbfile(self, s3_url: str):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r")

def time_read_zarr_nwbfile_force_no_consolidated(self, s3_url: str):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r-")
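The two timed paths above differ only in whether Zarr may use the store's consolidated metadata. A local toy example of what consolidation means, assuming zarr v2 semantics (the store path and dataset name here are made up):

import zarr

# Write a tiny local store, then consolidate: the individual .zgroup/.zarray/
# .zattrs documents get copied into a single .zmetadata key.
group = zarr.open_group("toy.zarr", mode="w")
group.create_dataset("data", shape=(10, 10), chunks=(5, 5))
zarr.consolidate_metadata("toy.zarr")

# The consolidated open needs one metadata read; the plain open re-reads each
# metadata document, which on S3 means one request per document.
fast = zarr.open_consolidated("toy.zarr", mode="r")
slow = zarr.open("toy.zarr", mode="r")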
45 changes: 42 additions & 3 deletions src/nwb_benchmarks/benchmarks/time_remote_slicing.py
@@ -7,13 +7,12 @@
BaseBenchmark,
get_object_by_name,
get_s3_url,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile_with_cache,
read_zarr_nwbfile,
)

parameter_cases = dict(
@@ -27,6 +26,18 @@
)
)

zarr_parameter_cases = dict(
AIBSTestCase=dict(
s3_url=(
"s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
"ecephys_625749_2022-08-03_15-15-06_nwb/"
"ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
),
object_name="ElectricalSeriesProbe A-LFP",
slice_range=(slice(0, 30_000), slice(0, 384)),
)
)


class FsspecNoCacheContinuousSliceBenchmark(BaseBenchmark):
rounds = 1
@@ -110,4 +121,32 @@ def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
"""Note: store as self._temp to avoid tracking garbage collection as well."""
self.data_to_slice[slice_range]
self._temp = self.data_to_slice[slice_range]


class ZarrContinuousSliceBenchmark(BaseBenchmark):
rounds = 1
repeat = 3
parameter_cases = zarr_parameter_cases

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

    def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
        """Note: store as self._temp to avoid tracking garbage collection as well."""
        self._temp = self.data_to_slice[slice_range]


class ZarrForceNoConsolidatedContinuousSliceBenchmark(BaseBenchmark):
rounds = 1
repeat = 3
parameter_cases = zarr_parameter_cases

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io = read_zarr_nwbfile(s3_url=s3_url, mode="r-")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

    def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
        """Note: store as self._temp to avoid tracking garbage collection as well."""
        self._temp = self.data_to_slice[slice_range]
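For clarity on what gets measured (a simplified sketch of the asv contract, not the actual harness): `setup` runs before each timing round, so opening the file is excluded and only the slice itself is timed. Driving one case by hand, with every value taken from `zarr_parameter_cases` above:

case = zarr_parameter_cases["AIBSTestCase"]
benchmark = ZarrContinuousSliceBenchmark()
benchmark.setup(**case)       # opens the file; excluded from the timing
benchmark.time_slice(**case)  # only this call is what asv times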
6 changes: 4 additions & 2 deletions src/nwb_benchmarks/core/__init__.py
@@ -1,6 +1,5 @@
"""Exposed imports to the `core` submodule."""

# from ._parametrize import benchmark_parametrize
from ._base_benchmark import BaseBenchmark
from ._capture_connections import CaptureConnections
from ._dandi import get_s3_url
@@ -19,6 +18,8 @@
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
read_zarr,
read_zarr_nwbfile,
robust_ros3_read,
)

@@ -41,5 +42,6 @@
"get_s3_url",
"get_object_by_name",
"robust_ros3_read",
"benchmark_parametrize",
"read_zarr",
"read_zarr_nwbfile",
]
23 changes: 14 additions & 9 deletions src/nwb_benchmarks/core/_nwb_helpers.py
@@ -9,12 +9,17 @@ def get_object_by_name(nwbfile: pynwb.NWBFile, object_name: str) -> Any:

This method should only be used in the `setup` method of a benchmark class.
"""
object_names = [neurodata_object.name for neurodata_object in nwbfile.objects.values()]
assert len(object_names) == len(
set(object_names)
), "Some object names in the NWBFile are not unique! Unable to do a lookup by name."
assert object_name in object_names, f"The specified object name ({object_name}) is not in the NWBFile."

return next(
neurodata_object for neurodata_object in nwbfile.objects.values() if neurodata_object.name == object_name
)
# Find all objects matching the given name
matching_objects = [
(neurodata_object.name, neurodata_object)
for neurodata_object in nwbfile.objects.values()
if neurodata_object.name == object_name
]
# Raise an error if the object wasn't found
if len(matching_objects) == 0:
raise ValueError(f"The specified object name ({object_name}) is not in the NWBFile.")
# Make sure that the object we are looking for is unique
elif len(matching_objects) > 1:
raise ValueError(f"The specified object name ({object_name}) was found multiple times in the NWBFile.")
# Return the matching object
return matching_objects[0][1]
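A minimal usage sketch of the rewritten helper, assuming a file opened via `read_zarr_nwbfile` and the object name from the AIBS test case above:

from nwb_benchmarks.core import get_object_by_name, read_zarr_nwbfile

s3_url = (
    "s3://aind-open-data/ecephys_625749_2022-08-03_15-15-06_nwb_2023-05-16_16-34-55/"
    "ecephys_625749_2022-08-03_15-15-06_nwb/"
    "ecephys_625749_2022-08-03_15-15-06_experiment1_recording1.nwb.zarr/"
)
nwbfile, io = read_zarr_nwbfile(s3_url=s3_url, mode="r")
lfp = get_object_by_name(nwbfile=nwbfile, object_name="ElectricalSeriesProbe A-LFP")
data_to_slice = lfp.data  # the dataset the slicing benchmarks index into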
41 changes: 40 additions & 1 deletion src/nwb_benchmarks/core/_streaming.py
@@ -5,13 +5,15 @@

import fsspec
import h5py
import hdmf_zarr
import pynwb
import remfile
import zarr
from fsspec.asyn import reset_lock
from fsspec.implementations.cached import CachingFileSystem
from fsspec.implementations.http import HTTPFile

# Useful if running in verbose mode
warnings.filterwarnings(action="ignore", message="No cached namespaces found in .*")
warnings.filterwarnings(action="ignore", message="Ignoring cached namespace .*")

@@ -179,3 +181,40 @@ def read_hdf5_nwbfile_ros3(s3_url: str, retry: bool = True) -> Tuple[pynwb.NWBFi
retries = None
nwbfile = io.read()
return (nwbfile, io, retries)


def read_zarr(s3_url: str, open_without_consolidated_metadata: bool = False) -> zarr.Group:
"""
Open a Zarr file from an S3 URL using the built-in fsspec support in Zarr.

Returns
-------
file : zarr.Group
    The zarr.Group object representing the opened file.
"""
if open_without_consolidated_metadata:
zarrfile = zarr.open(store=s3_url, mode="r", storage_options=dict(anon=True))
else:
zarrfile = zarr.open_consolidated(store=s3_url, mode="r", storage_options=dict(anon=True))
return zarrfile


def read_zarr_nwbfile(s3_url: str, mode: str) -> Tuple[pynwb.NWBFile, hdmf_zarr.NWBZarrIO]:
"""
Read a Zarr NWB file from an S3 URL using the built-in fsspec support in Zarr.

Note: `r-` indicates reading without consolidated metadata, while `r` indicates reading with consolidated
metadata. `r` should only be used in benchmarks for files that actually have consolidated metadata
available; for files without it, `hdmf_zarr` automatically falls back to reading without consolidated
metadata.

Returns
-------
NWBFile : pynwb.NWBFile
The remote NWBFile object.
NWBZarrIO : hdmf_zarr.NWBZarrIO
The open IO object used to open the file.
"""
io = hdmf_zarr.NWBZarrIO(s3_url, mode=mode, storage_options=dict(anon=True))
nwbfile = io.read()
return (nwbfile, io)
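Finally, a usage sketch exercising both readers outside the harness, with `s3_url` as in the sketch above (closing the IO object is assumed to behave like other HDMF IO classes):

from nwb_benchmarks.core import read_zarr, read_zarr_nwbfile

# Raw zarr.Group access, with and without consolidated metadata.
group = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=False)
group_unconsolidated = read_zarr(s3_url=s3_url, open_without_consolidated_metadata=True)

# Full NWB read through hdmf_zarr; "r" uses consolidated metadata, while "r-"
# forces the non-consolidated path even when .zmetadata is present.
nwbfile, io = read_zarr_nwbfile(s3_url=s3_url, mode="r")
print(list(nwbfile.acquisition))
io.close()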