Merge pull request #43 from NeurodataWithoutBorders/fsspec_remfile_with_cache

Fsspec remfile with cache
oruebel authored Apr 24, 2024
2 parents e9eeab2 + 3f5d60d commit 3d65167
Showing 7 changed files with 237 additions and 4 deletions.
1 change: 1 addition & 0 deletions environments/nwb_benchmarks.yaml
@@ -16,6 +16,7 @@ dependencies:
- aiohttp
- remfile
- pyshark
- lindi
- hdmf @ git+https://github.com/hdmf-dev/hdmf.git@9b3282aa4999d2922f003f1b79ec7458ea3ddc5e # required until PyNWB propagates changes
#- hdmf @ git+https://github.com/hdmf-dev/hdmf.git@expose_aws_region # required until region fix is released
- -e ..
@@ -1,16 +1,18 @@
"""Basic benchmarks for profiling network statistics for streaming access to NWB files and their contents."""

import os

from nwb_benchmarks import TSHARK_PATH
from nwb_benchmarks.core import (
get_s3_url,
network_activity_tracker,
read_hdf5_fsspec_no_cache,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
)

@@ -35,6 +37,19 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class FsspecWithCacheDirectFileReadBenchmark:
param_names = param_names
params = params

def teardown(self, s3_url: str):
self.tmpdir.cleanup()

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.file, self.bytestream, self.tmpdir = read_hdf5_fsspec_with_cache(s3_url=s3_url)
return network_tracker.asv_network_statistics


class RemfileDirectFileReadBenchmark:
param_names = param_names
params = params
@@ -45,6 +60,19 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class RemfileDirectFileReadBenchmarkWithCache:
param_names = param_names
params = params

def teardown(self, s3_url: str):
self.tmpdir.cleanup()

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.file, self.bytestream, self.tmpdir = read_hdf5_remfile_with_cache(s3_url=s3_url)
return network_tracker.asv_network_statistics


class Ros3DirectFileReadBenchmark:
param_names = param_names
params = params
@@ -66,6 +94,21 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class FsspecWithCacheNWBFileReadBenchmark:
param_names = param_names
params = params

def teardown(self, s3_url: str):
self.tmpdir.cleanup()

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
return network_tracker.asv_network_statistics


class RemfileNWBFileReadBenchmark:
param_names = param_names
params = params
@@ -76,6 +119,21 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class RemfileNWBFileReadBenchmarkWithCache:
param_names = param_names
params = params

def teardown(self, s3_url: str):
self.tmpdir.cleanup()

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
return network_tracker.asv_network_statistics


class Ros3NWBFileReadBenchmark:
param_names = param_names
params = params
42 changes: 42 additions & 0 deletions src/nwb_benchmarks/benchmarks/network_tracking_remote_slicing.py
@@ -8,7 +8,9 @@
get_s3_url,
network_activity_tracker,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
robust_ros3_read,
)
@@ -41,6 +43,26 @@ def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
return network_tracker.asv_network_statistics


class FsspecWithCacheContinuousSliceBenchmark:
param_names = param_names
params = params

def teardown(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.tmpdir.cleanup()

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics


class RemfileContinuousSliceBenchmark:
param_names = param_names
params = params
@@ -56,6 +78,26 @@ def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
return network_tracker.asv_network_statistics


class RemfileContinuousSliceBenchmarkWithCache:
param_names = param_names
params = params

def teardown(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.tmpdir.cleanup()

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics


class Ros3ContinuousSliceBenchmark:
param_names = param_names
params = params
30 changes: 30 additions & 0 deletions src/nwb_benchmarks/benchmarks/time_remote_file_reading.py
@@ -3,10 +3,14 @@
from nwb_benchmarks.core import (
get_s3_url,
read_hdf5_fsspec_no_cache,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
)

@@ -35,12 +39,23 @@ class DirectFileReadBenchmark:
param_names = param_names
params = params

def teardown(self, s3_url: str):
# Not all tests in this class use a temporary directory as a cache; clean it up only if one was created.
if hasattr(self, "tmpdir"):
self.tmpdir.cleanup()

def time_read_hdf5_fsspec_no_cache(self, s3_url: str):
self.file, self.bytestream = read_hdf5_fsspec_no_cache(s3_url=s3_url)

def time_read_hdf5_fsspec_with_cache(self, s3_url: str):
self.file, self.bytestream, self.tmpdir = read_hdf5_fsspec_with_cache(s3_url=s3_url)

def time_read_hdf5_remfile(self, s3_url: str):
self.file, self.bytestream = read_hdf5_remfile(s3_url=s3_url)

def time_read_hdf5_remfile_with_cache(self, s3_url: str):
self.file, self.bytestream, self.tmpdir = read_hdf5_remfile_with_cache(s3_url=s3_url)

def time_read_hdf5_ros3(self, s3_url: str):
self.file, _ = read_hdf5_ros3(s3_url=s3_url, retry=False)

@@ -57,11 +72,26 @@ class NWBFileReadBenchmark:
param_names = param_names
params = params

def teardown(self, s3_url: str):
# Not all tests in this class use a temporary directory as a cache; clean it up only if one was created.
if hasattr(self, "tmpdir"):
self.tmpdir.cleanup()

def time_read_hdf5_nwbfile_fsspec_no_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream = read_hdf5_nwbfile_fsspec_no_cache(s3_url=s3_url)

def time_read_hdf5_nwbfile_fsspec_with_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)

def time_read_hdf5_nwbfile_remfile(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream = read_hdf5_nwbfile_remfile(s3_url=s3_url)

def time_read_hdf5_nwbfile_remfile_with_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)

def time_read_hdf5_nwbfile_ros3(self, s3_url: str):
self.nwbfile, self.io, _ = read_hdf5_nwbfile_ros3(s3_url=s3_url, retry=False)
50 changes: 48 additions & 2 deletions src/nwb_benchmarks/benchmarks/time_remote_slicing.py
@@ -6,9 +6,13 @@
from nwb_benchmarks.core import (
get_object_by_name,
get_s3_url,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile_with_cache,
)

# TODO: add the others
@@ -42,6 +46,27 @@ def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self._temp = self.data_to_slice[slice_range]


class FsspecWithCacheContinuousSliceBenchmark:
rounds = 1
repeat = 3
param_names = param_names
params = params

def teardown(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.tmpdir.cleanup()

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
"""Note: store as self._temp to avoid tracking garbage collection as well."""
self._temp = self.data_to_slice[slice_range]


class RemfileContinuousSliceBenchmark:
rounds = 1
repeat = 3
@@ -50,7 +75,28 @@ class RemfileContinuousSliceBenchmark:

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream = read_hdf5_nwbfile_remfile(s3_url=s3_url)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name="ElectricalSeriesAp")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
"""Note: store as self._temp to avoid tracking garbage collection as well."""
self._temp = self.data_to_slice[slice_range]


class RemfileContinuousSliceBenchmarkWithCache:
rounds = 1
repeat = 3
param_names = param_names
params = params

def teardown(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.tmpdir.cleanup()

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
@@ -66,7 +112,7 @@ class Ros3ContinuousSliceBenchmark:

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, _ = read_hdf5_nwbfile_ros3(s3_url=s3_url)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name="ElectricalSeriesAp")
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
8 changes: 8 additions & 0 deletions src/nwb_benchmarks/core/__init__.py
@@ -8,10 +8,14 @@
from ._nwb_helpers import get_object_by_name
from ._streaming import (
read_hdf5_fsspec_no_cache,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
robust_ros3_read,
)
@@ -22,11 +26,15 @@
"NetworkStatistics",
"network_activity_tracker",
"read_hdf5_fsspec_no_cache",
"read_hdf5_fsspec_with_cache",
"read_hdf5_nwbfile_fsspec_no_cache",
"read_hdf5_nwbfile_fsspec_with_cache",
"read_hdf5_ros3",
"read_hdf5_nwbfile_ros3",
"read_hdf5_remfile",
"read_hdf5_remfile_with_cache",
"read_hdf5_nwbfile_remfile",
"read_hdf5_nwbfile_remfile_with_cache",
"get_s3_url",
"get_object_by_name",
"robust_ros3_read",
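Note: the updated src/nwb_benchmarks/core/_streaming.py, which defines the new *_with_cache readers imported above, is not rendered on this page. The sketch below shows roughly what such helpers might look like. The return orders follow how the benchmarks unpack the results; the internals (fsspec's CachingFileSystem, remfile's DiskCache, and the exact argument names) are assumptions chosen for illustration, not the repository's confirmed implementation.

import tempfile

import fsspec
import h5py
import remfile
from fsspec.implementations.cached import CachingFileSystem
from pynwb import NWBHDF5IO


def read_hdf5_fsspec_with_cache(s3_url: str):
    """Sketch: open a remote HDF5 file via fsspec with a local disk cache (assumed implementation)."""
    tmpdir = tempfile.TemporaryDirectory()  # the benchmark teardown calls tmpdir.cleanup()
    filesystem = CachingFileSystem(fs=fsspec.filesystem("http"), cache_storage=tmpdir.name)
    byte_stream = filesystem.open(path=s3_url, mode="rb")
    file = h5py.File(name=byte_stream, mode="r")
    return file, byte_stream, tmpdir


def read_hdf5_remfile_with_cache(s3_url: str):
    """Sketch: open a remote HDF5 file via remfile with a local disk cache (assumed implementation)."""
    tmpdir = tempfile.TemporaryDirectory()
    byte_stream = remfile.File(s3_url, disk_cache=remfile.DiskCache(tmpdir.name))
    file = h5py.File(name=byte_stream, mode="r")
    return file, byte_stream, tmpdir


def read_hdf5_nwbfile_fsspec_with_cache(s3_url: str):
    """Sketch: as above, but also read the NWBFile with PyNWB (assumed implementation)."""
    file, byte_stream, tmpdir = read_hdf5_fsspec_with_cache(s3_url=s3_url)
    io = NWBHDF5IO(file=file, mode="r", load_namespaces=True)
    nwbfile = io.read()
    return nwbfile, io, file, byte_stream, tmpdir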