Skip to content

Commit

Permalink
Merge pull request #201 from JaerongA/metadata
Browse files Browse the repository at this point in the history
Add device table definitions
  • Loading branch information
Thinh Nguyen authored Jun 29, 2023
2 parents b6935c9 + 0c5ec8c commit 6b34e43
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 106 deletions.
23 changes: 15 additions & 8 deletions aeon/dj_pipeline/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"exp0.2-r0": "CameraTop",
"oct1.0-r0": "CameraTop",
"presocial0.1-a2": "CameraTop",
"presocial0.1-a3": "CameraTop",
"presocial0.1-a4": "CameraTop",
}

_device_schema_mapping = {
Expand All @@ -33,6 +35,8 @@
"exp0.2-r0": aeon_schema.exp02,
"oct1.0-r0": aeon_schema.octagon01,
"presocial0.1-a2": aeon_schema.presocial,
"presocial0.1-a3": aeon_schema.presocial,
"presocial0.1-a4": aeon_schema.presocial,
}


Expand Down Expand Up @@ -120,14 +124,17 @@ class Directory(dj.Part):

@classmethod
def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False):
repo_name, dir_path = (
cls.Directory & experiment_key & {"directory_type": directory_type}
).fetch1("repository_name", "directory_path")
data_directory = paths.get_repository_path(repo_name) / dir_path
if not data_directory.exists():
return None
return data_directory.as_posix() if as_posix else data_directory


try:
repo_name, dir_path = (
cls.Directory & experiment_key & {"directory_type": directory_type}
).fetch1("repository_name", "directory_path")
data_directory = paths.get_repository_path(repo_name) / dir_path
if not data_directory.exists():
return None
return data_directory.as_posix() if as_posix else data_directory
except dj.errors.DataJointError:
return
@classmethod
def get_data_directories(
cls, experiment_key, directory_types=["raw"], as_posix=False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from aeon.dj_pipeline import acquisition, lab, subject

experiment_type = "presocial"
experiment_name = "presocial0.1-a2" # AEON2 acquisition computer
experiment_type = "presocial0.1"
experiment_names = ["presocial0.1-a2", "presocial0.1-a3", "presocial0.1-a4"]
location = "4th floor"

computers = ["AEON2", "AEON3", "AEON4"]

def create_new_experiment():

Expand All @@ -13,22 +13,23 @@ def create_new_experiment():
{"experiment_type": experiment_type}, skip_duplicates=True
)

acquisition.Experiment.insert1(
{
acquisition.Experiment.insert(
[{
"experiment_name": experiment_name,
"experiment_start_time": "2023-02-25 00:00:00",
"experiment_description": "presocial experiment 0.1 in aeon2",
"experiment_description": "presocial experiment 0.1",
"arena_name": "circle-2m",
"lab": "SWC",
"location": location,
"experiment_type": experiment_type,
},
"experiment_type": experiment_type
} for experiment_name in experiment_names],
skip_duplicates=True,
)

acquisition.Experiment.Subject.insert(
[
{"experiment_name": experiment_name, "subject": s}
for experiment_name in experiment_names
for s in subject.Subject.fetch("subject")
],
skip_duplicates=True,
Expand All @@ -40,14 +41,8 @@ def create_new_experiment():
"experiment_name": experiment_name,
"repository_name": "ceph_aeon",
"directory_type": "raw",
"directory_path": "aeon/data/raw/AEON2/presocial0.1",
},
{
"experiment_name": experiment_name,
"repository_name": "ceph_aeon",
"directory_type": "quality-control",
"directory_path": "aeon/data/qc/AEON2/presocial0.1",
},
"directory_path": f"aeon/data/raw/{computer}/{experiment_type}"
} for experiment_name, computer in zip(experiment_names, computers)
],
skip_duplicates=True,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"VideoController": "VideoController", "CameraTop": "VideoSource", "CameraWest": "VideoSource", "CameraEast": "VideoSource", "CameraNorth": "VideoSource", "CameraSouth": "VideoSource", "CameraPatch1": "VideoSource", "CameraPatch2": "VideoSource", "CameraNest": "VideoSource", "AudioAmbient": "AudioSource", "Patch1": "Patch", "Patch2": "Patch", "WeightNest": "WeightScale", "TrackingTop": "PositionTracking", "ActivityCenter": "ActivityTracking", "ActivityArena": "ActivityTracking", "ActivityNest": "ActivityTracking", "ActivityPatch1": "ActivityTracking", "ActivityPatch2": "ActivityTracking", "InNest": "RegionTracking", "InPatch1": "RegionTracking", "InPatch2": "RegionTracking", "ArenaCenter": "DistanceFromPoint", "InArena": "InRange", "InCorridor": "InRange", "ClockSynchronizer": null}
{"VideoController": "VideoController", "CameraTop": "VideoSource", "CameraWest": "VideoSource", "CameraEast": "VideoSource", "CameraNorth": "VideoSource", "CameraSouth": "VideoSource", "CameraPatch1": "VideoSource", "CameraPatch2": "VideoSource", "CameraNest": "VideoSource", "AudioAmbient": "AudioSource", "Patch1": "UndergroundFeeder", "Patch2": "UndergroundFeeder", "WeightNest": "WeightScale", "TrackingTop": "PositionTracking", "ActivityCenter": "ActivityTracking", "ActivityArena": "ActivityTracking", "ActivityNest": "ActivityTracking", "ActivityPatch1": "ActivityTracking", "ActivityPatch2": "ActivityTracking", "InNest": "RegionTracking", "InPatch1": "RegionTracking", "InPatch2": "RegionTracking", "ArenaCenter": "DistanceFromPoint", "InArena": "InRange", "InCorridor": "InRange", "ClockSynchronizer": "Synchronizer"}
112 changes: 94 additions & 18 deletions aeon/dj_pipeline/streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import os

