Merge pull request #225 from SainsburyWellcomeCentre/datajoint_presocial
Datajoint presocial
Thinh Nguyen authored Aug 10, 2023
2 parents 3c9ec7e + 478e6f9 commit b29f0c3
Showing 13 changed files with 1,721 additions and 667 deletions.
6 changes: 6 additions & 0 deletions aeon/dj_pipeline/__init__.py
@@ -16,6 +16,12 @@
repository_config = dj.config['custom'].get('repository_config',
                                            _default_repository_config)

try:
    from .utils import streams_maker
    streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME)
except:
    pass


def get_schema_name(name):
    return db_prefix + name
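The guarded import newly exposes `streams` at the package level as a DataJoint virtual module when that schema already exists on the server, and silently skips it otherwise (e.g. a fresh database where `streams_maker` has not yet declared it). A minimal sketch of the virtual-module pattern in isolation, with an illustrative schema name:

import datajoint as dj

# A virtual module reflects an existing schema's tables without importing the
# code that declared them; it raises if the schema is missing, hence the guard.
streams = dj.VirtualModule("streams", "aeon_streams")  # schema name illustrative
print(dj.Diagram(streams))  # inspect the reflected tables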
60 changes: 47 additions & 13 deletions aeon/dj_pipeline/acquisition.py
@@ -10,28 +10,32 @@
from aeon.io import reader as io_reader
from aeon.schema import dataset as aeon_schema

from . import get_schema_name, lab, subject
from . import get_schema_name
from .utils import paths
from .utils.load_metadata import extract_epoch_metadata, ingest_epoch_metadata

logger = dj.logger
schema = dj.schema(get_schema_name("acquisition"))


# ------------------- Some Constants --------------------------

_ref_device_mapping = {
    "exp0.1-r0": "FrameTop",
    "social0-r1": "FrameTop",
    "exp0.2-r0": "CameraTop",
    "oct1.0-r0": "CameraTop",
    "presocial0.1-a2": "CameraTop",
    "presocial0.1-a3": "CameraTop",
    "presocial0.1-a4": "CameraTop",
}

_device_schema_mapping = {
    "exp0.1-r0": aeon_schema.exp01,
    "social0-r1": aeon_schema.exp01,
    "exp0.2-r0": aeon_schema.exp02,
    "oct1.0-r0": aeon_schema.octagon01,
    "presocial0.1-a2": aeon_schema.presocial,
    "presocial0.1-a3": aeon_schema.presocial,
    "presocial0.1-a4": aeon_schema.presocial,
}
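Both lookups are keyed by experiment name: `_ref_device_mapping` names the reference device whose files are scanned for epochs and chunks (see `_get_all_chunks` below), and `_device_schema_mapping` selects the device schema from `aeon.schema.dataset`. As the `ingest_epochs` hunk below shows, only the reference lookup has a fallback:

# resolution as in ingest_epochs below; the default applies only to the reference device
device_name = _ref_device_mapping.get("presocial0.1-a2", "CameraTop")  # -> "CameraTop"
devices_schema = _device_schema_mapping["presocial0.1-a2"]  # -> aeon_schema.presocial; KeyError if unlisted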


@@ -119,13 +123,17 @@ class Directory(dj.Part):

    @classmethod
    def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False):
        repo_name, dir_path = (
            cls.Directory & experiment_key & {"directory_type": directory_type}
        ).fetch1("repository_name", "directory_path")
        data_directory = paths.get_repository_path(repo_name) / dir_path
        if not data_directory.exists():
            return None
        return data_directory.as_posix() if as_posix else data_directory

        try:
            repo_name, dir_path = (
                cls.Directory & experiment_key & {"directory_type": directory_type}
            ).fetch1("repository_name", "directory_path")
            data_directory = paths.get_repository_path(repo_name) / dir_path
            if not data_directory.exists():
                return None
            return data_directory.as_posix() if as_posix else data_directory
        except dj.errors.DataJointError:
            return

    @classmethod
    def get_data_directories(
@@ -269,6 +277,13 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
        - if not specified, ingest all epochs
        Note: "start" and "end" are datetime specified a string in the format: "%Y-%m-%d %H:%M:%S"
        """
        from .utils import streams_maker
        from .utils.load_metadata import (
            extract_epoch_config,
            ingest_epoch_metadata,
            insert_device_types,
        )

        device_name = _ref_device_mapping.get(experiment_name, "CameraTop")

        all_chunks, raw_data_dirs = _get_all_chunks(experiment_name, device_name)
@@ -301,7 +316,7 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
            if experiment_name != "exp0.1-r0":
                metadata_yml_filepath = epoch_dir / "Metadata.yml"
                if metadata_yml_filepath.exists():
                    epoch_config = extract_epoch_metadata(
                    epoch_config = extract_epoch_config(
                        experiment_name, metadata_yml_filepath
                    )

@@ -346,8 +361,27 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
            cls.insert1(epoch_key)
            if epoch_config:
                cls.Config.insert1(epoch_config)
            ingest_epoch_metadata(experiment_name, metadata_yml_filepath)
            epoch_list.append(epoch_key)
            if metadata_yml_filepath and metadata_yml_filepath.exists():

                try:
                    # Insert new entries for streams.DeviceType, streams.Device.
                    insert_device_types(
                        _device_schema_mapping[epoch_key["experiment_name"]],
                        metadata_yml_filepath,
                    )
                    # Define and instantiate new devices/stream tables under `streams` schema
                    streams_maker.main()
                    with cls.connection.transaction:
                        # Insert devices' installation/removal/settings
                        ingest_epoch_metadata(
                            experiment_name, metadata_yml_filepath
                        )
                        epoch_list.append(epoch_key)
                except Exception as e:
                    (cls.Config & epoch_key).delete_quick()
                    (cls & epoch_key).delete_quick()
                    raise e

            # update previous epoch
            if (
                previous_epoch_key
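The substantive change in this file is the failure handling around epoch metadata: the epoch row and its Config part are inserted first, device types and the generated `streams` tables are created next, and the installation/removal/settings records are written inside an explicit transaction; if anything raises, the partial insert is removed with `delete_quick` and the error is re-raised so a later run can retry the epoch cleanly. A distilled sketch of that pattern, with hypothetical helper names:

import datajoint as dj

def insert_with_rollback(table, key, do_ingest):
    # Hypothetical helper mirroring Epoch.ingest_epochs above: insert `key`,
    # run the ingestion callback, and undo the partial insert if it fails.
    table.insert1(key)
    try:
        with table.connection.transaction:
            do_ingest()
    except Exception:
        (table & key).delete_quick()  # delete with no confirmation and no cascade
        raise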
56 changes: 56 additions & 0 deletions aeon/dj_pipeline/create_experiments/create_presocial.py
@@ -0,0 +1,56 @@
from aeon.dj_pipeline import acquisition, lab, subject

experiment_type = "presocial0.1"
experiment_names = ["presocial0.1-a2", "presocial0.1-a3", "presocial0.1-a4"]
location = "4th floor"
computers = ["AEON2", "AEON3", "AEON4"]

def create_new_experiment():

    lab.Location.insert1({"lab": "SWC", "location": location}, skip_duplicates=True)

    acquisition.ExperimentType.insert1(
        {"experiment_type": experiment_type}, skip_duplicates=True
    )

    acquisition.Experiment.insert(
        [{
            "experiment_name": experiment_name,
            "experiment_start_time": "2023-02-25 00:00:00",
            "experiment_description": "presocial experiment 0.1",
            "arena_name": "circle-2m",
            "lab": "SWC",
            "location": location,
            "experiment_type": experiment_type
        } for experiment_name in experiment_names],
        skip_duplicates=True,
    )

    acquisition.Experiment.Subject.insert(
        [
            {"experiment_name": experiment_name, "subject": s}
            for experiment_name in experiment_names
            for s in subject.Subject.fetch("subject")
        ],
        skip_duplicates=True,
    )

    acquisition.Experiment.Directory.insert(
        [
            {
                "experiment_name": experiment_name,
                "repository_name": "ceph_aeon",
                "directory_type": "raw",
                "directory_path": f"aeon/data/raw/{computer}/{experiment_type}"
            } for experiment_name, computer in zip(experiment_names, computers)
        ],
        skip_duplicates=True,
    )


def main():
    create_new_experiment()


if __name__ == "__main__":
    main()
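This new script registers the three presocial experiments (one per acquisition computer), enrolls every subject currently in `subject.Subject` in each of them, and points each experiment at its raw-data directory in the `ceph_aeon` repository. Because every insert passes `skip_duplicates=True`, the script is idempotent:

from aeon.dj_pipeline.create_experiments import create_presocial

create_presocial.main()  # safe to re-run; all inserts skip duplicates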
@@ -0,0 +1 @@
{"VideoController": "VideoController", "CameraTop": "VideoSource", "CameraWest": "VideoSource", "CameraEast": "VideoSource", "CameraNorth": "VideoSource", "CameraSouth": "VideoSource", "CameraPatch1": "VideoSource", "CameraPatch2": "VideoSource", "CameraNest": "VideoSource", "AudioAmbient": "AudioSource", "Patch1": "UndergroundFeeder", "Patch2": "UndergroundFeeder", "WeightNest": "WeightScale", "TrackingTop": "PositionTracking", "ActivityCenter": "ActivityTracking", "ActivityArena": "ActivityTracking", "ActivityNest": "ActivityTracking", "ActivityPatch1": "ActivityTracking", "ActivityPatch2": "ActivityTracking", "InNest": "RegionTracking", "InPatch1": "RegionTracking", "InPatch2": "RegionTracking", "ArenaCenter": "DistanceFromPoint", "InArena": "InRange", "InCorridor": "InRange", "ClockSynchronizer": "Synchronizer"}
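This one-line JSON file (its path is omitted by the diff view above) maps each device name appearing in an experiment's Metadata.yml to a device type used when building the `streams` schema. A sketch of reading it, with an illustrative filename:

import json
from pathlib import Path

# filename illustrative; the diff above does not show the new file's path
mapping = json.loads(Path("device_type_mapper.json").read_text())
assert mapping["CameraTop"] == "VideoSource"
assert mapping["Patch1"] == "UndergroundFeeder"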
2 changes: 1 addition & 1 deletion aeon/dj_pipeline/lab.py
@@ -16,7 +16,7 @@ class Colony(dj.Lookup):
    ---
    reference_weight=null : float
    sex='U' : enum('M', 'F', 'U')
    subject_birth_date=null : date # date of birth
    subject_birth_date=null : date  # date of birth
    note='' : varchar(1024)
    """

4 changes: 2 additions & 2 deletions aeon/dj_pipeline/populate/process.py
@@ -36,7 +36,7 @@
from datajoint_utilities.dj_worker import parse_args

from aeon.dj_pipeline.populate.worker import (
    high_priority,
    acquisition_worker,
    mid_priority,
    streams_worker,
    logger,
@@ -46,7 +46,7 @@
# ---- some wrappers to support execution as script or CLI

configured_workers = {
    "high_priority": high_priority,
    "high_priority": acquisition_worker,
    "mid_priority": mid_priority,
    "streams_worker": streams_worker,
}
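Note that only the Python object is renamed: the registry still maps the CLI key "high_priority" to the new `acquisition_worker`, so existing deployment commands keep working. Resolution stays a plain dict lookup:

# an invocation asking for "high_priority" now runs the renamed worker
worker = configured_workers["high_priority"]  # -> acquisition_worker
worker.run()  # run-loop method per datajoint-utilities; shown for illustration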
92 changes: 59 additions & 33 deletions aeon/dj_pipeline/populate/worker.py
@@ -1,8 +1,8 @@
import datajoint as dj
from datajoint_utilities.dj_worker import (
    DataJointWorker,
    WorkerLog,
    ErrorLog,
    WorkerLog,
    is_djtable,
)

@@ -13,13 +13,14 @@
    qc,
    report,
    tracking,
    streams,
)
from aeon.dj_pipeline.utils import load_metadata
from aeon.dj_pipeline.utils import load_metadata, streams_maker


streams = streams_maker.main()

__all__ = [
    "high_priority",
    "acquisition_worker",
    "mid_priority",
    "streams_worker",
    "WorkerLog",
@@ -29,51 +30,77 @@

# ---- Some constants ----
logger = dj.logger
_current_experiment = "exp0.2-r0"
worker_schema_name = db_prefix + "workerlog"
worker_schema_name = db_prefix + "worker"
load_metadata.insert_stream_types()


# ---- Manage experiments for automated ingestion ----

schema = dj.Schema(worker_schema_name)


@schema
class AutomatedExperimentIngestion(dj.Manual):
    definition = """ # experiments to undergo automated ingestion
    -> acquisition.Experiment
    """


def ingest_colony_epochs_chunks():
    """
    Load and insert subjects from colony.csv
    Ingest epochs and chunks
    for experiments specified in AutomatedExperimentIngestion
    """
    load_metadata.ingest_subject()
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        acquisition.Epoch.ingest_epochs(experiment_name)
        acquisition.Chunk.ingest_chunks(experiment_name)


def ingest_environment_visits():
    """
    Extract and insert complete visits
    for experiments specified in AutomatedExperimentIngestion
    """
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    analysis.ingest_environment_visits(experiment_names)


# ---- Define worker(s) ----
# configure a worker to process high-priority tasks
high_priority = DataJointWorker(
    "high_priority",
# configure a worker to process `acquisition`-related tasks
acquisition_worker = DataJointWorker(
    "acquisition_worker",
    worker_schema_name=worker_schema_name,
    db_prefix=db_prefix,
    run_duration=-1,
    sleep_duration=600,
    sleep_duration=1200,
)
acquisition_worker(ingest_colony_epochs_chunks)
acquisition_worker(acquisition.ExperimentLog)
acquisition_worker(acquisition.SubjectEnterExit)
acquisition_worker(acquisition.SubjectWeight)
acquisition_worker(acquisition.FoodPatchEvent)
acquisition_worker(acquisition.WheelState)

high_priority(load_metadata.ingest_subject)
high_priority(acquisition.Epoch.ingest_epochs, experiment_name=_current_experiment)
high_priority(acquisition.Chunk.ingest_chunks, experiment_name=_current_experiment)
high_priority(acquisition.ExperimentLog)
high_priority(acquisition.SubjectEnterExit)
high_priority(acquisition.SubjectWeight)
high_priority(acquisition.FoodPatchEvent)
high_priority(acquisition.WheelState)
high_priority(acquisition.WeightMeasurement)
high_priority(acquisition.WeightMeasurementFiltered)

high_priority(
    analysis.ingest_environment_visits, experiment_names=[_current_experiment]
)
acquisition_worker(ingest_environment_visits)

# configure a worker to process mid-priority tasks
mid_priority = DataJointWorker(
    "mid_priority",
    worker_schema_name=worker_schema_name,
    db_prefix=db_prefix,
    run_duration=-1,
    sleep_duration=120,
    sleep_duration=3600,
)

mid_priority(qc.CameraQC)
mid_priority(tracking.CameraTracking)
mid_priority(acquisition.FoodPatchWheel)
mid_priority(acquisition.WeightMeasurement)
mid_priority(acquisition.WeightMeasurementFiltered)

mid_priority(
    analysis.visit.ingest_environment_visits, experiment_names=[_current_experiment]
)
mid_priority(analysis.OverlapVisit)

mid_priority(analysis.VisitSubjectPosition)
@@ -94,11 +121,10 @@
    "streams_worker",
    worker_schema_name=worker_schema_name,
    db_prefix=db_prefix,
    run_duration=1,
    sleep_duration=600,
    run_duration=-1,
    sleep_duration=1200,
)

for attr in vars(streams).values():
    if is_djtable(attr) and hasattr(attr, "populate"):
        streams_worker(attr)

    if is_djtable(attr, dj.user_tables.AutoPopulate):
        streams_worker(attr, max_calls=10)