From 0f84566a4e05fb04afe1bd4f91ae26991c0500d2 Mon Sep 17 00:00:00 2001 From: Chang Huan Lo Date: Fri, 1 Mar 2024 17:55:54 +0000 Subject: [PATCH 01/18] Update license.md --- license.md | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/license.md b/license.md index 4287ca86..8ef38fca 100644 --- a/license.md +++ b/license.md @@ -1 +1,28 @@ -# \ No newline at end of file +BSD 3-Clause License + +Copyright (c) 2024, University College London + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. From 1d5d4f820ad9b0fda52aa50503132b384f2bbcb0 Mon Sep 17 00:00:00 2001 From: Chang Huan Lo Date: Tue, 12 Mar 2024 15:25:41 +0000 Subject: [PATCH 02/18] Update license.md year range --- license.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/license.md b/license.md index 8ef38fca..91a756fc 100644 --- a/license.md +++ b/license.md @@ -1,6 +1,6 @@ BSD 3-Clause License -Copyright (c) 2024, University College London +Copyright (c) 2023-2024, University College London Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: From 19e925e75249330785ca8e285c58b570569b090d Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 12:29:56 +0000 Subject: [PATCH 03/18] Add object model for new stream io infrastructure --- aeon/io/device.py | 3 ++ aeon/io/streams.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 aeon/io/streams.py diff --git a/aeon/io/device.py b/aeon/io/device.py index 1a4916e6..1bae0a13 100644 --- a/aeon/io/device.py +++ b/aeon/io/device.py @@ -1,6 +1,8 @@ import inspect +from typing_extensions import deprecated +@deprecated def compositeStream(pattern, *args): """Merges multiple data streams into a single composite stream.""" composite = {} @@ -15,6 +17,7 @@ def compositeStream(pattern, *args): return composite +@deprecated class Device: """Groups multiple data streams into a logical device. diff --git a/aeon/io/streams.py b/aeon/io/streams.py new file mode 100644 index 00000000..c86167a6 --- /dev/null +++ b/aeon/io/streams.py @@ -0,0 +1,72 @@ +import inspect +from itertools import chain + + +class Stream: + """Represents a single data stream. + + Attributes: + reader (Reader): The reader used to retrieve the stream data. + """ + + def __init__(self, reader): + self.reader = reader + + def __iter__(self): + yield (self.__class__.__name__, self.reader) + + +class StreamGroup: + """Represents a logical group of multiple data streams. + + Attributes: + name (str): Name of the logical group used to find raw files. + """ + + def __init__(self, name, *args): + self.name = name + self._args = args + + def __iter__(self): + for member in chain(vars(self.__class__).values(), self._args): + if inspect.isclass(member): + for stream in iter(member(self.name)): + yield stream + elif isinstance(member, staticmethod): + for stream in iter(member.__func__(self.name)): + yield stream + + +def compositeStream(pattern, *args): + """Merges multiple data streams into a single composite stream.""" + composite = {} + if args: + for stream in args: + composite.update(stream(pattern)) + return composite + + +class Device: + """Groups multiple data streams into a logical device. + + If a device contains a single stream with the same pattern as the device + `name`, it will be considered a singleton, and the stream reader will be + paired directly with the device without nesting. + + Attributes: + name (str): Name of the device. + args (Any): Data streams collected from the device. + pattern (str, optional): Pattern used to find raw chunk files, + usually in the format `_`. + """ + + def __init__(self, name, *args, pattern=None): + self.name = name + self.provider = compositeStream(name if pattern is None else pattern, *args) + + def __iter__(self): + if len(self.provider) == 1: + singleton = self.provider.get(self.name, None) + if singleton: + return iter((self.name, singleton)) + return iter((self.name, self.provider)) From a9eac9154735b56126bd16bedc029433c0c5baa6 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 12:49:32 +0000 Subject: [PATCH 04/18] Support legacy stream group classes --- aeon/io/streams.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index c86167a6..ad4ff11d 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -1,5 +1,6 @@ import inspect from itertools import chain +from warnings import warn class Stream: @@ -42,7 +43,17 @@ def compositeStream(pattern, *args): composite = {} if args: for stream in args: - composite.update(stream(pattern)) + try: + composite.update(stream(pattern)) + except TypeError: + warn( + f"Stream groups with no constructors are deprecated. {stream}", + category=DeprecationWarning, + ) + if inspect.isclass(stream): + for method in vars(stream).values(): + if isinstance(method, staticmethod): + composite.update(method.__func__(pattern)) return composite From e85986b37a1d636d60ee8c61ee08b11d241fd713 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 12:52:27 +0000 Subject: [PATCH 05/18] Ensure deprecation message --- aeon/io/device.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeon/io/device.py b/aeon/io/device.py index 1bae0a13..344298e2 100644 --- a/aeon/io/device.py +++ b/aeon/io/device.py @@ -2,7 +2,7 @@ from typing_extensions import deprecated -@deprecated +@deprecated("Please use the streams module instead.") def compositeStream(pattern, *args): """Merges multiple data streams into a single composite stream.""" composite = {} @@ -17,7 +17,7 @@ def compositeStream(pattern, *args): return composite -@deprecated +@deprecated("Please use the Device class in the streams module instead.") class Device: """Groups multiple data streams into a logical device. From 1e499c7e8e677ea74f9646f87f569930cf58c607 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 17:32:48 +0000 Subject: [PATCH 06/18] Refactor stream classes for clarity --- aeon/io/streams.py | 70 ++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index ad4ff11d..39a35931 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -1,5 +1,4 @@ import inspect -from itertools import chain from warnings import warn @@ -21,40 +20,18 @@ class StreamGroup: """Represents a logical group of multiple data streams. Attributes: - name (str): Name of the logical group used to find raw files. + path (str): Path to the folder where stream chunks are located. + args (Any): Data streams or data stream groups to be included in this stream group. """ - def __init__(self, name, *args): - self.name = name + def __init__(self, path, *args): + self.path = path self._args = args def __iter__(self): - for member in chain(vars(self.__class__).values(), self._args): - if inspect.isclass(member): - for stream in iter(member(self.name)): - yield stream - elif isinstance(member, staticmethod): - for stream in iter(member.__func__(self.name)): - yield stream - - -def compositeStream(pattern, *args): - """Merges multiple data streams into a single composite stream.""" - composite = {} - if args: - for stream in args: - try: - composite.update(stream(pattern)) - except TypeError: - warn( - f"Stream groups with no constructors are deprecated. {stream}", - category=DeprecationWarning, - ) - if inspect.isclass(stream): - for method in vars(stream).values(): - if isinstance(method, staticmethod): - composite.update(method.__func__(pattern)) - return composite + for callable in self._args: + for stream in iter(callable(self.path)): + yield stream class Device: @@ -67,17 +44,36 @@ class Device: Attributes: name (str): Name of the device. args (Any): Data streams collected from the device. - pattern (str, optional): Pattern used to find raw chunk files, - usually in the format `_`. + path (str, optional): Path to the folder where stream chunks are located. """ - def __init__(self, name, *args, pattern=None): + def __init__(self, name, *args, path=None): self.name = name - self.provider = compositeStream(name if pattern is None else pattern, *args) + self._streams = Device._createStreams(name if path is None else path, *args) + + @staticmethod + def _createStreams(path, *args): + streams = {} + if args: + for callable in args: + try: + streams.update(callable(path)) + except TypeError: + if inspect.isclass(callable): + warn( + f"Stream group classes with no constructors are deprecated. {callable}", + category=DeprecationWarning, + ) + for method in vars(callable).values(): + if isinstance(method, staticmethod): + streams.update(method.__func__(path)) + else: + raise + return streams def __iter__(self): - if len(self.provider) == 1: - singleton = self.provider.get(self.name, None) + if len(self._streams) == 1: + singleton = self._streams.get(self.name, None) if singleton: return iter((self.name, singleton)) - return iter((self.name, self.provider)) + return iter((self.name, self._streams)) From 31721cc9db8ed617283d52fcbb3e76e16bd802a7 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 17:44:24 +0000 Subject: [PATCH 07/18] Ensure name is not None --- aeon/io/streams.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index 39a35931..d005b53b 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -48,6 +48,9 @@ class Device: """ def __init__(self, name, *args, path=None): + if name is None: + raise ValueError("name cannot be None.") + self.name = name self._streams = Device._createStreams(name if path is None else path, *args) From 8f4a90b30f57f82d61c6f10aeb6176b3394e976c Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 18:08:43 +0000 Subject: [PATCH 08/18] Remove unused conditional --- aeon/io/streams.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index d005b53b..a861da27 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -52,26 +52,25 @@ def __init__(self, name, *args, path=None): raise ValueError("name cannot be None.") self.name = name - self._streams = Device._createStreams(name if path is None else path, *args) + self._streams = Device._createStreams(name if path is None else path, args) @staticmethod - def _createStreams(path, *args): + def _createStreams(path, args): streams = {} - if args: - for callable in args: - try: - streams.update(callable(path)) - except TypeError: - if inspect.isclass(callable): - warn( - f"Stream group classes with no constructors are deprecated. {callable}", - category=DeprecationWarning, - ) - for method in vars(callable).values(): - if isinstance(method, staticmethod): - streams.update(method.__func__(path)) - else: - raise + for callable in args: + try: + streams.update(callable(path)) + except TypeError: + if inspect.isclass(callable): + warn( + f"Stream group classes with no constructors are deprecated. {callable}", + category=DeprecationWarning, + ) + for method in vars(callable).values(): + if isinstance(method, staticmethod): + streams.update(method.__func__(path)) + else: + raise return streams def __iter__(self): From 75e9d264f336bd0afe624c9c70b2d7518108edd4 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 18:12:00 +0000 Subject: [PATCH 09/18] Improve deprecation warnings --- aeon/io/device.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeon/io/device.py b/aeon/io/device.py index 344298e2..54998b17 100644 --- a/aeon/io/device.py +++ b/aeon/io/device.py @@ -2,7 +2,7 @@ from typing_extensions import deprecated -@deprecated("Please use the streams module instead.") +@deprecated("Please use the StreamGroup class from the streams module instead.") def compositeStream(pattern, *args): """Merges multiple data streams into a single composite stream.""" composite = {} @@ -17,7 +17,7 @@ def compositeStream(pattern, *args): return composite -@deprecated("Please use the Device class in the streams module instead.") +@deprecated("The Device class has been moved to the streams module.") class Device: """Groups multiple data streams into a logical device. From b5627a6a9752c18050bc75f381409fff5e680203 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 18:17:50 +0000 Subject: [PATCH 10/18] Avoid exception handling on deprecated code path --- aeon/io/streams.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index a861da27..c7d91a27 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -58,19 +58,16 @@ def __init__(self, name, *args, path=None): def _createStreams(path, args): streams = {} for callable in args: - try: + if inspect.isclass(callable) and callable.__init__.__code__.co_argcount == 1: + warn( + f"Stream group classes with default constructors are deprecated. {callable}", + category=DeprecationWarning, + ) + for method in vars(callable).values(): + if isinstance(method, staticmethod): + streams.update(method.__func__(path)) + else: streams.update(callable(path)) - except TypeError: - if inspect.isclass(callable): - warn( - f"Stream group classes with no constructors are deprecated. {callable}", - category=DeprecationWarning, - ) - for method in vars(callable).values(): - if isinstance(method, staticmethod): - streams.update(method.__func__(path)) - else: - raise return streams def __iter__(self): From e5e75f04e6803fa49542847bc13fad456627ec29 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 18:20:46 +0000 Subject: [PATCH 11/18] Avoid clashing with built-in function names --- aeon/io/streams.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/aeon/io/streams.py b/aeon/io/streams.py index c7d91a27..b23073bc 100644 --- a/aeon/io/streams.py +++ b/aeon/io/streams.py @@ -29,8 +29,8 @@ def __init__(self, path, *args): self._args = args def __iter__(self): - for callable in self._args: - for stream in iter(callable(self.path)): + for factory in self._args: + for stream in iter(factory(self.path)): yield stream @@ -57,17 +57,17 @@ def __init__(self, name, *args, path=None): @staticmethod def _createStreams(path, args): streams = {} - for callable in args: - if inspect.isclass(callable) and callable.__init__.__code__.co_argcount == 1: + for factory in args: + if inspect.isclass(factory) and factory.__init__.__code__.co_argcount == 1: warn( - f"Stream group classes with default constructors are deprecated. {callable}", + f"Stream group classes with default constructors are deprecated. {factory}", category=DeprecationWarning, ) - for method in vars(callable).values(): + for method in vars(factory).values(): if isinstance(method, staticmethod): streams.update(method.__func__(path)) else: - streams.update(callable(path)) + streams.update(factory(path)) return streams def __iter__(self): From d3a7a87120061bdfa6b0d6f73d59c70e67e69ff9 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 21:07:43 +0000 Subject: [PATCH 12/18] Move stream schema classes to the schema module --- aeon/{io => schema}/streams.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename aeon/{io => schema}/streams.py (100%) diff --git a/aeon/io/streams.py b/aeon/schema/streams.py similarity index 100% rename from aeon/io/streams.py rename to aeon/schema/streams.py From 0e9a6a675507b4ac161640a3e8bf0d2f49c3d82e Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 21:51:17 +0000 Subject: [PATCH 13/18] Fix check for auto-generated initializer --- aeon/schema/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeon/schema/streams.py b/aeon/schema/streams.py index b23073bc..6104178f 100644 --- a/aeon/schema/streams.py +++ b/aeon/schema/streams.py @@ -58,7 +58,7 @@ def __init__(self, name, *args, path=None): def _createStreams(path, args): streams = {} for factory in args: - if inspect.isclass(factory) and factory.__init__.__code__.co_argcount == 1: + if inspect.isclass(factory) and not hasattr(factory.__init__, "__code__"): warn( f"Stream group classes with default constructors are deprecated. {factory}", category=DeprecationWarning, From 51fe0b974d02293cc9b402073114b8ff9463ddb2 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 21:55:52 +0000 Subject: [PATCH 14/18] Allow automatic import of nested stream schemas --- aeon/schema/streams.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/aeon/schema/streams.py b/aeon/schema/streams.py index 6104178f..f306bf63 100644 --- a/aeon/schema/streams.py +++ b/aeon/schema/streams.py @@ -1,4 +1,5 @@ import inspect +from itertools import chain from warnings import warn @@ -27,9 +28,14 @@ class StreamGroup: def __init__(self, path, *args): self.path = path self._args = args + self._nested = ( + member + for member in vars(self.__class__).values() + if inspect.isclass(member) and issubclass(member, (Stream, StreamGroup)) + ) def __iter__(self): - for factory in self._args: + for factory in chain(self._nested, self._args): for stream in iter(factory(self.path)): yield stream From 9220b1e5a42d7d71cdd440833e012176b337e65e Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 21:56:20 +0000 Subject: [PATCH 15/18] Black formatting --- tests/io/test_api.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/io/test_api.py b/tests/io/test_api.py index 48986830..f7acd77d 100644 --- a/tests/io/test_api.py +++ b/tests/io/test_api.py @@ -19,17 +19,13 @@ def test_load_start_only(): @mark.api def test_load_end_only(): - data = aeon.load( - nonmonotonic_path, exp02.Patch2.Encoder, end=pd.Timestamp("2022-06-06T13:00:49") - ) + data = aeon.load(nonmonotonic_path, exp02.Patch2.Encoder, end=pd.Timestamp("2022-06-06T13:00:49")) assert len(data) > 0 @mark.api def test_load_filter_nonchunked(): - data = aeon.load( - nonmonotonic_path, exp02.Metadata, start=pd.Timestamp("2022-06-06T09:00:00") - ) + data = aeon.load(nonmonotonic_path, exp02.Metadata, start=pd.Timestamp("2022-06-06T09:00:00")) assert len(data) > 0 From 258900ff010e34a678085878373d7415ca476fb7 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 20 Mar 2024 22:29:49 +0000 Subject: [PATCH 16/18] Refactor dataset module to use stream schemas --- aeon/schema/core.py | 56 +++++--- aeon/schema/dataset.py | 60 ++++---- aeon/schema/foraging.py | 74 ++++++---- aeon/schema/octagon.py | 305 ++++++++++++++++++++-------------------- 4 files changed, 270 insertions(+), 225 deletions(-) diff --git a/aeon/schema/core.py b/aeon/schema/core.py index 8181c710..f3ca95a5 100644 --- a/aeon/schema/core.py +++ b/aeon/schema/core.py @@ -1,47 +1,65 @@ -import aeon.io.device as _device +from aeon.schema.streams import Stream, StreamGroup import aeon.io.reader as _reader -def heartbeat(pattern): +class Heartbeat(Stream): """Heartbeat event for Harp devices.""" - return {"Heartbeat": _reader.Heartbeat(f"{pattern}_8_*")} + def __init__(self, pattern): + super().__init__(_reader.Heartbeat(f"{pattern}_8_*")) -def video(pattern): + +class Video(Stream): """Video frame metadata.""" - return {"Video": _reader.Video(f"{pattern}_*")} + + def __init__(self, pattern): + super().__init__(_reader.Video(f"{pattern}_*")) -def position(pattern): +class Position(Stream): """Position tracking data for the specified camera.""" - return {"Position": _reader.Position(f"{pattern}_200_*")} + + def __init__(self, pattern): + super().__init__(_reader.Position(f"{pattern}_200_*")) -def encoder(pattern): +class Encoder(Stream): """Wheel magnetic encoder data.""" - return {"Encoder": _reader.Encoder(f"{pattern}_90_*")} + def __init__(self, pattern): + super().__init__(_reader.Encoder(f"{pattern}_90_*")) -def environment(pattern): + +class Environment(StreamGroup): """Metadata for environment mode and subjects.""" - return _device.compositeStream(pattern, environment_state, subject_state) + + def __init__(self, pattern): + super().__init__(pattern, EnvironmentState, SubjectState) -def environment_state(pattern): +class EnvironmentState(Stream): """Environment state log.""" - return {"EnvironmentState": _reader.Csv(f"{pattern}_EnvironmentState_*", ["state"])} + def __init__(self, pattern): + super().__init__(_reader.Csv(f"{pattern}_EnvironmentState_*", ["state"])) -def subject_state(pattern): + +class SubjectState(Stream): """Subject state log.""" - return {"SubjectState": _reader.Subject(f"{pattern}_SubjectState_*")} + + def __init__(self, pattern): + super().__init__(_reader.Subject(f"{pattern}_SubjectState_*")) -def messageLog(pattern): +class MessageLog(Stream): """Message log data.""" - return {"MessageLog": _reader.Log(f"{pattern}_MessageLog_*")} + def __init__(self, pattern): + super().__init__(_reader.Log(f"{pattern}_MessageLog_*")) -def metadata(pattern): + +class Metadata(Stream): """Metadata for acquisition epochs.""" - return {pattern: _reader.Metadata(pattern)} + + def __init__(self, pattern): + super().__init__(_reader.Metadata(pattern)) diff --git a/aeon/schema/dataset.py b/aeon/schema/dataset.py index b9586de4..bbb7cbb8 100644 --- a/aeon/schema/dataset.py +++ b/aeon/schema/dataset.py @@ -1,50 +1,50 @@ from dotmap import DotMap import aeon.schema.core as stream -from aeon.io.device import Device +from aeon.schema.streams import Device from aeon.schema import foraging, octagon exp02 = DotMap( [ - Device("Metadata", stream.metadata), - Device("ExperimentalMetadata", stream.environment, stream.messageLog), - Device("CameraTop", stream.video, stream.position, foraging.region), - Device("CameraEast", stream.video), - Device("CameraNest", stream.video), - Device("CameraNorth", stream.video), - Device("CameraPatch1", stream.video), - Device("CameraPatch2", stream.video), - Device("CameraSouth", stream.video), - Device("CameraWest", stream.video), - Device("Nest", foraging.weight), - Device("Patch1", foraging.patch), - Device("Patch2", foraging.patch), + Device("Metadata", stream.Metadata), + Device("ExperimentalMetadata", stream.Environment, stream.MessageLog), + Device("CameraTop", stream.Video, stream.Position, foraging.Region), + Device("CameraEast", stream.Video), + Device("CameraNest", stream.Video), + Device("CameraNorth", stream.Video), + Device("CameraPatch1", stream.Video), + Device("CameraPatch2", stream.Video), + Device("CameraSouth", stream.Video), + Device("CameraWest", stream.Video), + Device("Nest", foraging.Weight), + Device("Patch1", foraging.Patch), + Device("Patch2", foraging.Patch), ] ) exp01 = DotMap( [ - Device("SessionData", foraging.session), - Device("FrameTop", stream.video, stream.position), - Device("FrameEast", stream.video), - Device("FrameGate", stream.video), - Device("FrameNorth", stream.video), - Device("FramePatch1", stream.video), - Device("FramePatch2", stream.video), - Device("FrameSouth", stream.video), - Device("FrameWest", stream.video), - Device("Patch1", foraging.depletionFunction, stream.encoder, foraging.feeder), - Device("Patch2", foraging.depletionFunction, stream.encoder, foraging.feeder), + Device("SessionData", foraging.SessionData), + Device("FrameTop", stream.Video, stream.Position), + Device("FrameEast", stream.Video), + Device("FrameGate", stream.Video), + Device("FrameNorth", stream.Video), + Device("FramePatch1", stream.Video), + Device("FramePatch2", stream.Video), + Device("FrameSouth", stream.Video), + Device("FrameWest", stream.Video), + Device("Patch1", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), + Device("Patch2", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), ] ) octagon01 = DotMap( [ - Device("Metadata", stream.metadata), - Device("CameraTop", stream.video, stream.position), - Device("CameraColorTop", stream.video), - Device("ExperimentalMetadata", stream.subject_state), - Device("Photodiode", octagon.photodiode), + Device("Metadata", stream.Metadata), + Device("CameraTop", stream.Video, stream.Position), + Device("CameraColorTop", stream.Video), + Device("ExperimentalMetadata", stream.SubjectState), + Device("Photodiode", octagon.Photodiode), Device("OSC", octagon.OSC), Device("TaskLogic", octagon.TaskLogic), Device("Wall1", octagon.Wall), diff --git a/aeon/schema/foraging.py b/aeon/schema/foraging.py index ffd8fdd9..c25ff70d 100644 --- a/aeon/schema/foraging.py +++ b/aeon/schema/foraging.py @@ -1,13 +1,11 @@ -from enum import Enum as _Enum - +from enum import Enum import pandas as pd - -import aeon.io.device as _device import aeon.io.reader as _reader import aeon.schema.core as _stream +from aeon.schema.streams import Stream, StreamGroup -class Area(_Enum): +class Area(Enum): Null = 0 Nest = 1 Corridor = 2 @@ -53,56 +51,78 @@ def __init__(self, pattern): super().__init__(pattern, columns=["value", "stable"]) -def region(pattern): +class Region(Stream): """Region tracking data for the specified camera.""" - return {"Region": _RegionReader(f"{pattern}_201_*")} + + def __init__(self, pattern): + super().__init__(_RegionReader(f"{pattern}_201_*")) -def depletionFunction(pattern): +class DepletionFunction(Stream): """State of the linear depletion function for foraging patches.""" - return {"DepletionState": _PatchState(f"{pattern}_State_*")} + + def __init__(self, pattern): + super().__init__(_PatchState(f"{pattern}_State_*")) -def feeder(pattern): +class Feeder(StreamGroup): """Feeder commands and events.""" - return _device.compositeStream(pattern, beam_break, deliver_pellet) + + def __init__(self, pattern): + super().__init__(pattern, BeamBreak, DeliverPellet) -def beam_break(pattern): +class BeamBreak(Stream): """Beam break events for pellet detection.""" - return {"BeamBreak": _reader.BitmaskEvent(f"{pattern}_32_*", 0x22, "PelletDetected")} + + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_32_*", 0x22, "PelletDetected")) -def deliver_pellet(pattern): +class DeliverPellet(Stream): """Pellet delivery commands.""" - return {"DeliverPellet": _reader.BitmaskEvent(f"{pattern}_35_*", 0x80, "TriggerPellet")} + + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x80, "TriggerPellet")) -def patch(pattern): +class Patch(StreamGroup): """Data streams for a patch.""" - return _device.compositeStream(pattern, depletionFunction, _stream.encoder, feeder) + + def __init__(self, pattern): + super().__init__(pattern, DepletionFunction, _stream.Encoder, Feeder) -def weight(pattern): +class Weight(StreamGroup): """Weight measurement data streams for a specific nest.""" - return _device.compositeStream(pattern, weight_raw, weight_filtered, weight_subject) + + def __init__(self, pattern): + super().__init__(pattern, WeightRaw, WeightFiltered, WeightSubject) -def weight_raw(pattern): +class WeightRaw(Stream): """Raw weight measurement for a specific nest.""" - return {"WeightRaw": _Weight(f"{pattern}_200_*")} + + def __init__(self, pattern): + super().__init__(_Weight(f"{pattern}_200_*")) -def weight_filtered(pattern): +class WeightFiltered(Stream): """Filtered weight measurement for a specific nest.""" - return {"WeightFiltered": _Weight(f"{pattern}_202_*")} + + def __init__(self, pattern): + super().__init__(_Weight(f"{pattern}_202_*")) -def weight_subject(pattern): +class WeightSubject(Stream): """Subject weight measurement for a specific nest.""" - return {"WeightSubject": _Weight(f"{pattern}_204_*")} + + def __init__(self, pattern): + super().__init__(_Weight(f"{pattern}_204_*")) -def session(pattern): +class SessionData(Stream): """Session metadata for Experiment 0.1.""" - return {pattern: _reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"])} + + def __init__(self, pattern): + super().__init__(_reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"])) diff --git a/aeon/schema/octagon.py b/aeon/schema/octagon.py index a792fac4..2ea85b4e 100644 --- a/aeon/schema/octagon.py +++ b/aeon/schema/octagon.py @@ -1,190 +1,197 @@ import aeon.io.reader as _reader +from aeon.schema.streams import Stream, StreamGroup -def photodiode(pattern): - return {"Photodiode": _reader.Harp(f"{pattern}_44_*", columns=["adc", "encoder"])} +class Photodiode(Stream): + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_44_*", columns=["adc", "encoder"])) -class OSC: - @staticmethod - def background_color(pattern): - return { - "BackgroundColor": _reader.Csv( - f"{pattern}_backgroundcolor_*", columns=["typetag", "r", "g", "b", "a"] +class OSC(StreamGroup): + def __init__(self, path): + super().__init__(path) + + class BackgroundColor(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv(f"{pattern}_backgroundcolor_*", columns=["typetag", "r", "g", "b", "a"]) ) - } - @staticmethod - def change_subject_state(pattern): - return { - "ChangeSubjectState": _reader.Csv( - f"{pattern}_changesubjectstate_*", columns=["typetag", "id", "weight", "event"] + class ChangeSubjectState(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv(f"{pattern}_changesubjectstate_*", columns=["typetag", "id", "weight", "event"]) ) - } - @staticmethod - def end_trial(pattern): - return {"EndTrial": _reader.Csv(f"{pattern}_endtrial_*", columns=["typetag", "value"])} + class EndTrial(Stream): + def __init__(self, pattern): + super().__init__(_reader.Csv(f"{pattern}_endtrial_*", columns=["typetag", "value"])) - @staticmethod - def slice(pattern): - return { - "Slice": _reader.Csv( - f"{pattern}_octagonslice_*", columns=["typetag", "wall_id", "r", "g", "b", "a", "delay"] + class Slice(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv( + f"{pattern}_octagonslice_*", columns=["typetag", "wall_id", "r", "g", "b", "a", "delay"] + ) ) - } - - @staticmethod - def gratings_slice(pattern): - return { - "GratingsSlice": _reader.Csv( - f"{pattern}_octagongratingsslice_*", - columns=[ - "typetag", - "wall_id", - "contrast", - "opacity", - "spatial_frequency", - "temporal_frequency", - "angle", - "delay", - ], + + class GratingsSlice(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv( + f"{pattern}_octagongratingsslice_*", + columns=[ + "typetag", + "wall_id", + "contrast", + "opacity", + "spatial_frequency", + "temporal_frequency", + "angle", + "delay", + ], + ) ) - } - - @staticmethod - def poke(pattern): - return { - "Poke": _reader.Csv( - f"{pattern}_poke_*", - columns=[ - "typetag", - "wall_id", - "poke_id", - "reward", - "reward_interval", - "delay", - "led_delay", - ], + + class Poke(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv( + f"{pattern}_poke_*", + columns=[ + "typetag", + "wall_id", + "poke_id", + "reward", + "reward_interval", + "delay", + "led_delay", + ], + ) ) - } - @staticmethod - def response(pattern): - return { - "Response": _reader.Csv( - f"{pattern}_response_*", columns=["typetag", "wall_id", "poke_id", "response_time"] + class Response(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv( + f"{pattern}_response_*", columns=["typetag", "wall_id", "poke_id", "response_time"] + ) ) - } - - @staticmethod - def run_pre_trial_no_poke(pattern): - return { - "RunPreTrialNoPoke": _reader.Csv( - f"{pattern}_run_pre_no_poke_*", - columns=[ - "typetag", - "wait_for_poke", - "reward_iti", - "timeout_iti", - "pre_trial_duration", - "activity_reset_flag", - ], + + class RunPreTrialNoPoke(Stream): + def __init__(self, pattern): + super().__init__( + _reader.Csv( + f"{pattern}_run_pre_no_poke_*", + columns=[ + "typetag", + "wait_for_poke", + "reward_iti", + "timeout_iti", + "pre_trial_duration", + "activity_reset_flag", + ], + ) ) - } - @staticmethod - def start_new_session(pattern): - return {"StartNewSession": _reader.Csv(f"{pattern}_startnewsession_*", columns=["typetag", "path"])} + class StartNewSession(Stream): + def __init__(self, pattern): + super().__init__(_reader.Csv(f"{pattern}_startnewsession_*", columns=["typetag", "path"])) + + +class TaskLogic(StreamGroup): + def __init__(self, path): + super().__init__(path) + class TrialInitiation(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_1_*", columns=["trial_type"])) -class TaskLogic: - @staticmethod - def trial_initiation(pattern): - return {"TrialInitiation": _reader.Harp(f"{pattern}_1_*", columns=["trial_type"])} + class Response(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_2_*", columns=["wall_id", "poke_id"])) - @staticmethod - def response(pattern): - return {"Response": _reader.Harp(f"{pattern}_2_*", columns=["wall_id", "poke_id"])} + class PreTrialState(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_3_*", columns=["state"])) - @staticmethod - def pre_trial(pattern): - return {"PreTrialState": _reader.Harp(f"{pattern}_3_*", columns=["state"])} + class InterTrialInterval(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_4_*", columns=["state"])) - @staticmethod - def inter_trial_interval(pattern): - return {"InterTrialInterval": _reader.Harp(f"{pattern}_4_*", columns=["state"])} + class SliceOnset(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_10_*", columns=["wall_id"])) - @staticmethod - def slice_onset(pattern): - return {"SliceOnset": _reader.Harp(f"{pattern}_10_*", columns=["wall_id"])} + class DrawBackground(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_11_*", columns=["state"])) - @staticmethod - def draw_background(pattern): - return {"DrawBackground": _reader.Harp(f"{pattern}_11_*", columns=["state"])} + class GratingsSliceOnset(Stream): + def __init__(self, pattern): + super().__init__(_reader.Harp(f"{pattern}_12_*", columns=["wall_id"])) - @staticmethod - def gratings_slice_onset(pattern): - return {"GratingsSliceOnset": _reader.Harp(f"{pattern}_12_*", columns=["wall_id"])} +class Wall(StreamGroup): + def __init__(self, path): + super().__init__(path) -class Wall: - @staticmethod - def beam_break0(pattern): - return {"BeamBreak0": _reader.DigitalBitmask(f"{pattern}_32_*", 0x1, columns=["state"])} + class BeamBreak0(Stream): + def __init__(self, pattern): + super().__init__(_reader.DigitalBitmask(f"{pattern}_32_*", 0x1, columns=["state"])) - @staticmethod - def beam_break1(pattern): - return {"BeamBreak1": _reader.DigitalBitmask(f"{pattern}_32_*", 0x2, columns=["state"])} + class BeamBreak1(Stream): + def __init__(self, pattern): + super().__init__(_reader.DigitalBitmask(f"{pattern}_32_*", 0x2, columns=["state"])) - @staticmethod - def beam_break2(pattern): - return {"BeamBreak2": _reader.DigitalBitmask(f"{pattern}_32_*", 0x4, columns=["state"])} + class BeamBreak2(Stream): + def __init__(self, pattern): + super().__init__(_reader.DigitalBitmask(f"{pattern}_32_*", 0x4, columns=["state"])) - @staticmethod - def set_led0(pattern): - return {"SetLed0": _reader.BitmaskEvent(f"{pattern}_34_*", 0x1, "Set")} + class SetLed0(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x1, "Set")) - @staticmethod - def set_led1(pattern): - return {"SetLed1": _reader.BitmaskEvent(f"{pattern}_34_*", 0x2, "Set")} + class SetLed1(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x2, "Set")) - @staticmethod - def set_led2(pattern): - return {"SetLed2": _reader.BitmaskEvent(f"{pattern}_34_*", 0x4, "Set")} + class SetLed2(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x4, "Set")) - @staticmethod - def set_valve0(pattern): - return {"SetValve0": _reader.BitmaskEvent(f"{pattern}_34_*", 0x8, "Set")} + class SetValve0(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x8, "Set")) - @staticmethod - def set_valve1(pattern): - return {"SetValve1": _reader.BitmaskEvent(f"{pattern}_34_*", 0x10, "Set")} + class SetValve1(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x10, "Set")) - @staticmethod - def set_valve2(pattern): - return {"SetValve2": _reader.BitmaskEvent(f"{pattern}_34_*", 0x20, "Set")} + class SetValve2(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_34_*", 0x20, "Set")) - @staticmethod - def clear_led0(pattern): - return {"ClearLed0": _reader.BitmaskEvent(f"{pattern}_35_*", 0x1, "Clear")} + class ClearLed0(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x1, "Clear")) - @staticmethod - def clear_led1(pattern): - return {"ClearLed1": _reader.BitmaskEvent(f"{pattern}_35_*", 0x2, "Clear")} + class ClearLed1(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x2, "Clear")) - @staticmethod - def clear_led2(pattern): - return {"ClearLed2": _reader.BitmaskEvent(f"{pattern}_35_*", 0x4, "Clear")} + class ClearLed2(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x4, "Clear")) - @staticmethod - def clear_valve0(pattern): - return {"ClearValve0": _reader.BitmaskEvent(f"{pattern}_35_*", 0x8, "Clear")} + class ClearValve0(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x8, "Clear")) - @staticmethod - def clear_valve1(pattern): - return {"ClearValve1": _reader.BitmaskEvent(f"{pattern}_35_*", 0x10, "Clear")} + class ClearValve1(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x10, "Clear")) - @staticmethod - def clear_valve2(pattern): - return {"ClearValve2": _reader.BitmaskEvent(f"{pattern}_35_*", 0x20, "Clear")} + class ClearValve2(Stream): + def __init__(self, pattern): + super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x20, "Clear")) From 28e53972bfe5d0d40c68627df280d8525a4b6373 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 3 Apr 2024 19:19:05 -0500 Subject: [PATCH 17/18] update(schemas): update social schemas to use the refactored Stream, StreamGroup and Device --- aeon/schema/schemas.py | 150 ++++++++++++++++++++------------------- aeon/schema/social.py | 113 ----------------------------- aeon/schema/social_01.py | 12 ++++ aeon/schema/social_02.py | 88 +++++++++++++++++++++++ 4 files changed, 177 insertions(+), 186 deletions(-) delete mode 100644 aeon/schema/social.py create mode 100644 aeon/schema/social_01.py create mode 100644 aeon/schema/social_02.py diff --git a/aeon/schema/schemas.py b/aeon/schema/schemas.py index 72820fdc..bc6eaa31 100644 --- a/aeon/schema/schemas.py +++ b/aeon/schema/schemas.py @@ -1,48 +1,51 @@ from dotmap import DotMap -from aeon.io.device import Device -from aeon.schema import core, foraging, octagon, social + +import aeon.schema.core as stream +from aeon.schema.streams import Device +from aeon.schema import foraging, octagon, social_01, social_02 + exp02 = DotMap( [ - Device("Metadata", core.metadata), - Device("ExperimentalMetadata", core.environment, core.message_log), - Device("CameraTop", core.video, core.position, foraging.region), - Device("CameraEast", core.video), - Device("CameraNest", core.video), - Device("CameraNorth", core.video), - Device("CameraPatch1", core.video), - Device("CameraPatch2", core.video), - Device("CameraSouth", core.video), - Device("CameraWest", core.video), - Device("Nest", foraging.weight), - Device("Patch1", foraging.patch), - Device("Patch2", foraging.patch), + Device("Metadata", stream.Metadata), + Device("ExperimentalMetadata", stream.Environment, stream.MessageLog), + Device("CameraTop", stream.Video, stream.Position, foraging.Region), + Device("CameraEast", stream.Video), + Device("CameraNest", stream.Video), + Device("CameraNorth", stream.Video), + Device("CameraPatch1", stream.Video), + Device("CameraPatch2", stream.Video), + Device("CameraSouth", stream.Video), + Device("CameraWest", stream.Video), + Device("Nest", foraging.Weight), + Device("Patch1", foraging.Patch), + Device("Patch2", foraging.Patch), ] ) exp01 = DotMap( [ - Device("SessionData", foraging.session), - Device("FrameTop", core.video, core.position), - Device("FrameEast", core.video), - Device("FrameGate", core.video), - Device("FrameNorth", core.video), - Device("FramePatch1", core.video), - Device("FramePatch2", core.video), - Device("FrameSouth", core.video), - Device("FrameWest", core.video), - Device("Patch1", foraging.depletion_function, core.encoder, foraging.feeder), - Device("Patch2", foraging.depletion_function, core.encoder, foraging.feeder), + Device("SessionData", foraging.SessionData), + Device("FrameTop", stream.Video, stream.Position), + Device("FrameEast", stream.Video), + Device("FrameGate", stream.Video), + Device("FrameNorth", stream.Video), + Device("FramePatch1", stream.Video), + Device("FramePatch2", stream.Video), + Device("FrameSouth", stream.Video), + Device("FrameWest", stream.Video), + Device("Patch1", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), + Device("Patch2", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), ] ) octagon01 = DotMap( [ - Device("Metadata", core.metadata), - Device("CameraTop", core.video, core.position), - Device("CameraColorTop", core.video), - Device("ExperimentalMetadata", core.subject_state), - Device("Photodiode", octagon.photodiode), + Device("Metadata", stream.Metadata), + Device("CameraTop", stream.Video, stream.Position), + Device("CameraColorTop", stream.Video), + Device("ExperimentalMetadata", stream.SubjectState), + Device("Photodiode", octagon.Photodiode), Device("OSC", octagon.OSC), Device("TaskLogic", octagon.TaskLogic), Device("Wall1", octagon.Wall), @@ -58,55 +61,56 @@ social01 = DotMap( [ - Device("Metadata", core.metadata), - Device("Environment", social.environment_b, social.subject_b), - Device("CameraTop", core.video, social.camera_top_pos_b), - Device("CameraNorth", core.video), - Device("CameraSouth", core.video), - Device("CameraEast", core.video), - Device("CameraWest", core.video), - Device("CameraPatch1", core.video), - Device("CameraPatch2", core.video), - Device("CameraPatch3", core.video), - Device("CameraNest", core.video), - Device("Nest", social.weight_raw_b, social.weight_filtered_b), - Device("Patch1", social.patch_streams_b), - Device("Patch2", social.patch_streams_b), - Device("Patch3", social.patch_streams_b), - Device("RfidGate", social.rfid_events_social01_b), - Device("RfidNest1", social.rfid_events_social01_b), - Device("RfidNest2", social.rfid_events_social01_b), - Device("RfidPatch1", social.rfid_events_social01_b), - Device("RfidPatch2", social.rfid_events_social01_b), - Device("RfidPatch3", social.rfid_events_social01_b), + Device("Metadata", stream.Metadata), + Device("Environment", social_02.Environment, social_02.SubjectData), + Device("CameraTop", stream.Video, social_02.Pose), + Device("CameraNorth", stream.Video), + Device("CameraSouth", stream.Video), + Device("CameraEast", stream.Video), + Device("CameraWest", stream.Video), + Device("CameraPatch1", stream.Video), + Device("CameraPatch2", stream.Video), + Device("CameraPatch3", stream.Video), + Device("CameraNest", stream.Video), + Device("Nest", social_02.WeightRaw, social_02.WeightFiltered), + Device("Patch1", social_02.Patch), + Device("Patch2", social_02.Patch), + Device("Patch3", social_02.Patch), + Device("RfidGate", social_01.RfidEvents), + Device("RfidNest1", social_01.RfidEvents), + Device("RfidNest2", social_01.RfidEvents), + Device("RfidPatch1", social_01.RfidEvents), + Device("RfidPatch2", social_01.RfidEvents), + Device("RfidPatch3", social_01.RfidEvents), ] ) social02 = DotMap( [ - Device("Metadata", core.metadata), - Device("Environment", social.environment_b, social.subject_b), - Device("CameraTop", core.video, social.camera_top_pos_b), - Device("CameraNorth", core.video), - Device("CameraSouth", core.video), - Device("CameraEast", core.video), - Device("CameraWest", core.video), - Device("CameraPatch1", core.video), - Device("CameraPatch2", core.video), - Device("CameraPatch3", core.video), - Device("CameraNest", core.video), - Device("Nest", social.weight_raw_b, social.weight_filtered_b), - Device("Patch1", social.patch_streams_b), - Device("Patch2", social.patch_streams_b), - Device("Patch3", social.patch_streams_b), - Device("Patch1Rfid", social.rfid_events_b), - Device("Patch2Rfid", social.rfid_events_b), - Device("Patch3Rfid", social.rfid_events_b), - Device("NestRfid1", social.rfid_events_b), - Device("NestRfid2", social.rfid_events_b), - Device("GateRfid", social.rfid_events_b), + Device("Metadata", stream.Metadata), + Device("Environment", social_02.Environment, social_02.SubjectData), + Device("CameraTop", stream.Video, social_02.Pose), + Device("CameraNorth", stream.Video), + Device("CameraSouth", stream.Video), + Device("CameraEast", stream.Video), + Device("CameraWest", stream.Video), + Device("CameraPatch1", stream.Video), + Device("CameraPatch2", stream.Video), + Device("CameraPatch3", stream.Video), + Device("CameraNest", stream.Video), + Device("Nest", social_02.WeightRaw, social_02.WeightFiltered), + Device("Patch1", social_02.Patch), + Device("Patch2", social_02.Patch), + Device("Patch3", social_02.Patch), + Device("RfidGate", social_02.RfidEvents), + Device("RfidNest1", social_02.RfidEvents), + Device("RfidNest2", social_02.RfidEvents), + Device("RfidPatch1", social_02.RfidEvents), + Device("RfidPatch2", social_02.RfidEvents), + Device("RfidPatch3", social_02.RfidEvents), ] ) + __all__ = ["exp01", "exp02", "octagon01", "social01", "social02"] diff --git a/aeon/schema/social.py b/aeon/schema/social.py deleted file mode 100644 index 54c08302..00000000 --- a/aeon/schema/social.py +++ /dev/null @@ -1,113 +0,0 @@ -from aeon.io import reader -from aeon.io.device import Device, register -from aeon.schema import core, foraging - - -"""Creating the Social 0.1 schema""" - -# Above we've listed out all the streams we recorded from during Social0.1, but we won't care to analyze all -# of them. Instead, we'll create a DotMap schema from Device objects that only contains Readers for the -# streams we want to analyze. - -# We'll see both examples of binder functions we saw previously: 1. "empty pattern", and -# 2. "device-name passed". - -# And we'll see both examples of instantiating Device objects we saw previously: 1. from singleton binder -# functions; 2. from multiple and/or nested binder functions. - -# (Note, in the simplest case, a schema can always be created from / reduced to "empty pattern" binder -# functions as singletons in Device objects.) - -# Metadata.yml (will be a singleton binder function Device object) -# --- - -metadata = Device("Metadata", core.metadata) - -# --- - -# Environment (will be a nested, multiple binder function Device object) -# --- - -# BlockState -block_state_b = lambda pattern: { - "BlockState": reader.Csv(f"{pattern}_BlockState_*", ["pellet_ct", "pellet_ct_thresh", "due_time"]) -} - -# LightEvents -light_events_b = lambda pattern: { - "LightEvents": reader.Csv(f"{pattern}_LightEvents_*", ["channel", "value"]) -} - -# Combine EnvironmentState, BlockState, LightEvents -environment_b = lambda pattern: register( - pattern, core.environment_state, block_state_b, light_events_b, core.message_log -) - -# SubjectState -subject_state_b = lambda pattern: { - "SubjectState": reader.Csv(f"{pattern}_SubjectState_*", ["id", "weight", "type"]) -} - -# SubjectVisits -subject_visits_b = lambda pattern: { - "SubjectVisits": reader.Csv(f"{pattern}_SubjectVisits_*", ["id", "type", "region"]) -} - -# SubjectWeight -subject_weight_b = lambda pattern: { - "SubjectWeight": reader.Csv( - f"{pattern}_SubjectWeight_*", ["weight", "confidence", "subject_id", "int_id"] - ) -} - -# Separate Device object for subject-specific streams. -subject_b = lambda pattern: register(pattern, subject_state_b, subject_visits_b, subject_weight_b) -# --- - -# Camera -# --- - -camera_top_pos_b = lambda pattern: {"Pose": reader.Pose(f"{pattern}_test-node1*")} - -# --- - -# Nest -# --- - -weight_raw_b = lambda pattern: {"WeightRaw": reader.Harp(f"{pattern}_200_*", ["weight(g)", "stability"])} -weight_filtered_b = lambda pattern: { - "WeightFiltered": reader.Harp(f"{pattern}_202_*", ["weight(g)", "stability"]) -} - -# --- - -# Patch -# --- - -# Combine streams for Patch device -patch_streams_b = lambda pattern: register( - pattern, - foraging.pellet_depletion_state, - core.encoder, - foraging.feeder, - foraging.pellet_manual_delivery, - foraging.missed_pellet, - foraging.pellet_retried_delivery, -) -# --- - -# Rfid -# --- - - -def rfid_events_social01_b(pattern): - """RFID events reader (with social0.1 specific logic)""" - pattern = pattern.replace("Rfid", "") - if pattern.startswith("Events"): - pattern = pattern.replace("Events", "") - return {"RfidEvents": reader.Harp(f"RfidEvents{pattern}_*", ["rfid"])} - - -def rfid_events_b(pattern): - """RFID events reader""" - return {"RfidEvents": reader.Harp(f"{pattern}_32*", ["rfid"])} diff --git a/aeon/schema/social_01.py b/aeon/schema/social_01.py new file mode 100644 index 00000000..becfc252 --- /dev/null +++ b/aeon/schema/social_01.py @@ -0,0 +1,12 @@ +import aeon.io.reader as _reader +from aeon.schema.streams import Stream + + +class RfidEvents(Stream): + + def __init__(self, path): + path = path.replace("Rfid", "") + if path.startswith("Events"): + path = path.replace("Events", "") + + super().__init__(_reader.Harp(f"RfidEvents{path}_32*", ["rfid"])) diff --git a/aeon/schema/social_02.py b/aeon/schema/social_02.py new file mode 100644 index 00000000..44c26c91 --- /dev/null +++ b/aeon/schema/social_02.py @@ -0,0 +1,88 @@ +import aeon.io.reader as _reader +from aeon.schema.streams import Stream, StreamGroup +from aeon.schema import core, foraging + + +class Environment(StreamGroup): + + def __init__(self, path): + super().__init__(path) + + EnvironmentState = core.EnvironmentState + + class BlockState(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_BlockState_*", columns=["pellet_ct", "pellet_ct_thresh", "due_time"])) + + class LightEvents(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_LightEvents_*", columns=["channel", "value"])) + + MessageLog = core.MessageLog + + +class SubjectData(StreamGroup): + def __init__(self, path): + super().__init__(path) + + class SubjectState(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_SubjectState_*", columns=["id", "weight", "type"])) + + class SubjectVisits(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_SubjectVisits_*", columns=["id", "type", "region"])) + + class SubjectWeight(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_SubjectWeight_*", columns=["weight", "confidence", "subject_id", "int_id"])) + + +class Pose(Stream): + + def __init__(self, path): + super().__init__(_reader.Pose(f"{path}_test-node1*")) + + +class WeightRaw(Stream): + + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_200_*", ["weight(g)", "stability"])) + + +class WeightFiltered(Stream): + + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_202_*", ["weight(g)", "stability"])) + + +class Patch(StreamGroup): + + def __init__(self, path): + super().__init__(path) + + class DepletionState(Stream): + def __init__(self, path): + super().__init__(_reader.Csv(f"{path}_State_*", columns=["threshold", "offset", "rate"])) + + Encoder = core.Encoder + + Feeder = foraging.Feeder + + class ManualDelivery(Stream): + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_201_*", ["manual_delivery"])) + + class MissedPellet(Stream): + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_202_*", ["missed_pellet"])) + + class RetriedDelivery(Stream): + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_203_*", ["retried_delivery"])) + + +class RfidEvents(Stream): + + def __init__(self, path): + super().__init__(_reader.Harp(f"{path}_32*", ["rfid"])) From 8e4b967ea67750d577b3eb23258450997e98601e Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 3 Apr 2024 19:24:56 -0500 Subject: [PATCH 18/18] chore: code cleanup --- aeon/dj_pipeline/acquisition.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 5209f2d6..52dab8dc 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -25,15 +25,6 @@ "exp0.2-r0": "CameraTop", } -# _device_schema_mapping = { -# "exp0.1-r0": aeon_schemas.exp01, -# "social0-r1": aeon_schemas.exp01, -# "exp0.2-r0": aeon_schemas.exp02, -# "oct1.0-r0": aeon_schemas.octagon01, -# "social0.1-a3": aeon_schemas.social01, -# "social0.1-a4": aeon_schemas.social01, -# } - # ------------------- Type Lookup ------------------------