import datajoint as dj
import pandas as pd
Expand Down Expand Up @@ -124,12 +125,12 @@ def get_device_stream_template(device_type: str, stream_type: str):
stream = reader(**stream_detail["stream_reader_kwargs"])

table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__})
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""

for col in stream.columns:
if col.startswith("_"):
Expand Down Expand Up @@ -196,27 +197,102 @@ def make(self, key):

return DeviceDataStream


# endregion



def main(context=None):

import re
if context is None:
context = inspect.currentframe().f_back.f_locals

# Create tables.
# Create DeviceType tables.
for device_info in (DeviceType).fetch(as_dict=True):
table_class = get_device_template(device_info["device_type"])
context[table_class.__name__] = table_class
schema(table_class, context=context)

if device_info["device_type"] not in locals():
table_class = get_device_template(device_info["device_type"])
context[table_class.__name__] = table_class
schema(table_class, context=context)

device_table_def = inspect.getsource(table_class).lstrip()
replacements = {
"ExperimentDevice": device_info["device_type"],
"{device_title}": dj.utils.from_camel_case(device_info["device_type"]),
"{device_type}": dj.utils.from_camel_case(device_info["device_type"]),
"{aeon.__version__}": aeon.__version__
}
for old, new in replacements.items():
device_table_def = device_table_def.replace(old, new)
full_def = "@schema \n" + device_table_def + "\n\n"
if os.path.exists("existing_module.py"):
with open("existing_module.py", "r") as f:
existing_content = f.read()

if full_def in existing_content:
continue

with open("existing_module.py", "a") as f:
f.write(full_def)
else:
with open("existing_module.py", "w") as f:
full_def = """import datajoint as dj\nimport pandas as pd\n\nimport aeon\nfrom aeon.dj_pipeline import acquisition\nfrom aeon.io import api as io_api\n\n""" + full_def
f.write(full_def)

# Create DeviceDataStream tables.
for device_info in (DeviceType.Stream).fetch(as_dict=True):
table_class = get_device_stream_template(
device_info["device_type"], device_info["stream_type"]
)
context[table_class.__name__] = table_class
schema(table_class, context=context)

device_type = device_info['device_type']
stream_type = device_info['stream_type']
table_name = f"{device_type}{stream_type}"

if table_name not in locals():
table_class = get_device_stream_template(
device_type, stream_type)
context[table_class.__name__] = table_class
schema(table_class, context=context)

stream_obj = table_class.__dict__["_stream_reader"]
reader = stream_obj.__module__ + '.' + stream_obj.__name__
stream_detail = table_class.__dict__["_stream_detail"]

device_stream_table_def = inspect.getsource(table_class).lstrip()

old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__})
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""

replacements = {
"DeviceDataStream": f"{device_type}{stream_type}","ExperimentDevice": device_type,
'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'",
"""f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""",
'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'",
"{device_type}": device_type,
"{stream_type}": stream_type,
"{aeon.__version__}": aeon.__version__,
}
for old, new in replacements.items():
new_definition = old_definition.replace(old, new)

