Skip to content

Commit

Permalink
Merge pull request #347 from ttngu207/datajoint_pipeline
Browse files Browse the repository at this point in the history
Update ingestion - using new Stream/Device io
  • Loading branch information
ttngu207 authored Apr 4, 2024
2 parents b8f76d4 + 8e4b967 commit 5287aa6
Show file tree
Hide file tree
Showing 13 changed files with 595 additions and 419 deletions.
9 changes: 0 additions & 9 deletions aeon/dj_pipeline/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@
"exp0.2-r0": "CameraTop",
}

# _device_schema_mapping = {
# "exp0.1-r0": aeon_schemas.exp01,
# "social0-r1": aeon_schemas.exp01,
# "exp0.2-r0": aeon_schemas.exp02,
# "oct1.0-r0": aeon_schemas.octagon01,
# "social0.1-a3": aeon_schemas.social01,
# "social0.1-a4": aeon_schemas.social01,
# }


# ------------------- Type Lookup ------------------------

Expand Down
9 changes: 6 additions & 3 deletions aeon/io/device.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import inspect
from typing_extensions import deprecated


def register(pattern, *args):
"""Merges multiple Readers into a single registry."""
registry = {}
@deprecated("Please use the StreamGroup class from the streams module instead.")
def compositeStream(pattern, *args):
"""Merges multiple data streams into a single composite stream."""
composite = {}
if args:
for binder_fn in args:
if inspect.isclass(binder_fn):
Expand All @@ -15,6 +17,7 @@ def register(pattern, *args):
return registry


