Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fsspec remfile with cache #43

Merged
merged 25 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a1ebb85
Update _streaming.py
sinha-r Apr 18, 2024
40f77c2
Update network_tracking_remote_file_reading.py
sinha-r Apr 18, 2024
8798f3c
Update network_tracking_remote_slicing.py
sinha-r Apr 18, 2024
6815090
Update time_remote_file_reading.py
sinha-r Apr 18, 2024
2cef575
Update time_remote_slicing.py
sinha-r Apr 18, 2024
b7936c4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 18, 2024
cc2a57f
Update time_remote_file_reading.py
sinha-r Apr 18, 2024
9a8f3b8
Update _streaming.py
sinha-r Apr 19, 2024
f9acd83
Update time_remote_file_reading.py
sinha-r Apr 19, 2024
97631eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 19, 2024
fb7ac89
Update _streaming.py
sinha-r Apr 19, 2024
1d9b4d6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 19, 2024
e22a2d5
Update nwb_benchmarks.yaml
sinha-r Apr 19, 2024
97966d5
Update src/nwb_benchmarks/core/_streaming.py
sinha-r Apr 19, 2024
d2d7845
Update src/nwb_benchmarks/core/_streaming.py
sinha-r Apr 19, 2024
8bc5a89
Update src/nwb_benchmarks/core/_streaming.py
sinha-r Apr 19, 2024
4486b0d
Update src/nwb_benchmarks/core/_streaming.py
sinha-r Apr 19, 2024
20b34b8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 19, 2024
e8c7cac
Fix use of undefined variables and add ignore_cleanup_errors=True set…
oruebel Apr 24, 2024
d2b4777
Add new test functions to the public interface of core
oruebel Apr 24, 2024
be3acaf
Add explicit cleanup of tempdir caches in network tracking tests
oruebel Apr 24, 2024
9c46c30
Add explicit cleanup of tempdir caches in network tracking tests for …
oruebel Apr 24, 2024
4e4c4d0
Add explicit cleanup of tempdir caches in time_remote_file_reading
oruebel Apr 24, 2024
9147fd4
Add explicit cleanup of tempdir caches time_remote_slicing and fix se…
oruebel Apr 24, 2024
3f5d60d
Add missing entry in __all__
oruebel Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
get_s3_url,
network_activity_tracker,
read_hdf5_fsspec_no_cache,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
)
oruebel marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -35,6 +39,16 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class FsspecWithCacheDirectFileReadBenchmark:
param_names = param_names
params = params

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.file, self.bytestream, self.tmpdir = read_hdf5_fsspec_with_cache(s3_url=s3_url)
return network_tracker.asv_network_statistics


class RemfileDirectFileReadBenchmark:
param_names = param_names
params = params
Expand All @@ -45,6 +59,16 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class RemfileDirectFileReadBenchmarkWithCache:
param_names = param_names
params = params

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.file, self.bytestream, self.tmpdir = read_hdf5_remfile_with_cache(s3_url=s3_url)
return network_tracker.asv_network_statistics


class Ros3DirectFileReadBenchmark:
param_names = param_names
params = params
Expand All @@ -66,6 +90,18 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class FsspecWithCacheNWBFileReadBenchmark:
param_names = param_names
params = params

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
return network_tracker.asv_network_statistics


class RemfileNWBFileReadBenchmark:
param_names = param_names
params = params
Expand All @@ -76,6 +112,18 @@ def track_network_activity_during_read(self, s3_url: str):
return network_tracker.asv_network_statistics


class RemfileNWBFileReadBenchmarkWithCache:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
param_names = param_names
params = params

def track_network_activity_during_read(self, s3_url: str):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
return network_tracker.asv_network_statistics


class Ros3NWBFileReadBenchmark:
param_names = param_names
params = params
Expand Down
38 changes: 38 additions & 0 deletions src/nwb_benchmarks/benchmarks/network_tracking_remote_slicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
get_object_by_name,
get_s3_url,
network_activity_tracker,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile_with_cache,
oruebel marked this conversation as resolved.
Show resolved Hide resolved
robust_ros3_read,
)

