Merge branch 'datajoint_pipeline' into datajoint_presocial
Thinh Nguyen authored Aug 10, 2023
2 parents 0399603 + 3c9ec7e commit 478e6f9
Showing 14 changed files with 146 additions and 89 deletions.
15 changes: 6 additions & 9 deletions aeon/dj_pipeline/utils/video.py
@@ -1,7 +1,6 @@
 import base64
 import datetime
-import pathlib
-
+from pathlib import Path
 import cv2
 import numpy as np
 import pandas as pd
@@ -16,21 +15,19 @@ def retrieve_video_frames(
     camera_name,
     start_time,
     end_time,
+    raw_data_dir,
     desired_fps=50,
     start_frame=0,
     chunk_size=50,
     **kwargs,
 ):
-    from aeon.dj_pipeline import acquisition
-
-    raw_data_dir = acquisition.Experiment.get_data_directory(
-        {"experiment_name": experiment_name}
-    )
+    raw_data_dir = Path(raw_data_dir)
     assert raw_data_dir.exists()

-    # do some data loading
+    # Load video data
     videodata = io_api.load(
         root=raw_data_dir.as_posix(),
-        reader=io_reader.Video(camera_name),
+        reader=io_reader.Video(f"{camera_name}_*"),
         start=pd.Timestamp(start_time),
         end=pd.Timestamp(end_time),
     )
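For orientation, a minimal sketch of calling the updated function; the path and camera name are illustrative, and it assumes no required parameters precede `camera_name` outside the hunk shown above:

    frames = retrieve_video_frames(
        camera_name="CameraTop",
        start_time="2023-08-10 00:00:00",
        end_time="2023-08-10 00:05:00",
        raw_data_dir="/ceph/aeon/aeon/data/raw/...",  # hypothetical mount path
        desired_fps=50,
        start_frame=0,
        chunk_size=50,
    )

The raw data directory now arrives as an explicit argument instead of being resolved inside the function via `acquisition.Experiment.get_data_directory`, which removes the database round-trip from the video utility; the sci-viz spec sheet change further below supplies `raw_data_dir` from the query side.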
6 changes: 3 additions & 3 deletions aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml
@@ -22,7 +22,7 @@ services:
         - -c
         - |
           apk add --update git g++ &&
-          git clone -b datajoint_pipeline https://github.com/ttngu207/aeon_mecha.git &&
+          git clone -b datajoint_pipeline https://github.com/SainsburyWellcomeCentre/aeon_mecha.git &&
           pip install -e ./aeon_mecha &&
           pharus_update() {
             [ -z "$$GUNICORN_PID" ] || kill $$GUNICORN_PID
@@ -52,7 +52,7 @@ services:
   sci-viz:
     cpus: 2.0
     mem_limit: 16g
-    image: datajoint/sci-viz:2.3.2
+    image: jverswijver/sci-viz:2.3.3-hotfix3
     environment:
       - CHOKIDAR_USEPOLLING=true
       - REACT_APP_DJSCIVIZ_BACKEND_PREFIX=/api
@@ -72,7 +72,7 @@ services:
     networks:
       - main
   fakeservices.datajoint.io:
-    image: datajoint/nginx:v0.2.4
+    image: datajoint/nginx:v0.2.5
     environment:
       - ADD_pharus_TYPE=REST
       - ADD_pharus_ENDPOINT=pharus:5000
16 changes: 9 additions & 7 deletions aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml
@@ -1,15 +1,16 @@
 # cd aeon/dj_pipeline/webapps/sciviz/
 # HOST_UID=$(id -u) docker-compose -f docker-compose-remote.yaml up -d
-#

 version: '2.4'
 services:
   pharus:
-    cpus: 2.0
-    mem_limit: 4g
+    # cpus: 2.0
+    mem_limit: 16g
     image: jverswijver/pharus:0.8.5-PY_VER-3.9
     environment:
+      # - FLASK_ENV=development # enables logging to console from Flask
       - PHARUS_SPEC_PATH=/main/specsheet.yaml # for dynamic utils spec
+    env_file: ./.env
     user: root
     volumes:
       - ./specsheet.yaml:/main/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME
@@ -20,17 +21,18 @@ services:
         - -c
         - |
           apk add --update git g++ &&
-          git clone -b datajoint_pipeline https://github.com/ttngu207/aeon_mecha.git &&
+          git clone -b datajoint_pipeline https://github.com/SainsburyWellcomeCentre/aeon_mecha.git &&
           pip install -e ./aeon_mecha &&
-          gunicorn --bind 0.0.0.0:$${PHARUS_PORT} pharus.server:app
+          gunicorn --bind 0.0.0.0:$${PHARUS_PORT} --workers=3 pharus.server:app
     # ports:
     #   - "5000:5000"
     networks:
       - main
   sci-viz:
     cpus: 2.0
-    mem_limit: 16g
-    image: jverswijver/sci-viz:2.3.3-hotfix2
+    mem_limit: 4g
+    image: jverswijver/sci-viz:2.3.3-hotfix3
     environment:
       - CHOKIDAR_USEPOLLING=true
       - REACT_APP_DJSCIVIZ_BACKEND_PREFIX=/api
16 changes: 9 additions & 7 deletions aeon/dj_pipeline/webapps/sciviz/specsheet.yaml
@@ -437,6 +437,7 @@ SciViz:
             return dict(**kwargs)
           dj_query: >
             def dj_query(aeon_acquisition):
+              acquisition = aeon_acquisition
               q = dj.U('camera_description') & acquisition.ExperimentCamera
               return {'query': q, 'fetch_args': ['camera_description']}
         time_range_selector:
@@ -453,8 +454,8 @@
           width: 2
           type: slideshow:aeon
           route: /videostream_video_streamer
-          batch_size: 3
-          chunk_size: 50
+          batch_size: 6
+          chunk_size: 30
           buffer_size: 30
           max_FPS: 50
           channels:
@@ -464,11 +465,12 @@
               stream_time_selector,
             ]
           restriction: >
-           def restriction(**kwargs):
+            def restriction(**kwargs):
               return dict(**kwargs)
           dj_query: >
             def dj_query(aeon_acquisition):
-              q = aeon_acquisition.ExperimentCamera
+              acquisition = aeon_acquisition
+              q = dj.U('camera_description', 'raw_data_dir') & (acquisition.ExperimentCamera * acquisition.Experiment.Directory & 'directory_type = "raw"').proj('camera_description', raw_data_dir="CONCAT('/ceph/aeon/', directory_path)")
               return {'query': q, 'fetch_args': []}
     PipelineMonitor:
       route: /pipeline_monitor
@@ -683,15 +685,15 @@
           tables:
             - aeon_acquisition.Experiment.Directory
           map:
-            - type: table
-              input: Experiment Name
-              destination: aeon_acquisition.Experiment
             - type: table
               input: Directory Type
               destination: aeon_acquisition.DirectoryType
             - type: table
               input: Pipeline Repository
               destination: aeon_acquisition.PipelineRepository
+            - type: table
+              input: Experiment Name
+              destination: aeon_acquisition.Experiment
             - type: attribute
               input: Path to Experiment Directory
               destination: directory_path
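Read as plain DataJoint, the new `dj_query` for the video streamer is equivalent to the following sketch (the query text is taken verbatim from the diff; the fetch call and result shape are assumptions):

    import datajoint as dj
    from aeon.dj_pipeline import acquisition

    # Join each camera to its experiment's "raw" directory entry and project
    # an absolute path by prefixing the /ceph/aeon mount point.
    q = dj.U('camera_description', 'raw_data_dir') & (
        acquisition.ExperimentCamera * acquisition.Experiment.Directory
        & 'directory_type = "raw"'
    ).proj('camera_description', raw_data_dir="CONCAT('/ceph/aeon/', directory_path)")
    q.fetch(as_dict=True)  # e.g. [{'camera_description': ..., 'raw_data_dir': '/ceph/aeon/...'}]

This is presumably what feeds the new `raw_data_dir` argument of `retrieve_video_frames` in `aeon/dj_pipeline/utils/video.py` above.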
26 changes: 12 additions & 14 deletions aeon/io/api.py
@@ -76,7 +76,7 @@ def load(root, reader, start=None, end=None, time=None, tolerance=None, epoch=None):
     fileset = {
         chunk_key(fname):fname
         for path in root
-        for fname in Path(path).glob(f"{epoch_pattern}/**/{reader.pattern}*.{reader.extension}")}
+        for fname in Path(path).glob(f"{epoch_pattern}/**/{reader.pattern}.{reader.extension}")}
     files = sorted(fileset.items())

     if time is not None:
@@ -117,27 +117,25 @@ def load(root, reader, start=None, end=None, time=None, tolerance=None, epoch=None):
         return pd.concat(dataframes)

     if start is not None or end is not None:
-        chunkfilter = chunk_range(start, end)
-        files = list(filter(lambda item: item[0][1] in chunkfilter, files))
-    else:
-        chunkfilter = None
+        chunk_start = chunk(start) if start is not None else pd.Timestamp.min
+        chunk_end = chunk(end) if end is not None else pd.Timestamp.max
+        files = list(filter(lambda item: chunk_start <= chunk(item[0][1]) <= chunk_end, files))

     if len(files) == 0:
         return _empty(reader.columns)

     data = pd.concat([reader.read(file) for _, file in files])
     _set_index(data)
-    if chunkfilter is not None:
+    if start is not None or end is not None:
         try:
             return data.loc[start:end]
         except KeyError:
             import warnings
-            # if not data.index.has_duplicates:
-            #     warnings.warn('data index for {0} contains out-of-order timestamps!'.format(reader.pattern))
-            #     data = data.sort_index()
-            # else:
-            #     warnings.warn('data index for {0} contains duplicate keys!'.format(reader.pattern))
-            #     data = data[~data.index.duplicated(keep='first')]
-            # return data.loc[start:end]
-            return data
+            if not data.index.has_duplicates:
+                warnings.warn('data index for {0} contains out-of-order timestamps!'.format(reader.pattern))
+                data = data.sort_index()
+            else:
+                warnings.warn('data index for {0} contains duplicate keys!'.format(reader.pattern))
+                data = data[~data.index.duplicated(keep='first')]
+            return data.loc[start:end]
     return data
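The re-enabled `except KeyError` branch addresses a concrete pandas behavior: label-slicing a non-monotonic `DatetimeIndex` with endpoints that are not exact index values raises `KeyError`. A toy reproduction with made-up timestamps:

    import pandas as pd

    # Chunks concatenated out of order yield a non-monotonic index.
    idx = pd.DatetimeIndex(["2023-08-10 00:02:00",
                            "2023-08-10 00:00:00",
                            "2023-08-10 00:01:00"])
    data = pd.DataFrame({"value": [2, 0, 1]}, index=idx)

    start = pd.Timestamp("2023-08-10 00:00:30")  # not an exact index value
    end = pd.Timestamp("2023-08-10 00:01:30")
    try:
        sliced = data.loc[start:end]
    except KeyError:
        # Sorting restores monotonicity, so the label slice succeeds.
        sliced = data.sort_index().loc[start:end]
    print(sliced)  # single row stamped 00:01:00

The duplicate-key branch is the analogous fix for repeated timestamps: `data[~data.index.duplicated(keep='first')]` keeps the first occurrence before retrying the slice.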
6 changes: 3 additions & 3 deletions aeon/io/reader.py
@@ -21,10 +21,10 @@
}

 class Reader:
-    """Extracts data from raw chunk files in an Aeon dataset.
+    """Extracts data from raw files in an Aeon dataset.

     Attributes:
-        pattern (str): Pattern used to find raw chunk files,
+        pattern (str): Pattern used to find raw files,
             usually in the format `<Device>_<DataStream>`.
         columns (str or array-like): Column labels to use for the data.
         extension (str): Extension of data file pathnames.
@@ -35,7 +35,7 @@ def __init__(self, pattern, columns, extension):
         self.extension = extension

     def read(self, _):
-        """Reads data from the specified chunk file."""
+        """Reads data from the specified file."""
         return pd.DataFrame(columns=self.columns, index=pd.DatetimeIndex([]))

 class Harp(Reader):
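A minimal sketch of the contract these docstrings describe; `StateCsv` is hypothetical, not part of the codebase:

    import pandas as pd
    from aeon.io.reader import Reader

    class StateCsv(Reader):
        """Hypothetical reader for a csv stream named <Device>_State_*."""

        def __init__(self, pattern):
            super().__init__(pattern, columns=['state'], extension='csv')

        def read(self, file):
            # One row per csv line; a real reader would also build the
            # time index expected by api.load.
            return pd.read_csv(file, header=0, names=self.columns)

Because `api.load` now globs for `{reader.pattern}.{reader.extension}` (see the aeon/io/api.py hunk above), a `StateCsv("Patch1_State_*")` would match files such as `Patch1_State_2023-08-10T00-00-00.csv`.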
12 changes: 6 additions & 6 deletions aeon/schema/core.py
@@ -3,31 +3,31 @@

 def video(pattern):
     """Video frame metadata."""
-    return { "Video": _reader.Video(pattern) }
+    return { "Video": _reader.Video(f"{pattern}_*") }

 def position(pattern):
     """Position tracking data for the specified camera."""
-    return { "Position": _reader.Position(f"{pattern}_200") }
+    return { "Position": _reader.Position(f"{pattern}_200_*") }

 def encoder(pattern):
     """Wheel magnetic encoder data."""
-    return { "Encoder": _reader.Encoder(f"{pattern}_90") }
+    return { "Encoder": _reader.Encoder(f"{pattern}_90_*") }

 def environment(pattern):
     """Metadata for environment mode and subjects."""
     return _device.compositeStream(pattern, environment_state, subject_state)

 def environment_state(pattern):
     """Environment state log."""
-    return { "EnvironmentState": _reader.Csv(f"{pattern}_EnvironmentState", ['state']) }
+    return { "EnvironmentState": _reader.Csv(f"{pattern}_EnvironmentState_*", ['state']) }

 def subject_state(pattern):
     """Subject state log."""
-    return { "SubjectState": _reader.Subject(f"{pattern}_SubjectState") }
+    return { "SubjectState": _reader.Subject(f"{pattern}_SubjectState_*") }

 def messageLog(pattern):
     """Message log data."""
-    return { "MessageLog": _reader.Log(f"{pattern}_MessageLog") }
+    return { "MessageLog": _reader.Log(f"{pattern}_MessageLog_*") }

 def metadata(pattern):
     """Metadata for acquisition epochs."""
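These `_*` suffixes pair with the api.py change above, which stopped appending `*` to the pattern itself. Moving the wildcard into the pattern also anchors it after the underscore, which matches more strictly; a small illustration with hypothetical file names:

    from fnmatch import fnmatch

    files = ["CameraTop_2023-08-10T00-00-00.avi",
             "CameraTopRear_2023-08-10T00-00-00.avi"]

    # Old scheme: api.load appended '*' to the bare pattern "CameraTop".
    print([f for f in files if fnmatch(f, "CameraTop*.avi")])   # matches both
    # New scheme: the reader pattern "CameraTop_*" carries its own wildcard.
    print([f for f in files if fnmatch(f, "CameraTop_*.avi")])  # matches only the first

Whether tighter matching was the motivation is an inference; mechanically the two changes are complementary, and the same `_*` treatment is applied throughout aeon/schema/foraging.py below.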
16 changes: 8 additions & 8 deletions aeon/schema/foraging.py
@@ -48,23 +48,23 @@ def __init__(self, pattern):

 def region(pattern):
     """Region tracking data for the specified camera."""
-    return { "Region": _RegionReader(f"{pattern}_201") }
+    return { "Region": _RegionReader(f"{pattern}_201_*") }

 def depletionFunction(pattern):
     """State of the linear depletion function for foraging patches."""
-    return { "DepletionState": _PatchState(f"{pattern}_State") }
+    return { "DepletionState": _PatchState(f"{pattern}_State_*") }

 def feeder(pattern):
     """Feeder commands and events."""
     return _device.compositeStream(pattern, beam_break, deliver_pellet)

 def beam_break(pattern):
     """Beam break events for pellet detection."""
-    return { "BeamBreak": _reader.BitmaskEvent(f"{pattern}_32", 0x22, 'PelletDetected') }
+    return { "BeamBreak": _reader.BitmaskEvent(f"{pattern}_32_*", 0x22, 'PelletDetected') }

 def deliver_pellet(pattern):
     """Pellet delivery commands."""
-    return { "DeliverPellet": _reader.BitmaskEvent(f"{pattern}_35", 0x80, 'TriggerPellet') }
+    return { "DeliverPellet": _reader.BitmaskEvent(f"{pattern}_35_*", 0x80, 'TriggerPellet') }

 def patch(pattern):
     """Data streams for a patch."""
@@ -76,16 +76,16 @@ def weight(pattern):

 def weight_raw(pattern):
     """Raw weight measurement for a specific nest."""
-    return { "WeightRaw": _Weight(f"{pattern}_200") }
+    return { "WeightRaw": _Weight(f"{pattern}_200_*") }

 def weight_filtered(pattern):
     """Filtered weight measurement for a specific nest."""
-    return { "WeightFiltered": _Weight(f"{pattern}_202") }
+    return { "WeightFiltered": _Weight(f"{pattern}_202_*") }

 def weight_subject(pattern):
     """Subject weight measurement for a specific nest."""
-    return { "WeightSubject": _Weight(f"{pattern}_204") }
+    return { "WeightSubject": _Weight(f"{pattern}_204_*") }

 def session(pattern):
     """Session metadata for Experiment 0.1."""
-    return { pattern: _reader.Csv(f"{pattern}_2", columns=['id','weight','event']) }
+    return { pattern: _reader.Csv(f"{pattern}_2*", columns=['id','weight','event']) }