@deprecated("The Device class has been moved to the streams module.")
class Device:
"""Groups multiple Readers into a logical device.
Expand Down
56 changes: 37 additions & 19 deletions aeon/schema/core.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,65 @@
import aeon.io.device as _device
from aeon.schema.streams import Stream, StreamGroup
import aeon.io.reader as _reader


def heartbeat(pattern):
class Heartbeat(Stream):
"""Heartbeat event for Harp devices."""
return {"Heartbeat": _reader.Heartbeat(f"{pattern}_8_*")}

def __init__(self, pattern):
super().__init__(_reader.Heartbeat(f"{pattern}_8_*"))

def video(pattern):

class Video(Stream):
"""Video frame metadata."""
return {"Video": _reader.Video(f"{pattern}_*")}

def __init__(self, pattern):
super().__init__(_reader.Video(f"{pattern}_*"))


def position(pattern):
class Position(Stream):
"""Position tracking data for the specified camera."""
return {"Position": _reader.Position(f"{pattern}_200_*")}

def __init__(self, pattern):
super().__init__(_reader.Position(f"{pattern}_200_*"))


def encoder(pattern):
class Encoder(Stream):
"""Wheel magnetic encoder data."""
return {"Encoder": _reader.Encoder(f"{pattern}_90_*")}

def __init__(self, pattern):
super().__init__(_reader.Encoder(f"{pattern}_90_*"))

def environment(pattern):

class Environment(StreamGroup):
"""Metadata for environment mode and subjects."""
return _device.register(pattern, environment_state, subject_state)

def __init__(self, pattern):
super().__init__(pattern, EnvironmentState, SubjectState)


def environment_state(pattern):
class EnvironmentState(Stream):
"""Environment state log."""
return {"EnvironmentState": _reader.Csv(f"{pattern}_EnvironmentState_*", ["state"])}

def __init__(self, pattern):
super().__init__(_reader.Csv(f"{pattern}_EnvironmentState_*", ["state"]))

def subject_state(pattern):

class SubjectState(Stream):
"""Subject state log."""
return {"SubjectState": _reader.Subject(f"{pattern}_SubjectState_*")}

def __init__(self, pattern):
super().__init__(_reader.Subject(f"{pattern}_SubjectState_*"))


def message_log(pattern):
class MessageLog(Stream):
"""Message log data."""
return {"MessageLog": _reader.Log(f"{pattern}_MessageLog_*")}

def __init__(self, pattern):
super().__init__(_reader.Log(f"{pattern}_MessageLog_*"))

def metadata(pattern):

class Metadata(Stream):
"""Metadata for acquisition epochs."""
return {pattern: _reader.Metadata(pattern)}

def __init__(self, pattern):
super().__init__(_reader.Metadata(pattern))
59 changes: 59 additions & 0 deletions aeon/schema/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from dotmap import DotMap

import aeon.schema.core as stream
from aeon.schema.streams import Device
from aeon.schema import foraging, octagon

exp02 = DotMap(
[
Device("Metadata", stream.Metadata),
Device("ExperimentalMetadata", stream.Environment, stream.MessageLog),
Device("CameraTop", stream.Video, stream.Position, foraging.Region),
Device("CameraEast", stream.Video),
Device("CameraNest", stream.Video),
Device("CameraNorth", stream.Video),
Device("CameraPatch1", stream.Video),
Device("CameraPatch2", stream.Video),
Device("CameraSouth", stream.Video),
Device("CameraWest", stream.Video),
Device("Nest", foraging.Weight),
Device("Patch1", foraging.Patch),
Device("Patch2", foraging.Patch),
]
)

exp01 = DotMap(
[
Device("SessionData", foraging.SessionData),
Device("FrameTop", stream.Video, stream.Position),
Device("FrameEast", stream.Video),
Device("FrameGate", stream.Video),
Device("FrameNorth", stream.Video),
Device("FramePatch1", stream.Video),
Device("FramePatch2", stream.Video),
Device("FrameSouth", stream.Video),
Device("FrameWest", stream.Video),
Device("Patch1", foraging.DepletionFunction, stream.Encoder, foraging.Feeder),
Device("Patch2", foraging.DepletionFunction, stream.Encoder, foraging.Feeder),
]
)

octagon01 = DotMap(
[
Device("Metadata", stream.Metadata),
Device("CameraTop", stream.Video, stream.Position),
Device("CameraColorTop", stream.Video),
Device("ExperimentalMetadata", stream.SubjectState),
Device("Photodiode", octagon.Photodiode),
Device("OSC", octagon.OSC),
Device("TaskLogic", octagon.TaskLogic),
Device("Wall1", octagon.Wall),
Device("Wall2", octagon.Wall),
Device("Wall3", octagon.Wall),
Device("Wall4", octagon.Wall),
Device("Wall5", octagon.Wall),
Device("Wall6", octagon.Wall),
Device("Wall7", octagon.Wall),
Device("Wall8", octagon.Wall),
]
)
92 changes: 46 additions & 46 deletions aeon/schema/foraging.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from enum import Enum as _Enum

from enum import Enum
import pandas as pd

import aeon.io.device as _device
import aeon.io.reader as _reader
import aeon.schema.core as _stream
from aeon.schema.streams import Stream, StreamGroup


class Area(_Enum):
class Area(Enum):
Null = 0
Nest = 1
Corridor = 2
Expand Down Expand Up @@ -53,76 +51,78 @@ def __init__(self, pattern):
super().__init__(pattern, columns=["value", "stable"])


def region(pattern):
class Region(Stream):
"""Region tracking data for the specified camera."""
return {"Region": _RegionReader(f"{pattern}_201_*")}


def depletion_function(pattern):
"""State of the linear depletion function for foraging patches."""
return {"DepletionState": _PatchState(f"{pattern}_State_*")}


def feeder(pattern):
"""Feeder commands and events."""
return _device.register(pattern, beam_break, deliver_pellet)
def __init__(self, pattern):
super().__init__(_RegionReader(f"{pattern}_201_*"))


def beam_break(pattern):
"""Beam break events for pellet detection."""
return {"BeamBreak": _reader.BitmaskEvent(f"{pattern}_32_*", 34, "PelletDetected")}
class DepletionFunction(Stream):
"""State of the linear depletion function for foraging patches."""

def __init__(self, pattern):
super().__init__(_PatchState(f"{pattern}_State_*"))

def deliver_pellet(pattern):
"""Pellet delivery commands."""
return {"DeliverPellet": _reader.BitmaskEvent(f"{pattern}_35_*", 1, "TriggerPellet")}

class Feeder(StreamGroup):
"""Feeder commands and events."""

def pellet_manual_delivery(pattern):
"""Manual pellet delivery."""
return {"ManualDelivery": _reader.Harp(f"{pattern}_201_*", ["manual_delivery"])}
def __init__(self, pattern):
super().__init__(pattern, BeamBreak, DeliverPellet)


def missed_pellet(pattern):
"""Missed pellet delivery."""
return {"MissedPellet": _reader.Harp(f"{pattern}_202_*", ["missed_pellet"])}
class BeamBreak(Stream):
"""Beam break events for pellet detection."""

def __init__(self, pattern):
super().__init__(_reader.BitmaskEvent(f"{pattern}_32_*", 0x22, "PelletDetected"))

def pellet_retried_delivery(pattern):
"""Retry pellet delivery."""
return {"RetriedDelivery": _reader.Harp(f"{pattern}_203_*", ["retried_delivery"])}

class DeliverPellet(Stream):
"""Pellet delivery commands."""

def pellet_depletion_state(pattern):
"""Pellet delivery state."""
return {"DepletionState": _reader.Csv(f"{pattern}_State_*", ["threshold", "offset", "rate"])}
def __init__(self, pattern):
super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x80, "TriggerPellet"))


def patch(pattern):
class Patch(StreamGroup):
"""Data streams for a patch."""
return _device.register(pattern, depletion_function, _stream.encoder, feeder)

def __init__(self, pattern):
super().__init__(pattern, DepletionFunction, _stream.Encoder, Feeder)


def weight(pattern):
class Weight(StreamGroup):
"""Weight measurement data streams for a specific nest."""
return _device.register(pattern, weight_raw, weight_filtered, weight_subject)

def __init__(self, pattern):
super().__init__(pattern, WeightRaw, WeightFiltered, WeightSubject)


def weight_raw(pattern):
class WeightRaw(Stream):
"""Raw weight measurement for a specific nest."""
return {"WeightRaw": _Weight(f"{pattern}_200_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_200_*"))

def weight_filtered(pattern):

class WeightFiltered(Stream):
"""Filtered weight measurement for a specific nest."""
return {"WeightFiltered": _Weight(f"{pattern}_202_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_202_*"))


def weight_subject(pattern):
class WeightSubject(Stream):
"""Subject weight measurement for a specific nest."""
return {"WeightSubject": _Weight(f"{pattern}_204_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_204_*"))

def session(pattern):

class SessionData(Stream):
"""Session metadata for Experiment 0.1."""
return {pattern: _reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"])}

def __init__(self, pattern):
super().__init__(_reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"]))
Loading

0 comments on commit 5287aa6

Please sign in to comment.