replacements["table_definition"] = '"""'+new_definition+'"""'

for old, new in replacements.items():
device_stream_table_def = device_stream_table_def.replace(old, new)

device_stream_table_def = re.sub(r'_stream_reader\s*=\s*reader', f'_stream_reader = {reader}', device_stream_table_def) # insert reader
device_stream_table_def = re.sub(r'_stream_detail\s*=\s*stream_detail', f'_stream_detail = {stream_detail}', device_stream_table_def) # insert stream details

full_def = "@schema \n" + device_stream_table_def + "\n\n"

with open("existing_module.py", "r") as f:
existing_content = f.read()

if full_def in existing_content:
continue

with open("existing_module.py", "a") as f:
f.write(full_def)

main()
41 changes: 9 additions & 32 deletions aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ services:
pharus:
cpus: 2.0
mem_limit: 4g
image: jverswijver/pharus:0.8.0py3.9
image: jverswijver/pharus:0.8.5-PY_VER-3.9
environment:
# - FLASK_ENV=development # enables logging to console from Flask
- PHARUS_SPEC_PATH=/main/specs/specsheet.yaml # for dynamic utils spec
- PHARUS_SPEC_PATH=/main/specsheet.yaml # for dynamic utils spec
- PHARUS_MODE=DEV
user: ${HOST_UID}:anaconda
user: root
volumes:
- ./specsheet.yaml:/main/specs/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME
- ./specsheet.yaml:/main/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME
- ./apk_requirements.txt:/tmp/apk_requirements.txt
- /ceph/aeon/aeon:/ceph/aeon/aeon
command:
Expand Down Expand Up @@ -52,13 +52,14 @@ services:
sci-viz:
cpus: 2.0
mem_limit: 16g
image: jverswijver/sci-viz:1.1.1-bugfix2
image: datajoint/sci-viz:2.3.2
environment:
- CHOKIDAR_USEPOLLING=true
- REACT_APP_DJSCIVIZ_BACKEND_PREFIX=/api
- DJSCIVIZ_SPEC_PATH=specsheet.yaml
- DJSCIVIZ_SPEC_PATH=/main/specsheet.yaml
- DJSCIVIZ_MODE=DEV
- NODE_OPTIONS="--max-old-space-size=12000"
user: root
volumes:
- ./specsheet.yaml:/main/specsheet.yaml
# ports:
Expand All @@ -67,31 +68,7 @@ services:
- sh
- -c
- |
sciviz_update() {
[ -z "$$NGINX_PID" ] || kill $$NGINX_PID
rm -R /usr/share/nginx/html
python frontend_gen.py
yarn build
mv ./build /usr/share/nginx/html
nginx -g "daemon off;" &
NGINX_PID=$$!
}
sciviz_update
echo "[$$(date -u '+%Y-%m-%d %H:%M:%S')][DataJoint]: Monitoring SciViz updates..."
INIT_TIME=$$(date +%s)
LAST_MOD_TIME=$$(date -r $$DJSCIVIZ_SPEC_PATH +%s)
DELTA=$$(expr $$LAST_MOD_TIME - $$INIT_TIME)
while true; do
CURR_LAST_MOD_TIME=$$(date -r $$DJSCIVIZ_SPEC_PATH +%s)
CURR_DELTA=$$(expr $$CURR_LAST_MOD_TIME - $$INIT_TIME)
if [ "$$DELTA" -lt "$$CURR_DELTA" ]; then
echo "[$$(date -u '+%Y-%m-%d %H:%M:%S')][DataJoint]: Reloading SciViz since \`$$DJSCIVIZ_SPEC_PATH\` changed."
sciviz_update
DELTA=$$CURR_DELTA
else
sleep 5
fi
done
sh sci-viz-hotreload-dev.sh
networks:
- main
fakeservices.datajoint.io:
Expand All @@ -103,7 +80,7 @@ services:
- ADD_sciviz_TYPE=REST
- ADD_sciviz_ENDPOINT=sci-viz:3000
- ADD_sciviz_PREFIX=/
# - HTTPS_PASSTHRU=TRUE
# - HTTPS_PASSTHRU=TRUE
- DEPLOYMENT_PORT
ports:
- "443:443"
Expand Down
Loading

0 comments on commit 6b34e43

Please sign in to comment.