Expand Down Expand Up @@ -41,6 +45,23 @@ def track_network_activity_during_slice(self, s3_url: str, object_name: str, sli
return network_tracker.asv_network_statistics


class FsspecWithCacheContinuousSliceBenchmark:
param_names = param_names
params = params

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics


class RemfileContinuousSliceBenchmark:
param_names = param_names
params = params
Expand All @@ -56,6 +77,23 @@ def track_network_activity_during_slice(self, s3_url: str, object_name: str, sli
return network_tracker.asv_network_statistics


class RemfileContinuousSliceBenchmarkWithCache:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
param_names = param_names
params = params

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def track_network_activity_during_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
with network_activity_tracker(tshark_path=TSHARK_PATH) as network_tracker:
self._temp = self.data_to_slice[slice_range]
return network_tracker.asv_network_statistics


class Ros3ContinuousSliceBenchmark:
param_names = param_names
params = params
Expand Down
17 changes: 17 additions & 0 deletions src/nwb_benchmarks/benchmarks/time_remote_file_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
from nwb_benchmarks.core import (
get_s3_url,
read_hdf5_fsspec_no_cache,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile,
read_hdf5_remfile_with_cache,
read_hdf5_ros3,
)

Expand Down Expand Up @@ -38,13 +42,20 @@ class DirectFileReadBenchmark:
def time_read_hdf5_fsspec_no_cache(self, s3_url: str):
self.file, self.bytestream = read_hdf5_fsspec_no_cache(s3_url=s3_url)

def time_read_hdf5_fsspec_with_cache(self, s3_url: str):
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
self.file, self.bytestream, self.tmpdir = read_hdf5_fsspec_with_cache(s3_url=s3_url)

def time_read_hdf5_remfile(self, s3_url: str):
self.file, self.bytestream = read_hdf5_remfile(s3_url=s3_url)

def time_read_hdf5_remfile_with_cache(self, s3_url: str):
self.file, self.bytestream, self.tmpdir = read_hdf5_remfile_with_cache(s3_url=s3_url)

def time_read_hdf5_ros3(self, s3_url: str):
self.file, _ = read_hdf5_ros3(s3_url=s3_url, retry=False)



class NWBFileReadBenchmark:
"""
Time the read of the HDF5-backend files with `pynwb` using each streaming method.
Expand All @@ -60,8 +71,14 @@ class NWBFileReadBenchmark:
def time_read_hdf5_nwbfile_fsspec_no_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream = read_hdf5_nwbfile_fsspec_no_cache(s3_url=s3_url)

def time_read_hdf5_nwbfile_fsspec_with_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(s3_url=s3_url)
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved

def time_read_hdf5_nwbfile_remfile(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream = read_hdf5_nwbfile_remfile(s3_url=s3_url)

def time_read_hdf5_nwbfile_remfile_with_cache(self, s3_url: str):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(s3_url=s3_url)

def time_read_hdf5_nwbfile_ros3(self, s3_url: str):
self.nwbfile, self.io, _ = read_hdf5_nwbfile_ros3(s3_url=s3_url, retry=False)
40 changes: 40 additions & 0 deletions src/nwb_benchmarks/benchmarks/time_remote_slicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
from nwb_benchmarks.core import (
get_object_by_name,
get_s3_url,
read_hdf5_fsspec_with_cache,
read_hdf5_nwbfile_fsspec_no_cache,
read_hdf5_nwbfile_fsspec_with_cache,
read_hdf5_nwbfile_remfile,
read_hdf5_nwbfile_remfile_with_cache,
read_hdf5_nwbfile_ros3,
read_hdf5_remfile_with_cache,
)

# TODO: add the others
Expand Down Expand Up @@ -42,6 +46,24 @@ def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self._temp = self.data_to_slice[slice_range]


class FsspecWithCacheContinuousSliceBenchmark:
rounds = 1
repeat = 3
param_names = param_names
params = params

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_fsspec_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name=object_name)
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
"""Note: store as self._temp to avoid tracking garbage collection as well."""
self._temp = self.data_to_slice[slice_range]


class RemfileContinuousSliceBenchmark:
rounds = 1
repeat = 3
Expand All @@ -58,6 +80,24 @@ def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self._temp = self.data_to_slice[slice_range]


class RemfileContinuousSliceBenchmarkWithCache:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
rounds = 1
repeat = 3
param_names = param_names
params = params

def setup(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
self.nwbfile, self.io, self.file, self.bytestream, self.tmpdir = read_hdf5_nwbfile_remfile_with_cache(
s3_url=s3_url
)
self.neurodata_object = get_object_by_name(nwbfile=self.nwbfile, object_name="ElectricalSeriesAp")
oruebel marked this conversation as resolved.
Show resolved Hide resolved
self.data_to_slice = self.neurodata_object.data

def time_slice(self, s3_url: str, object_name: str, slice_range: Tuple[slice]):
"""Note: store as self._temp to avoid tracking garbage collection as well."""
self._temp = self.data_to_slice[slice_range]


class Ros3ContinuousSliceBenchmark:
rounds = 1
repeat = 3
Expand Down
47 changes: 47 additions & 0 deletions src/nwb_benchmarks/core/_streaming.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import tempfile
import time
import warnings
from typing import Any, Callable, Tuple, Union
Expand All @@ -7,6 +8,7 @@
import pynwb
import remfile
from fsspec.asyn import reset_lock
from fsspec.implementations.cached import CachingFileSystem
from fsspec.implementations.http import HTTPFile

# Useful if running in verbose mode
Expand All @@ -26,6 +28,22 @@ def read_hdf5_fsspec_no_cache(
file = h5py.File(name=byte_stream)
return (file, byte_stream)

def read_hdf5_fsspec_with_cache(
s3_url: str,
) -> Tuple[h5py.File, HTTPFile]:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
"""Load the raw HDF5 file from an S3 URL using fsspec without a cache; does not formally read the NWB file."""
reset_lock()
fsspec.get_filesystem_class("https").clear_instance_cache()
filesystem = fsspec.filesystem("https")
tmpdir= tempfile.TemporaryDirectory()
fs = CachingFileSystem(
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
fs=fs,
cache_storage=tmpdir.name, # Local folder for the cache
)
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
byte_stream = filesystem.open(path=s3_url, mode="rb")
file = h5py.File(name=byte_stream)
return (file, byte_stream, tmpdir)


def read_hdf5_nwbfile_fsspec_no_cache(
s3_url: str,
Expand All @@ -36,6 +54,20 @@ def read_hdf5_nwbfile_fsspec_no_cache(
nwbfile = io.read()
return (nwbfile, io, file, byte_stream)

def read_hdf5_nwbfile_fsspec_with_cache(
s3_url: str,
) -> Tuple[pynwb.NWBFile, pynwb.NWBHDF5IO, h5py.File, HTTPFile]:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
"""Read an HDF5 NWB file from an S3 URL using fsspec without a cache."""
(file, byte_stream) = read_hdf5_fsspec_no_cache(s3_url=s3_url)
io = pynwb.NWBHDF5IO(file=file, load_namespaces=True)
nwbfile = io.read()
tmpdir= tempfile.TemporaryDirectory()
fs = CachingFileSystem(
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved
fs=fs,
cache_storage=tmpdir.name, # Local folder for the cache
)
return (nwbfile, io, file, byte_stream, tmpdir)


def read_hdf5_remfile(s3_url: str) -> Tuple[h5py.File, remfile.File]:
"""Load the raw HDF5 file from an S3 URL using remfile; does not formally read the NWB file."""
Expand All @@ -51,6 +83,21 @@ def read_hdf5_nwbfile_remfile(s3_url: str) -> Tuple[pynwb.NWBFile, pynwb.NWBHDF5
nwbfile = io.read()
return (nwbfile, io, file, byte_stream)

def read_hdf5_remfile_with_cache(s3_url: str) -> Tuple[h5py.File, remfile.File]:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
"""Load the raw HDF5 file from an S3 URL using remfile; does not formally read the NWB file."""
tmpdir= tempfile.TemporaryDirectory()
disk_cache= remfile.DiskCache(tempdir.name)
byte_stream = remfile.File(url=s3_url,disk_cache=disk_cache)
file = h5py.File(name=byte_stream)
return (file, byte_stream, tmpdir)


def read_hdf5_nwbfile_remfile_with_cache(s3_url: str) -> Tuple[pynwb.NWBFile, pynwb.NWBHDF5IO, h5py.File, remfile.File]:
sinha-r marked this conversation as resolved.
Show resolved Hide resolved
"""Read an HDF5 NWB file from an S3 URL using the ROS3 driver from h5py."""
(file, byte_stream, tmpdir) = read_hdf5_remfile_with_cache(s3_url=s3_url)
io = pynwb.NWBHDF5IO(file=file, load_namespaces=True)
nwbfile = io.read()
return (nwbfile, io, file, byte_stream, tmpdir)

def robust_ros3_read(
command: Callable,
Expand Down