diff --git a/aeon/dj_pipeline/__init__.py b/aeon/dj_pipeline/__init__.py
index 4664cb93..ea805b57 100644
--- a/aeon/dj_pipeline/__init__.py
+++ b/aeon/dj_pipeline/__init__.py
@@ -16,6 +16,12 @@
 repository_config = dj.config['custom'].get('repository_config', _default_repository_config)
 
+try:
+    from .utils import streams_maker
+    streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME)
+except:
+    pass
+
 
 def get_schema_name(name):
     return db_prefix + name
diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py
index e180382c..03a9a0f4 100644
--- a/aeon/dj_pipeline/acquisition.py
+++ b/aeon/dj_pipeline/acquisition.py
@@ -10,14 +10,12 @@
 from aeon.io import reader as io_reader
 from aeon.schema import dataset as aeon_schema
 
-from . import get_schema_name, lab, subject
+from . import get_schema_name
 from .utils import paths
-from .utils.load_metadata import extract_epoch_metadata, ingest_epoch_metadata
 
 logger = dj.logger
 schema = dj.schema(get_schema_name("acquisition"))
 
-
 # ------------------- Some Constants --------------------------
 
 _ref_device_mapping = {
@@ -25,6 +23,9 @@
     "social0-r1": "FrameTop",
     "exp0.2-r0": "CameraTop",
     "oct1.0-r0": "CameraTop",
+    "presocial0.1-a2": "CameraTop",
+    "presocial0.1-a3": "CameraTop",
+    "presocial0.1-a4": "CameraTop",
 }
 
 _device_schema_mapping = {
@@ -32,6 +33,9 @@
     "social0-r1": aeon_schema.exp01,
     "exp0.2-r0": aeon_schema.exp02,
     "oct1.0-r0": aeon_schema.octagon01,
+    "presocial0.1-a2": aeon_schema.presocial,
+    "presocial0.1-a3": aeon_schema.presocial,
+    "presocial0.1-a4": aeon_schema.presocial,
 }
@@ -119,13 +123,17 @@ class Directory(dj.Part):
 
     @classmethod
     def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False):
-        repo_name, dir_path = (
-            cls.Directory & experiment_key & {"directory_type": directory_type}
-        ).fetch1("repository_name", "directory_path")
-        data_directory = paths.get_repository_path(repo_name) / dir_path
-        if not data_directory.exists():
-            return None
-        return data_directory.as_posix() if as_posix else data_directory
+
+        try:
+            repo_name, dir_path = (
+                cls.Directory & experiment_key & {"directory_type": directory_type}
+            ).fetch1("repository_name", "directory_path")
+            data_directory = paths.get_repository_path(repo_name) / dir_path
+            if not data_directory.exists():
+                return None
+            return data_directory.as_posix() if as_posix else data_directory
+        except dj.errors.DataJointError:
+            return
 
     @classmethod
     def get_data_directories(
@@ -269,6 +277,13 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
             - if not specified, ingest all epochs
         Note: "start" and "end" are datetime specified a string in the format: "%Y-%m-%d %H:%M:%S"
         """
+        from .utils import streams_maker
+        from .utils.load_metadata import (
+            extract_epoch_config,
+            ingest_epoch_metadata,
+            insert_device_types,
+        )
+
         device_name = _ref_device_mapping.get(experiment_name, "CameraTop")
 
         all_chunks, raw_data_dirs = _get_all_chunks(experiment_name, device_name)
@@ -301,7 +316,7 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
             if experiment_name != "exp0.1-r0":
                 metadata_yml_filepath = epoch_dir / "Metadata.yml"
                 if metadata_yml_filepath.exists():
-                    epoch_config = extract_epoch_metadata(
+                    epoch_config = extract_epoch_config(
                         experiment_name, metadata_yml_filepath
                     )
@@ -346,8 +361,27 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
             cls.insert1(epoch_key)
             if epoch_config:
                 cls.Config.insert1(epoch_config)
-                ingest_epoch_metadata(experiment_name, metadata_yml_filepath)
-            epoch_list.append(epoch_key)
+            if metadata_yml_filepath and metadata_yml_filepath.exists():
+
+                try:
+                    # Insert new entries for streams.DeviceType, streams.Device.
+                    insert_device_types(
+                        _device_schema_mapping[epoch_key["experiment_name"]],
+                        metadata_yml_filepath,
+                    )
+                    # Define and instantiate new devices/stream tables under `streams` schema
+                    streams_maker.main()
+                    with cls.connection.transaction:
+                        # Insert devices' installation/removal/settings
+                        ingest_epoch_metadata(
+                            experiment_name, metadata_yml_filepath
+                        )
+                        epoch_list.append(epoch_key)
+                except Exception as e:
+                    (cls.Config & epoch_key).delete_quick()
+                    (cls & epoch_key).delete_quick()
+                    raise e
+
             # update previous epoch
             if (
                 previous_epoch_key
diff --git a/aeon/dj_pipeline/create_experiments/create_presocial.py b/aeon/dj_pipeline/create_experiments/create_presocial.py
new file mode 100644
index 00000000..038d5927
--- /dev/null
+++ b/aeon/dj_pipeline/create_experiments/create_presocial.py
@@ -0,0 +1,56 @@
+from aeon.dj_pipeline import acquisition, lab, subject
+
+experiment_type = "presocial0.1"
+experiment_names = ["presocial0.1-a2", "presocial0.1-a3", "presocial0.1-a4"]
+location = "4th floor"
+computers = ["AEON2", "AEON3", "AEON4"]
+
+
+def create_new_experiment():
+
+    lab.Location.insert1({"lab": "SWC", "location": location}, skip_duplicates=True)
+
+    acquisition.ExperimentType.insert1(
+        {"experiment_type": experiment_type}, skip_duplicates=True
+    )
+
+    acquisition.Experiment.insert(
+        [{
+            "experiment_name": experiment_name,
+            "experiment_start_time": "2023-02-25 00:00:00",
+            "experiment_description": "presocial experiment 0.1",
+            "arena_name": "circle-2m",
+            "lab": "SWC",
+            "location": location,
+            "experiment_type": experiment_type
+        } for experiment_name in experiment_names],
+        skip_duplicates=True,
+    )
+
+    acquisition.Experiment.Subject.insert(
+        [
+            {"experiment_name": experiment_name, "subject": s}
+            for experiment_name in experiment_names
+            for s in subject.Subject.fetch("subject")
+        ],
+        skip_duplicates=True,
+    )
+
+    acquisition.Experiment.Directory.insert(
+        [
+            {
+                "experiment_name": experiment_name,
+                "repository_name": "ceph_aeon",
+                "directory_type": "raw",
+                "directory_path": f"aeon/data/raw/{computer}/{experiment_type}"
+            } for experiment_name, computer in zip(experiment_names, computers)
+        ],
+        skip_duplicates=True,
+    )
+
+
+def main():
+    create_new_experiment()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/aeon/dj_pipeline/create_experiments/device_type_mapper.json b/aeon/dj_pipeline/create_experiments/device_type_mapper.json
new file mode 100644
index 00000000..5ffbee8f
--- /dev/null
+++ b/aeon/dj_pipeline/create_experiments/device_type_mapper.json
@@ -0,0 +1 @@
+{"VideoController": "VideoController", "CameraTop": "VideoSource", "CameraWest": "VideoSource", "CameraEast": "VideoSource", "CameraNorth": "VideoSource", "CameraSouth": "VideoSource", "CameraPatch1": "VideoSource", "CameraPatch2": "VideoSource", "CameraNest": "VideoSource", "AudioAmbient": "AudioSource", "Patch1": "UndergroundFeeder", "Patch2": "UndergroundFeeder", "WeightNest": "WeightScale", "TrackingTop": "PositionTracking", "ActivityCenter": "ActivityTracking", "ActivityArena": "ActivityTracking", "ActivityNest": "ActivityTracking", "ActivityPatch1": "ActivityTracking", "ActivityPatch2": "ActivityTracking", "InNest": "RegionTracking", "InPatch1": "RegionTracking", "InPatch2": "RegionTracking", "ArenaCenter": "DistanceFromPoint", "InArena": "InRange", "InCorridor": "InRange", "ClockSynchronizer": "Synchronizer"}
\ No newline at end of file
diff --git a/aeon/dj_pipeline/lab.py b/aeon/dj_pipeline/lab.py
index 9c193483..07ea8b16 100644
--- a/aeon/dj_pipeline/lab.py
+++ b/aeon/dj_pipeline/lab.py
@@ -16,7 +16,7 @@ class Colony(dj.Lookup):
     ---
     reference_weight=null : float
     sex='U'               : enum('M', 'F', 'U')
-    subject_birth_date=null : date  # date of birth
+    subject_birth_date=null : date  # date of birth
     note=''               : varchar(1024)
     """
diff --git a/aeon/dj_pipeline/populate/process.py b/aeon/dj_pipeline/populate/process.py
index c83347cb..023425ad 100644
--- a/aeon/dj_pipeline/populate/process.py
+++ b/aeon/dj_pipeline/populate/process.py
@@ -36,7 +36,7 @@
 from datajoint_utilities.dj_worker import parse_args
 
 from aeon.dj_pipeline.populate.worker import (
-    high_priority,
+    acquisition_worker,
     mid_priority,
     streams_worker,
     logger,
@@ -46,7 +46,7 @@
 # ---- some wrappers to support execution as script or CLI
 
 configured_workers = {
-    "high_priority": high_priority,
+    "high_priority": acquisition_worker,
     "mid_priority": mid_priority,
     "streams_worker": streams_worker,
 }
diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py
index 994ad60a..b57c4c4e 100644
--- a/aeon/dj_pipeline/populate/worker.py
+++ b/aeon/dj_pipeline/populate/worker.py
@@ -1,8 +1,8 @@
 import datajoint as dj
 from datajoint_utilities.dj_worker import (
     DataJointWorker,
-    WorkerLog,
     ErrorLog,
+    WorkerLog,
     is_djtable,
 )
@@ -13,13 +13,14 @@
     qc,
     report,
     tracking,
-    streams,
 )
-from aeon.dj_pipeline.utils import load_metadata
+from aeon.dj_pipeline.utils import load_metadata, streams_maker
+
+streams = streams_maker.main()
+
 __all__ = [
-    "high_priority",
+    "acquisition_worker",
     "mid_priority",
     "streams_worker",
     "WorkerLog",
@@ -29,34 +30,61 @@
 # ---- Some constants ----
 logger = dj.logger
-_current_experiment = "exp0.2-r0"
-worker_schema_name = db_prefix + "workerlog"
+worker_schema_name = db_prefix + "worker"
+load_metadata.insert_stream_types()
+
+
+# ---- Manage experiments for automated ingestion ----
+
+schema = dj.Schema(worker_schema_name)
+
+
+@schema
+class AutomatedExperimentIngestion(dj.Manual):
+    definition = """  # experiments to undergo automated ingestion
+    -> acquisition.Experiment
+    """
+
+
+def ingest_colony_epochs_chunks():
+    """
+    Load and insert subjects from colony.csv
+    Ingest epochs and chunks
+    for experiments specified in AutomatedExperimentIngestion
+    """
+    load_metadata.ingest_subject()
+    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
+    for experiment_name in experiment_names:
+        acquisition.Epoch.ingest_epochs(experiment_name)
+        acquisition.Chunk.ingest_chunks(experiment_name)
+
+
+def ingest_environment_visits():
+    """
+    Extract and insert complete visits
+    for experiments specified in AutomatedExperimentIngestion
+    """
+    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
+    analysis.ingest_environment_visits(experiment_names)
 
 
 # ---- Define worker(s) ----
-# configure a worker to process high-priority tasks
-high_priority = DataJointWorker(
-    "high_priority",
+# configure a worker to process `acquisition`-related tasks
+acquisition_worker = DataJointWorker(
+    "acquisition_worker",
     worker_schema_name=worker_schema_name,
     db_prefix=db_prefix,
     run_duration=-1,
-    sleep_duration=600,
+    sleep_duration=1200,
 )
+acquisition_worker(ingest_colony_epochs_chunks)
+acquisition_worker(acquisition.ExperimentLog)
+acquisition_worker(acquisition.SubjectEnterExit)
+acquisition_worker(acquisition.SubjectWeight)
+acquisition_worker(acquisition.FoodPatchEvent)
+acquisition_worker(acquisition.WheelState) -high_priority(load_metadata.ingest_subject) -high_priority(acquisition.Epoch.ingest_epochs, experiment_name=_current_experiment) -high_priority(acquisition.Chunk.ingest_chunks, experiment_name=_current_experiment) -high_priority(acquisition.ExperimentLog) -high_priority(acquisition.SubjectEnterExit) -high_priority(acquisition.SubjectWeight) -high_priority(acquisition.FoodPatchEvent) -high_priority(acquisition.WheelState) -high_priority(acquisition.WeightMeasurement) -high_priority(acquisition.WeightMeasurementFiltered) - -high_priority( - analysis.ingest_environment_visits, experiment_names=[_current_experiment] -) +acquisition_worker(ingest_environment_visits) # configure a worker to process mid-priority tasks mid_priority = DataJointWorker( @@ -64,16 +92,15 @@ worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, - sleep_duration=120, + sleep_duration=3600, ) mid_priority(qc.CameraQC) mid_priority(tracking.CameraTracking) mid_priority(acquisition.FoodPatchWheel) +mid_priority(acquisition.WeightMeasurement) +mid_priority(acquisition.WeightMeasurementFiltered) -mid_priority( - analysis.visit.ingest_environment_visits, experiment_names=[_current_experiment] -) mid_priority(analysis.OverlapVisit) mid_priority(analysis.VisitSubjectPosition) @@ -94,11 +121,10 @@ "streams_worker", worker_schema_name=worker_schema_name, db_prefix=db_prefix, - run_duration=1, - sleep_duration=600, + run_duration=-1, + sleep_duration=1200, ) for attr in vars(streams).values(): - if is_djtable(attr) and hasattr(attr, "populate"): - streams_worker(attr) - + if is_djtable(attr, dj.user_tables.AutoPopulate): + streams_worker(attr, max_calls=10) diff --git a/aeon/dj_pipeline/streams.py b/aeon/dj_pipeline/streams.py index 4a2992a0..120b41fb 100644 --- a/aeon/dj_pipeline/streams.py +++ b/aeon/dj_pipeline/streams.py @@ -1,81 +1,19 @@ -import inspect -import re -from collections import defaultdict, namedtuple -from functools import cached_property +#---- DO NOT MODIFY ---- +#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ---- import datajoint as dj import pandas as pd +from uuid import UUID import aeon -import aeon.schema.core as stream -import aeon.schema.foraging as foraging -import aeon.schema.octagon as octagon -from aeon.dj_pipeline import acquisition, dict_to_uuid, get_schema_name +from aeon.dj_pipeline import acquisition, get_schema_name from aeon.io import api as io_api -logger = dj.logger - - -schema_name = f'u_{dj.config["database.user"]}_streams' # for testing -# schema_name = get_schema_name("streams") -schema = dj.schema(schema_name) - - -# __all__ = [ -# "StreamType", -# "DeviceType", -# "Device", -# ] - - -# Read from this list of device configurations -# (device_type, description, streams) -DEVICE_CONFIGS = [ - ( - "Camera", - "Camera device", - (stream.video, stream.position, foraging.region), - ), - ("Metadata", "Metadata", (stream.metadata,)), - ( - "ExperimentalMetadata", - "ExperimentalMetadata", - (stream.environment, stream.messageLog), - ), - ( - "NestScale", - "Weight scale at nest", - (foraging.weight,), - ), - ( - "FoodPatch", - "Food patch", - (foraging.patch,), - ), - ( - "Photodiode", - "Photodiode", - (octagon.photodiode,), - ), - ( - "OSC", - "OSC", - (octagon.OSC,), - ), - ( - "TaskLogic", - "TaskLogic", - (octagon.TaskLogic,), - ), - ( - "Wall", - "Wall", - (octagon.Wall,), - ), -] - - -@schema +schema_name = get_schema_name('streams') +schema = dj.Schema() + + +@schema class StreamType(dj.Lookup): """ Catalog of 
all steam types for the different device types used across Project Aeon @@ -85,66 +23,17 @@ class StreamType(dj.Lookup): """ definition = """ # Catalog of all stream types used across Project Aeon - stream_type: varchar(20) + stream_type : varchar(20) --- - stream_reader: varchar(256) # name of the reader class found in `aeon_mecha` package (e.g. aeon.io.reader.Video) - stream_reader_kwargs: longblob # keyword arguments to instantiate the reader class - stream_description='': varchar(256) - stream_hash: uuid # hash of dict(stream_reader_kwargs, stream_reader=stream_reader) + stream_reader : varchar(256) # name of the reader class found in `aeon_mecha` package (e.g. aeon.io.reader.Video) + stream_reader_kwargs : longblob # keyword arguments to instantiate the reader class + stream_description='': varchar(256) + stream_hash : uuid # hash of dict(stream_reader_kwargs, stream_reader=stream_reader) unique index (stream_hash) """ - @staticmethod - def get_stream_entries(device_streams: tuple, pattern="{pattern}") -> dict: - - composite = aeon.io.device.compositeStream(pattern, *device_streams) - stream_entries = [] - for stream_name, stream_reader in composite.items(): - if stream_name == pattern: - stream_name = stream_reader.__class__.__name__ - entry = { - "stream_type": stream_name, - "stream_reader": f"{stream_reader.__module__}.{stream_reader.__class__.__name__}", - "stream_reader_kwargs": { - k: v - for k, v in vars(stream_reader).items() - if k - in inspect.signature(stream_reader.__class__.__init__).parameters - }, - } - entry["stream_hash"] = dict_to_uuid( - { - **entry["stream_reader_kwargs"], - "stream_reader": entry["stream_reader"], - } - ) - stream_entries.append(entry) - - return stream_entries - - @classmethod - def insert_streams(cls, device_configs: list[namedtuple] = []): - - if not device_configs: - device_configs = get_device_configs() - for device in device_configs: - stream_entries = cls.get_stream_entries(device.streams) - for entry in stream_entries: - q_param = cls & {"stream_hash": entry["stream_hash"]} - if q_param: # If the specified stream type already exists - pname = q_param.fetch1("stream_type") - if pname != entry["stream_type"]: - # If the existed stream type does not have the same name: - # human error, trying to add the same content with different name - raise dj.DataJointError( - f"The specified stream type already exists - name: {pname}" - ) - - cls.insert(stream_entries, skip_duplicates=True) - - -@schema +@schema class DeviceType(dj.Lookup): """ Catalog of all device types used across Project Aeon @@ -162,33 +51,8 @@ class Stream(dj.Part): -> StreamType """ - @classmethod - def insert_devices(cls, device_configs: list[namedtuple] = []): - if not device_configs: - device_configs = get_device_configs() - for device in device_configs: - stream_entries = StreamType.get_stream_entries(device.streams) - with cls.connection.transaction: - cls.insert1( - { - "device_type": device.type, - "device_description": device.desc, - }, - skip_duplicates=True, - ) - cls.Stream.insert( - [ - { - "device_type": device.type, - "stream_type": e["stream_type"], - } - for e in stream_entries - ], - skip_duplicates=True, - ) - - -@schema + +@schema class Device(dj.Lookup): definition = """ # Physical devices, of a particular type, identified by unique serial number device_serial_number: varchar(12) @@ -197,101 +61,243 @@ class Device(dj.Lookup): """ -## --------- Helper functions & classes --------- ## +@schema +class Patch(dj.Manual): + definition = f""" + # patch placement and 
operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + patch_install_time : datetime(6) # time of the patch placed and started operation at this position + --- + patch_name : varchar(36) + """ + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ -def get_device_configs(device_configs=DEVICE_CONFIGS) -> list[namedtuple]: - """Returns a list of device configurations from DEVICE_CONFIGS""" + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + patch_removal_time: datetime(6) # time of the patch being removed + """ - device = namedtuple("device", "type desc streams") - return [device._make(c) for c in device_configs] +@schema +class UndergroundFeeder(dj.Manual): + definition = f""" + # underground_feeder placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + underground_feeder_install_time : datetime(6) # time of the underground_feeder placed and started operation at this position + --- + underground_feeder_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + underground_feeder_removal_time: datetime(6) # time of the underground_feeder being removed + """ -def get_device_template(device_type): - """Returns table class template for ExperimentDevice""" - device_title = device_type - device_type = dj.utils.from_camel_case(device_type) - class ExperimentDevice(dj.Manual): +@schema +class VideoSource(dj.Manual): definition = f""" - # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) + # video_source placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) -> acquisition.Experiment -> Device - {device_type}_install_time: datetime(6) # time of the {device_type} placed and started operation at this position + video_source_install_time : datetime(6) # time of the video_source placed and started operation at this position --- - {device_type}_name: varchar(36) + video_source_name : varchar(36) """ class Attribute(dj.Part): definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) 
associated with this experimental device -> master - attribute_name : varchar(32) + attribute_name : varchar(32) --- - attribute_value='': varchar(2000) + attribute_value=null : longblob """ class RemovalTime(dj.Part): definition = f""" -> master --- - {device_type}_remove_time: datetime(6) # time of the camera being removed from this position + video_source_removal_time: datetime(6) # time of the video_source being removed """ - ExperimentDevice.__name__ = f"Experiment{device_title}" - return ExperimentDevice +@schema +class PatchBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) -def get_device_stream_template(device_type, stream_type): - """Returns table class template for DeviceDataStream""" + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) - ExperimentDevice = get_device_template(device_type) - exp_device_table_name = f"Experiment{device_type}" - # DeviceDataStream table(s) - stream_detail = ( - StreamType - & (DeviceType.Stream & {"device_type": device_type, "stream_type": stream_type}) - ).fetch1() +@schema +class PatchDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} - 
for i, n in enumerate(stream_detail["stream_reader"].split(".")): - if i == 0: - reader = aeon - else: - reader = getattr(reader, n) + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) - stream = reader(**stream_detail["stream_reader_kwargs"]) + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) - -> Experiment{device_type} - -> acquisition.Chunk - --- - sample_count: int # number of data points acquired from this stream for a given chunk - timestamps: longblob # (datetime) timestamps of {stream_type} data - """ + device_name = (Patch & key).fetch1( + 'patch_name' + ) - for col in stream.columns: - if col.startswith("_"): - continue - table_definition += f"{col}: longblob\n\t\t\t" + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) - class DeviceDataStream(dj.Imported): - definition = table_definition - _stream_reader = reader - _stream_detail = stream_detail + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} @property def key_source(self): f""" - Only the combination of Chunk and {exp_device_table_name} with overlapping time - + Chunk(s) that started after {exp_device_table_name} install time and ended before {exp_device_table_name} remove time - + Chunk(s) that started after {exp_device_table_name} install time for {exp_device_table_name} that are not yet removed + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed """ return ( acquisition.Chunk - * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) - & f"chunk_start >= {device_type}_install_time" - & f'chunk_start < 
IFNULL({device_type}_remove_time, "2200-01-01")' + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' ) def make(self, key): @@ -302,7 +308,9 @@ def make(self, key): key, directory_type=dir_type ) - device_name = (ExperimentDevice & key).fetch1(f"{device_type}_name") + device_name = (Patch & key).fetch1( + 'patch_name' + ) stream = self._stream_reader( **{ @@ -331,117 +339,532 @@ def make(self, key): } ) - DeviceDataStream.__name__ = f"{device_type}{stream_type}" - return DeviceDataStream +@schema +class PatchEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) -class DeviceTableManager: - def __init__(self, context=None): + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - if context is None: - self.context = inspect.currentframe().f_back.f_locals - else: - self.context = context + device_name = (Patch & key).fetch1( + 'patch_name' + ) - self._schema = dj.schema(context=self.context) - self._device_tables = [] - self._device_stream_tables = [] - self._device_types = DeviceType.fetch("device_type") - self._device_stream_map = defaultdict( - list - ) # dictionary for showing hierarchical relationship between device type and stream type + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) - def _add_device_tables(self): - for device_type in self._device_types: - table_name = f"Experiment{device_type}" - if table_name not in self._device_tables: - self._device_tables.append(table_name) + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) - def _add_device_stream_tables(self): - for device_type in self._device_types: - for stream_type in ( - StreamType & (DeviceType.Stream & {"device_type": device_type}) - ).fetch("stream_type"): + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) - table_name = f"{device_type}{stream_type}" - if table_name not in self._device_stream_tables: - self._device_stream_tables.append(table_name) - 
self._device_stream_map[device_type].append(stream_type) +@schema +class UndergroundFeederBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} - @property - def device_types(self): - return self._device_types + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) - @cached_property - def device_tables(self) -> list: - """ - Name of the device tables to be created - """ + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - self._add_device_tables() - return self._device_tables + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) - @cached_property - def device_stream_tables(self) -> list: - """ - Name of the device stream tables to be created - """ - self._add_device_stream_tables() - return self._device_stream_tables + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) - @cached_property - def device_stream_map(self) -> dict: - self._add_device_stream_tables() - return self._device_stream_map + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) - def create_device_tables(self): + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) - for device_table in self.device_tables: - device_type = re.sub(r"\bExperiment", "", device_table) +@schema +class UndergroundFeederDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': 
UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} - table_class = get_device_template(device_type) + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) - self.context[table_class.__name__] = table_class - self._schema(table_class, context=self.context) + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - self._schema.activate(schema_name) + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) - def create_device_stream_tables(self): + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) - for device_type in self.device_stream_map: + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = 
io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourcePosition(dj.Imported): + definition = """# Raw per-chunk Position data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Position data + """ + _stream_reader = aeon.io.reader.Position + _stream_detail = {'stream_type': 'Position', 'stream_reader': 'aeon.io.reader.Position', 'stream_reader_kwargs': {'pattern': '{pattern}_200'}, 'stream_description': '', 'stream_hash': UUID('75f9f365-037a-1e9b-ad38-8b2b3783315d')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < 
IFNULL(video_source_removal_time, "2200-01-01")' + ) - for stream_type in self.device_stream_map[device_type]: + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - table_class = get_device_stream_template(device_type, stream_type) + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) - self.context[table_class.__name__] = table_class - self._schema(table_class, context=self.context) + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) - self._schema.activate(schema_name) + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) -# Main function -def main(): - # Populate StreamType - StreamType.insert_streams() +@schema +class VideoSourceRegion(dj.Imported): + definition = """# Raw per-chunk Region data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Region data + """ + _stream_reader = aeon.schema.foraging._RegionReader + _stream_detail = {'stream_type': 'Region', 'stream_reader': 'aeon.schema.foraging._RegionReader', 'stream_reader_kwargs': {'pattern': '{pattern}_201'}, 'stream_description': '', 'stream_hash': UUID('6234a429-8ae5-d7dc-41c8-602ac76da029')} - # Populate DeviceType - DeviceType.insert_devices() + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) - # Populate device tables - tbmg = DeviceTableManager(context=inspect.currentframe().f_back.f_locals) + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - # # List all tables - # tbmg.device_tables - # tbmg.device_stream_tables + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) - # Create device & device stream tables - tbmg.create_device_tables() - tbmg.create_device_stream_tables() + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns 
+ if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourceVideo(dj.Imported): + definition = """# Raw per-chunk Video data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Video data + """ + _stream_reader = aeon.io.reader.Video + _stream_detail = {'stream_type': 'Video', 'stream_reader': 'aeon.io.reader.Video', 'stream_reader_kwargs': {'pattern': '{pattern}'}, 'stream_description': '', 'stream_hash': UUID('4246295b-789f-206d-b413-7af25b7548b2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) -# main() diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py index 9b7e701f..84de0a85 100644 --- a/aeon/dj_pipeline/utils/load_metadata.py +++ b/aeon/dj_pipeline/utils/load_metadata.py @@ -1,12 +1,18 @@ -import pathlib -import re import datetime +import inspect +import json +import pathlib +from collections import defaultdict +from pathlib import Path +import datajoint as dj +import numpy as np import pandas as pd -import yaml +from dotmap import DotMap -from aeon.dj_pipeline import acquisition, lab, subject -from aeon.dj_pipeline import dict_to_uuid +from aeon.dj_pipeline import acquisition, dict_to_uuid, subject +from aeon.dj_pipeline.utils import streams_maker +from aeon.io import api as io_api _weight_scale_rate = 100 @@ -27,340 +33,482 @@ def ingest_subject(colony_csv_path: pathlib.Path = _colony_csv_path) -> None: ) -def extract_epoch_metadata(experiment_name, metadata_yml_filepath): +def insert_stream_types(): + """Insert into streams.streamType table all streams in the dataset schema.""" + from aeon.schema import dataset + + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + + schemas = [v for v in dataset.__dict__.values() if isinstance(v, DotMap)] + for schema in schemas: + + stream_entries = get_stream_entries(schema) + + for entry in stream_entries: + q_param = streams.StreamType & {"stream_hash": entry["stream_hash"]} + if q_param: # If the specified stream type already exists + pname = q_param.fetch1("stream_type") 
+ if pname != entry["stream_type"]: + # If the existed stream type does not have the same name: + # human error, trying to add the same content with different name + raise dj.DataJointError( + f"The specified stream type already exists - name: {pname}" + ) + + streams.StreamType.insert(stream_entries, skip_duplicates=True) + + +def insert_device_types(schema: DotMap, metadata_yml_filepath: Path): + """Use dataset.schema and metadata.yml to insert into streams.DeviceType and streams.Device. Only insert device types that were defined both in the device schema (e.g., exp02) and Metadata.yml. It then creates new device tables under streams schema.""" + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + + device_info: dict[dict] = get_device_info(schema) + device_type_mapper, device_sn = get_device_mapper(schema, metadata_yml_filepath) + + # Add device type to device_info. Only add if device types that are defined in Metadata.yml + device_info = { + device_name: { + "device_type": device_type_mapper.get(device_name), + **device_info[device_name], + } + for device_name in device_info + if device_type_mapper.get(device_name) and device_sn.get(device_name) + } + + # Create a map of device_type to stream_type. + device_stream_map: dict[list] = {} + + for device_config in device_info.values(): + device_type = device_config["device_type"] + stream_types = device_config["stream_type"] + + if device_type not in device_stream_map: + device_stream_map[device_type] = [] + + for stream_type in stream_types: + if stream_type not in device_stream_map[device_type]: + device_stream_map[device_type].append(stream_type) + + # List only new device & stream types that need to be inserted & created. + new_device_types = [ + {"device_type": device_type} + for device_type in device_stream_map.keys() + if not streams.DeviceType & {"device_type": device_type} + ] + + new_device_stream_types = [ + {"device_type": device_type, "stream_type": stream_type} + for device_type, stream_list in device_stream_map.items() + for stream_type in stream_list + if not streams.DeviceType.Stream + & {"device_type": device_type, "stream_type": stream_type} + ] + + new_devices = [ + { + "device_serial_number": device_sn[device_name], + "device_type": device_config["device_type"], + } + for device_name, device_config in device_info.items() + if device_sn[device_name] + and not streams.Device & {"device_serial_number": device_sn[device_name]} + ] + + # Insert new entries. + if new_device_types: + streams.DeviceType.insert(new_device_types) + + if new_device_stream_types: + streams.DeviceType.Stream.insert(new_device_stream_types) + + if new_devices: + streams.Device.insert(new_devices) + + +def extract_epoch_config(experiment_name: str, metadata_yml_filepath: str) -> dict: + """Parse experiment metadata YAML file and extract epoch configuration. 
+ + Args: + experiment_name (str) + metadata_yml_filepath (str) + + Returns: + dict: epoch_config [dict] + """ + metadata_yml_filepath = pathlib.Path(metadata_yml_filepath) epoch_start = datetime.datetime.strptime( metadata_yml_filepath.parent.name, "%Y-%m-%dT%H-%M-%S" ) - with open(metadata_yml_filepath, "r") as f: - experiment_setup = yaml.safe_load(f) - commit = experiment_setup.get("Commit", experiment_setup.get("Revision")) + epoch_config: dict = ( + io_api.load( + str(metadata_yml_filepath.parent), + acquisition._device_schema_mapping[experiment_name].Metadata, + ) + .reset_index() + .to_dict("records")[0] + ) + + commit = epoch_config.get("commit") + if isinstance(commit, float) and np.isnan(commit): + commit = epoch_config["metadata"]["Revision"] + assert commit, f'Neither "Commit" nor "Revision" found in {metadata_yml_filepath}' + + devices: list[dict] = json.loads( + json.dumps( + epoch_config["metadata"]["Devices"], default=lambda x: x.__dict__, indent=4 + ) + ) + + if isinstance( + devices, list + ): # In exp02, it is a list of dict. In presocial. It's a dict of dict. + devices: dict = { + d.pop("Name"): d for d in devices + } # {deivce_name: device_config} + return { "experiment_name": experiment_name, "epoch_start": epoch_start, - "bonsai_workflow": experiment_setup["Workflow"], + "bonsai_workflow": epoch_config["workflow"], "commit": commit, - "metadata": experiment_setup, + "metadata": devices, #! this format might have changed since using aeon metadata reader "metadata_file_path": metadata_yml_filepath, } def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): """ - work-in-progress - Missing: - + camera/patch location - + patch, weightscale serial number + Make entries into device tables """ + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + if experiment_name.startswith("oct"): ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath) return - metadata_yml_filepath = pathlib.Path(metadata_yml_filepath) - epoch_start = datetime.datetime.strptime( - metadata_yml_filepath.parent.name, "%Y-%m-%dT%H-%M-%S" - ) - with open(metadata_yml_filepath, "r") as f: - experiment_setup = yaml.safe_load(f) experiment_key = {"experiment_name": experiment_name} - # Check if there has been any changes in the arena setup - # by comparing the "Commit" against the most immediate preceding epoch - commit = experiment_setup.get("Commit", experiment_setup.get("Revision")) - assert commit, f'Neither "Commit" nor "Revision" found in {metadata_yml_filepath}' + metadata_yml_filepath = pathlib.Path(metadata_yml_filepath) + epoch_config = extract_epoch_config(experiment_name, metadata_yml_filepath) + previous_epoch = (acquisition.Experiment & experiment_key).aggr( - acquisition.Epoch & f'epoch_start < "{epoch_start}"', + acquisition.Epoch & f'epoch_start < "{epoch_config["epoch_start"]}"', epoch_start="MAX(epoch_start)", ) - if len(acquisition.Epoch.Config & previous_epoch) and commit == ( + if len(acquisition.Epoch.Config & previous_epoch) and epoch_config["commit"] == ( acquisition.Epoch.Config & previous_epoch ).fetch1("commit"): # if identical commit -> no changes return - if isinstance(experiment_setup["Devices"], list): - experiment_devices = experiment_setup.pop("Devices") - elif isinstance(experiment_setup["Devices"], dict): - experiment_devices = [] - for device_name, device_info in experiment_setup.pop("Devices").items(): - if device_name.startswith("VideoController"): - device_type = "VideoController" - elif all(v in device_info for v in 
("TriggerFrequency", "FrameEvents")): - device_type = "VideoSource" - elif all(v in device_info for v in ("PelletDelivered", "PatchEvents")): - device_type = "Patch" - elif all(v in device_info for v in ("TareWeight", "WeightEvents")): - device_type = "WeightScale" - elif device_name.startswith("AudioAmbient"): - device_type = "AudioAmbient" - elif device_name.startswith("Wall"): - device_type = "Wall" - elif device_name.startswith("Photodiode"): - device_type = "Photodiode" - else: - raise ValueError(f"Unrecognized Device Type for {device_name}") - experiment_devices.append( - {"Name": device_name, "Type": device_type, **device_info} - ) - else: - raise ValueError( - f"Unexpected devices variable type: {type(experiment_setup['Devices'])}" - ) - # ---- Video Controller ---- - video_controller = [ - device for device in experiment_devices if device["Type"] == "VideoController" - ] - assert ( - len(video_controller) == 1 - ), "Unable to find one unique VideoController device" - video_controller = video_controller[0] + device_frequency_mapper = { name: float(value) - for name, value in video_controller.items() + for name, value in epoch_config["metadata"]["VideoController"].items() if name.endswith("Frequency") - } - # ---- Load cameras ---- - cameras = [ - device for device in experiment_devices if device["Type"] == "VideoSource" - ] - camera_list, camera_installation_list, camera_removal_list, camera_position_list = ( - [], - [], - [], - [], - ) - for camera in cameras: - # ---- Check if this is a new camera, add to lab.Camera if needed - camera_key = {"camera_serial_number": camera["SerialNumber"]} - camera_list.append(camera_key) - camera_installation = { - "experiment_name": experiment_name, - **camera_key, - "camera_install_time": epoch_start, - "camera_description": camera["Name"], - "camera_sampling_rate": device_frequency_mapper[camera["TriggerFrequency"]], - "camera_gain": float(camera["Gain"]), - "camera_bin": int(camera["Binning"]), - } - if "position" in camera: - camera_position = { - **camera_key, + } # May not be needed? 
+ + schema = acquisition._device_schema_mapping[experiment_name] + device_type_mapper, _ = get_device_mapper(schema, metadata_yml_filepath) + + # Insert into each device table + device_list = [] + device_removal_list = [] + + for device_name, device_config in epoch_config["metadata"].items(): + if table := getattr(streams, device_type_mapper.get(device_name) or "", None): + + device_sn = device_config.get("SerialNumber", device_config.get("PortName")) + device_key = {"device_serial_number": device_sn} + + device_list.append(device_key) + table_entry = { "experiment_name": experiment_name, - "camera_install_time": epoch_start, - "camera_position_x": camera["position"]["x"], - "camera_position_y": camera["position"]["y"], - "camera_position_z": camera["position"]["z"], - } - else: - camera_position = { - "camera_position_x": None, - "camera_position_y": None, - "camera_position_z": None, - "camera_rotation_x": None, - "camera_rotation_y": None, - "camera_rotation_z": None, + **device_key, + f"{dj.utils.from_camel_case(table.__name__)}_install_time": epoch_config[ + "epoch_start" + ], + f"{dj.utils.from_camel_case(table.__name__)}_name": device_name, } - # ---- Check if this camera is currently installed - # If the same camera serial number is currently installed - # check for any changes in configuration, if not, skip this - current_camera_query = ( - acquisition.ExperimentCamera - acquisition.ExperimentCamera.RemovalTime - & experiment_key - & camera_key - ) - if current_camera_query: - current_camera_config = current_camera_query.join( - acquisition.ExperimentCamera.Position, left=True - ).fetch1() - new_camera_config = {**camera_installation, **camera_position} - current_camera_config.pop("camera_install_time") - new_camera_config.pop("camera_install_time") - if dict_to_uuid(current_camera_config) == dict_to_uuid(new_camera_config): - continue - # ---- Remove old camera - camera_removal_list.append( + + table_attribute_entry = [ { - **current_camera_query.fetch1("KEY"), - "camera_remove_time": epoch_start, + **table_entry, + "attribute_name": attribute_name, + "attribute_value": attribute_value, } + for attribute_name, attribute_value in device_config.items() + ] + + """Check if this camera is currently installed. If the same camera serial number is currently installed check for any changes in configuration. 
If not, skip this""" + current_device_query = ( + table - table.RemovalTime & experiment_key & device_key ) - # ---- Install new camera - camera_installation_list.append(camera_installation) - if "position" in camera: - camera_position_list.append(camera_position) - # remove the currently installed cameras that are absent in this config - camera_removal_list.extend( - ( - acquisition.ExperimentCamera - - acquisition.ExperimentCamera.RemovalTime - - camera_list - & experiment_key - ).fetch("KEY") - ) - # ---- Load food patches ---- - food_patches = [ - device for device in experiment_devices if device["Type"] == "Patch" - ] - patch_list, patch_installation_list, patch_removal_list, patch_position_list = ( - [], - [], - [], - [], - ) - for patch in food_patches: - # ---- Check if this is a new food patch, add to lab.FoodPatch if needed - patch_key = { - "food_patch_serial_number": patch.get("SerialNumber") or patch["PortName"] - } - patch_list.append(patch_key) - patch_installation = { - **patch_key, - "experiment_name": experiment_name, - "food_patch_install_time": epoch_start, - "food_patch_description": patch["Name"], - "wheel_sampling_rate": float( - re.search(r"\d+", patch["SampleRate"]).group() - ), - "wheel_radius": float(patch["Radius"]), + + if current_device_query: + current_device_config: list[dict] = ( + table.Attribute & current_device_query + ).fetch( + "experiment_name", + "device_serial_number", + "attribute_name", + "attribute_value", + as_dict=True, + ) + new_device_config: list[dict] = [ + { + k: v + for k, v in entry.items() + if dj.utils.from_camel_case(table.__name__) not in k + } + for entry in table_attribute_entry + ] + + if dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in current_device_config + } + ) == dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in new_device_config + } + ): # Skip if none of the configuration has changed. + continue + + # Remove old device + device_removal_list.append( + { + **current_device_query.fetch1("KEY"), + f"{dj.utils.from_camel_case(table.__name__)}_removal_time": epoch_config[ + "epoch_start" + ], + } + ) + + # Insert into table. + table.insert1(table_entry, skip_duplicates=True) + table.Attribute.insert(table_attribute_entry, ignore_extra_fields=True) + + # Remove the currently installed devices that are absent in this config + device_removal = lambda device_type, device_entry: any( + dj.utils.from_camel_case(device_type) in k for k in device_entry + ) # returns True if the device type is found in the attribute name + + for device_type in streams.DeviceType.fetch("device_type"): + table = getattr(streams, device_type) + + device_removal_list.extend( + (table - table.RemovalTime - device_list & experiment_key).fetch("KEY") + ) # could be VideoSource or Patch + + for device_entry in device_removal_list: + if device_removal(device_type, device_entry): + table.RemovalTime.insert1(device_entry) + + +# region Get stream & device information +def get_stream_entries(schema: DotMap) -> list[dict]: + """Returns a list of dictionaries containing the stream entries for a given device, + + Args: + schema (DotMap): DotMap object (e.g., exp02, octagon01) + + Returns: + stream_info (list[dict]): list of dictionaries containing the stream entries for a given device, + + e.g. 
{'stream_type': 'EnvironmentState', + 'stream_reader': aeon.io.reader.Csv, + 'stream_reader_kwargs': {'pattern': '{pattern}_EnvironmentState', + 'columns': ['state'], + 'extension': 'csv', + 'dtype': None} + """ + device_info = get_device_info(schema) + return [ + { + "stream_type": stream_type, + "stream_reader": stream_reader, + "stream_reader_kwargs": stream_reader_kwargs, + "stream_hash": stream_hash, } - if "position" in patch: - patch_position = { - **patch_key, - "experiment_name": experiment_name, - "food_patch_install_time": epoch_start, - "food_patch_position_x": patch["position"]["x"], - "food_patch_position_y": patch["position"]["y"], - "food_patch_position_z": patch["position"]["z"], - } - else: - patch_position = { - "food_patch_position_x": None, - "food_patch_position_y": None, - "food_patch_position_z": None, - } - # ---- Check if this camera is currently installed - # If the same camera serial number is currently installed - # check for any changes in configuration, if not, skip this - current_patch_query = ( - acquisition.ExperimentFoodPatch - - acquisition.ExperimentFoodPatch.RemovalTime - & experiment_key - & patch_key + for stream_info in device_info.values() + for stream_type, stream_reader, stream_reader_kwargs, stream_hash in zip( + stream_info["stream_type"], + stream_info["stream_reader"], + stream_info["stream_reader_kwargs"], + stream_info["stream_hash"], ) - if current_patch_query: - current_patch_config = current_patch_query.join( - acquisition.ExperimentFoodPatch.Position, left=True - ).fetch1() - new_patch_config = {**patch_installation, **patch_position} - current_patch_config.pop("food_patch_install_time") - new_patch_config.pop("food_patch_install_time") - if dict_to_uuid(current_patch_config) == dict_to_uuid(new_patch_config): - continue - # ---- Remove old food patch - patch_removal_list.append( - { - **current_patch_query.fetch1("KEY"), - "food_patch_remove_time": epoch_start, - } - ) - # ---- Install new food patch - patch_installation_list.append(patch_installation) - if "position" in patch: - patch_position_list.append(patch_position) - # remove the currently installed patches that are absent in this config - patch_removal_list.extend( - ( - acquisition.ExperimentFoodPatch - - acquisition.ExperimentFoodPatch.RemovalTime - - patch_list - & experiment_key - ).fetch("KEY") - ) - # ---- Load weight scales ---- - weight_scales = [ - device for device in experiment_devices if device["Type"] == "WeightScale" ] - weight_scale_list, weight_scale_installation_list, weight_scale_removal_list = ( - [], - [], - [], - ) - for weight_scale in weight_scales: - # ---- Check if this is a new weight scale, add to lab.WeightScale if needed - weight_scale_key = { - "weight_scale_serial_number": weight_scale.get("SerialNumber") - or weight_scale["PortName"] - } - weight_scale_list.append(weight_scale_key) - arena_key = (lab.Arena & acquisition.Experiment & experiment_key).fetch1("KEY") - weight_scale_installation = { - "experiment_name": experiment_name, - **weight_scale_key, - "weight_scale_install_time": epoch_start, - **arena_key, - "nest": _weight_scale_nest, - "weight_scale_description": weight_scale["Name"], - "weight_scale_sampling_rate": float(_weight_scale_rate), - } - # ---- Check if this weight scale is currently installed - if so, remove it - current_weight_scale_query = ( - acquisition.ExperimentWeightScale - - acquisition.ExperimentWeightScale.RemovalTime - & experiment_key - & weight_scale_key - ) - if current_weight_scale_query: - 
current_weight_scale_config = current_weight_scale_query.fetch1() - new_weight_scale_config = weight_scale_installation.copy() - current_weight_scale_config.pop("weight_scale_install_time") - new_weight_scale_config.pop("weight_scale_install_time") - if dict_to_uuid(current_weight_scale_config) == dict_to_uuid( - new_weight_scale_config - ): - continue - # ---- Remove old weight scale - weight_scale_removal_list.append( - { - **current_weight_scale_query.fetch1("KEY"), - "weight_scale_remove_time": epoch_start, + + +def get_device_info(schema: DotMap) -> dict[dict]: + """ + Read from the above DotMap object and returns a device dictionary as the following. + + Args: + schema (DotMap): DotMap object (e.g., exp02, octagon01) + + Returns: + device_info (dict[dict]): A dictionary of device information + + e.g. {'CameraTop': + {'stream_type': ['Video', 'Position', 'Region'], + 'stream_reader': [ + aeon.io.reader.Video, + aeon.io.reader.Position, + aeon.schema.foraging._RegionReader + ], + 'pattern': ['{pattern}', '{pattern}_200', '{pattern}_201'] + } } + """ + + def _get_class_path(obj): + return f"{obj.__class__.__module__}.{obj.__class__.__name__}" + + schema_json = json.dumps(schema, default=lambda x: x.__dict__, indent=4) + schema_dict = json.loads(schema_json) + device_info = {} + + for device_name, device in schema.items(): + if device_name.startswith("_"): + continue + + device_info[device_name] = defaultdict(list) + + if isinstance(device, DotMap): + for stream_type, stream_obj in device.items(): + if stream_obj.__class__.__module__ in [ + "aeon.io.reader", + "aeon.schema.foraging", + "aeon.schema.octagon", + ]: + device_info[device_name]["stream_type"].append(stream_type) + device_info[device_name]["stream_reader"].append( + _get_class_path(stream_obj) + ) + + required_args = [ + k + for k in inspect.signature(stream_obj.__init__).parameters + if k != "self" + ] + pattern = schema_dict[device_name][stream_type].get("pattern") + schema_dict[device_name][stream_type]["pattern"] = pattern.replace( + device_name, "{pattern}" + ) + + kwargs = { + k: v + for k, v in schema_dict[device_name][stream_type].items() + if k in required_args + } + device_info[device_name]["stream_reader_kwargs"].append(kwargs) + # Add hash + device_info[device_name]["stream_hash"].append( + dict_to_uuid( + {**kwargs, "stream_reader": _get_class_path(stream_obj)} + ) + ) + else: + stream_type = device.__class__.__name__ + device_info[device_name]["stream_type"].append(stream_type) + device_info[device_name]["stream_reader"].append(_get_class_path(device)) + + required_args = { + k: None + for k in inspect.signature(device.__init__).parameters + if k != "self" + } + pattern = schema_dict[device_name].get("pattern") + schema_dict[device_name]["pattern"] = pattern.replace( + device_name, "{pattern}" + ) + + kwargs = { + k: v for k, v in schema_dict[device_name].items() if k in required_args + } + device_info[device_name]["stream_reader_kwargs"].append(kwargs) + # Add hash + device_info[device_name]["stream_hash"].append( + dict_to_uuid({**kwargs, "stream_reader": _get_class_path(device)}) ) - # ---- Install new weight scale - weight_scale_installation_list.append(weight_scale_installation) - # remove the currently installed weight scales that are absent in this config - weight_scale_removal_list.extend( - ( - acquisition.ExperimentWeightScale - - acquisition.ExperimentWeightScale.RemovalTime - - weight_scale_list - & experiment_key - ).fetch("KEY") + return device_info + + +def get_device_mapper(schema: DotMap, 
metadata_yml_filepath: Path):
+    """Return a mapping between device name and device type, based on the dataset schema and the
+    experiment's Metadata.yml. The mapping is persisted to a JSON file and read back from it when
+    the type information is missing from Metadata.yml.
+
+    Args:
+        schema (DotMap): DotMap object (e.g., exp02)
+        metadata_yml_filepath (Path): Path to metadata.yml.
+
+    Returns:
+        device_type_mapper (dict): {"device_name": "device_type"}
+            e.g. {'CameraTop': 'VideoSource', 'Patch1': 'Patch'}
+        device_sn (dict): {"device_name": "serial_number"}
+            e.g. {'CameraTop': '21053810'}
+    """
+    import os
+
+    from aeon.io import api
+
+    metadata_yml_filepath = Path(metadata_yml_filepath)
+    meta_data = (
+        api.load(
+            str(metadata_yml_filepath.parent),
+            schema.Metadata,
+        )
+        .reset_index()
+        .to_dict("records")[0]["metadata"]
+    )
+
+    # Store the mapper dictionary here
+    repository_root = (
+        os.popen("git rev-parse --show-toplevel").read().strip()
+    )  # repo root path
+    filename = Path(
+        repository_root + "/aeon/dj_pipeline/create_experiments/device_type_mapper.json"
+    )
+
+    device_type_mapper = {}  # {device_name: device_type}
+    device_sn = {}  # {device_name: device_sn}
+
+    if filename.is_file():
+        with filename.open("r") as f:
+            device_type_mapper = json.load(f)
+
+    try:  # if the device type is not in the mapper, add it
+        for item in meta_data.Devices:
+            if isinstance(item, DotMap):
+                device_type_mapper[item.Name] = item.Type
+                device_sn[item.Name] = (
+                    item.SerialNumber or item.PortName or None
+                )  # assign the serial number if it exists, otherwise the port name; if neither exists, assign None
+            elif isinstance(item, str):  # presocial
+                if meta_data.Devices[item].get("Type"):
+                    device_type_mapper[item] = meta_data.Devices[item].get("Type")
+                device_sn[item] = (
+                    meta_data.Devices[item].get("SerialNumber")
+                    or meta_data.Devices[item].get("PortName")
+                    or None
+                )
+
+        with filename.open("w") as f:
+            json.dump(device_type_mapper, f)
+    except AttributeError:
+        pass
+
+    return device_type_mapper, device_sn


 def ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath):
     """
     Temporary ingestion routine to load devices' meta information for Octagon arena experiments.
     """
-    from aeon.dj_pipeline import streams
+    streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME)

     oct01_devices = [
         ("Metadata", "Metadata"),
@@ -398,3 +546,6 @@ def ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath):
     experiment_table.insert1(
         (experiment_name, device_sn, epoch_start, device_name)
     )
+
+
+# endregion
diff --git a/aeon/dj_pipeline/utils/streams_maker.py b/aeon/dj_pipeline/utils/streams_maker.py
new file mode 100644
index 00000000..46043e94
--- /dev/null
+++ b/aeon/dj_pipeline/utils/streams_maker.py
@@ -0,0 +1,333 @@
+import inspect
+from pathlib import Path
+import datajoint as dj
+import pandas as pd
+import re
+import importlib
+
+import aeon
+from aeon.dj_pipeline import acquisition, get_schema_name
+from aeon.io import api as io_api
+
+logger = dj.logger
+
+
+# schema_name = f'u_{dj.config["database.user"]}_streams'  # for testing
+schema_name = get_schema_name("streams")
+
+STREAMS_MODULE_NAME = "streams"
+_STREAMS_MODULE_FILE = Path(__file__).parent.parent / f"{STREAMS_MODULE_NAME}.py"
+
+
+class StreamType(dj.Lookup):
+    """
+    Catalog of all stream types for the different device types used across Project Aeon.
+    One StreamType corresponds to one reader class in `aeon.io.reader`.
+    The combination of `stream_reader` and `stream_reader_kwargs` should fully specify
+    the data loading routine for a particular device, using the `aeon.io` utilities.
+    """
+
+    definition = """  # Catalog of all stream types used across Project Aeon
+    stream_type          : varchar(20)
+    ---
+    stream_reader        : varchar(256)  # name of the reader class found in the `aeon_mecha` package (e.g. aeon.io.reader.Video)
+    stream_reader_kwargs : longblob      # keyword arguments to instantiate the reader class
+    stream_description='': varchar(256)
+    stream_hash          : uuid          # hash of dict(stream_reader_kwargs, stream_reader=stream_reader)
+    unique index (stream_hash)
+    """
+
+
+class DeviceType(dj.Lookup):
+    """
+    Catalog of all device types used across Project Aeon
+    """
+
+    definition = """  # Catalog of all device types used across Project Aeon
+    device_type: varchar(36)
+    ---
+    device_description='': varchar(256)
+    """
+
+    class Stream(dj.Part):
+        definition = """  # Data stream(s) associated with a particular device type
+        -> master
+        -> StreamType
+        """
+
+
+class Device(dj.Lookup):
+    definition = """  # Physical devices, of a particular type, identified by unique serial number
+    device_serial_number: varchar(12)
+    ---
+    -> DeviceType
+    """


+# region Helper functions for creating device tables.
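+# For example, get_device_template("VideoSource") returns a dj.Manual class named
+# "VideoSource" whose primary key includes `video_source_install_time`, plus
+# `Attribute` and `RemovalTime` part tables ("VideoSource" is an illustrative
+# DeviceType entry; any device_type registered in DeviceType works the same way).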
+ + +def get_device_template(device_type: str): + """Returns table class template for ExperimentDevice""" + device_title = device_type + device_type = dj.utils.from_camel_case(device_type) + + class ExperimentDevice(dj.Manual): + definition = f""" + # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) + -> acquisition.Experiment + -> Device + {device_type}_install_time : datetime(6) # time of the {device_type} placed and started operation at this position + --- + {device_type}_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + {device_type}_removal_time: datetime(6) # time of the {device_type} being removed + """ + + ExperimentDevice.__name__ = f"{device_title}" + + return ExperimentDevice + + +def get_device_stream_template(device_type: str, stream_type: str, streams_module): + """Returns table class template for DeviceDataStream""" + + ExperimentDevice = getattr(streams_module, device_type) + + # DeviceDataStream table(s) + stream_detail = ( + streams_module.StreamType + & ( + streams_module.DeviceType.Stream + & {"device_type": device_type, "stream_type": stream_type} + ) + ).fetch1() + + for i, n in enumerate(stream_detail["stream_reader"].split(".")): + if i == 0: + reader = aeon + else: + reader = getattr(reader, n) + + stream = reader(**stream_detail["stream_reader_kwargs"]) + + table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + for col in stream.columns: + if col.startswith("_"): + continue + table_definition += f"{col}: longblob\n\t\t\t" + + class DeviceDataStream(dj.Imported): + definition = table_definition + _stream_reader = reader + _stream_detail = stream_detail + + @property + def key_source(self): + f""" + Only the combination of Chunk and {device_type} with overlapping time + + Chunk(s) that started after {device_type} install time and ended before {device_type} remove time + + Chunk(s) that started after {device_type} install time for {device_type} that are not yet removed + """ + return ( + acquisition.Chunk + * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) + & f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time" + & f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (ExperimentDevice & key).fetch1( + f"{dj.utils.from_camel_case(device_type)}_name" + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + 
end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + DeviceDataStream.__name__ = f"{device_type}{stream_type}" + + return DeviceDataStream + + +# endregion + + +def main(create_tables=True): + + if not _STREAMS_MODULE_FILE.exists(): + with open(_STREAMS_MODULE_FILE, "w") as f: + imports_str = ( + "#---- DO NOT MODIFY ----\n" + "#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ----\n\n" + "import datajoint as dj\n" + "import pandas as pd\n" + "from uuid import UUID\n\n" + "import aeon\n" + "from aeon.dj_pipeline import acquisition, get_schema_name\n" + "from aeon.io import api as io_api\n\n" + "schema_name = get_schema_name('streams')\n" + "schema = dj.Schema()\n\n\n" + ) + f.write(imports_str) + for table_class in (StreamType, DeviceType, Device): + device_table_def = inspect.getsource(table_class).lstrip() + full_def = "@schema \n" + device_table_def + "\n\n" + f.write(full_def) + + streams = importlib.import_module(f"aeon.dj_pipeline.{STREAMS_MODULE_NAME}") + streams.schema.activate(schema_name) + + if create_tables: + # Create DeviceType tables. + for device_info in streams.DeviceType.fetch(as_dict=True): + if hasattr(streams, device_info["device_type"]): + continue + + table_class = get_device_template(device_info["device_type"]) + streams.__dict__[table_class.__name__] = table_class + + device_table_def = inspect.getsource(table_class).lstrip() + replacements = { + "ExperimentDevice": device_info["device_type"], + "{device_title}": dj.utils.from_camel_case(device_info["device_type"]), + "{device_type}": dj.utils.from_camel_case(device_info["device_type"]), + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + device_table_def = device_table_def.replace(old, new) + full_def = "@schema \n" + device_table_def + "\n\n" + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + # Create DeviceDataStream tables. + for device_info in streams.DeviceType.Stream.fetch(as_dict=True): + + device_type = device_info["device_type"] + stream_type = device_info["stream_type"] + table_name = f"{device_type}{stream_type}" + + if hasattr(streams, table_name): + continue + + table_class = get_device_stream_template( + device_type, stream_type, streams_module=streams + ) + + stream_obj = table_class.__dict__["_stream_reader"] + reader = stream_obj.__module__ + "." 
+ stream_obj.__name__ + stream_detail = table_class.__dict__["_stream_detail"] + + device_stream_table_def = inspect.getsource(table_class).lstrip() + + old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + replacements = { + "DeviceDataStream": f"{device_type}{stream_type}", + "ExperimentDevice": device_type, + 'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'", + """f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""", + 'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'", + "{device_type}": device_type, + "{stream_type}": stream_type, + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + new_definition = old_definition.replace(old, new) + + replacements["table_definition"] = '"""' + new_definition + '"""' + + for old, new in replacements.items(): + device_stream_table_def = device_stream_table_def.replace(old, new) + + device_stream_table_def = re.sub( + r"_stream_reader\s*=\s*reader", + f"_stream_reader = {reader}", + device_stream_table_def, + ) # insert reader + device_stream_table_def = re.sub( + r"_stream_detail\s*=\s*stream_detail", + f"_stream_detail = {stream_detail}", + device_stream_table_def, + ) # insert stream details + + full_def = "@schema \n" + device_stream_table_def + "\n\n" + + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + importlib.reload(streams) + streams.schema.activate(schema_name) + + return streams + + +streams = main() diff --git a/aeon/dj_pipeline/utils/video.py b/aeon/dj_pipeline/utils/video.py index fb791b0a..af1e9595 100644 --- a/aeon/dj_pipeline/utils/video.py +++ b/aeon/dj_pipeline/utils/video.py @@ -1,7 +1,6 @@ import base64 import datetime from pathlib import Path - import cv2 import numpy as np import pandas as pd @@ -59,4 +58,4 @@ def retrieve_video_frames( "finalChunk": bool(last_frame_time >= end_time), }, "frames": encoded_frames, - } + } \ No newline at end of file diff --git a/aeon/schema/dataset.py b/aeon/schema/dataset.py index ae787456..f74bcce7 100644 --- a/aeon/schema/dataset.py +++ b/aeon/schema/dataset.py @@ -3,8 +3,11 @@ import aeon.schema.core as stream import aeon.schema.foraging as foraging import aeon.schema.octagon as octagon +from aeon.io import reader from aeon.io.device import Device +__all__ = ["exp02", "exp01", "octagon01", "presocial"] + exp02 = DotMap( [ Device("Metadata", stream.metadata), @@ -24,34 +27,52 @@ ] ) -exp01 = DotMap([ - Device("SessionData", foraging.session), - Device("FrameTop", stream.video, stream.position), - Device("FrameEast", stream.video), - Device("FrameGate", stream.video), - Device("FrameNorth", stream.video), - Device("FramePatch1", stream.video), - Device("FramePatch2", stream.video), - Device("FrameSouth", stream.video), - Device("FrameWest", stream.video), - Device("Patch1", foraging.depletionFunction, stream.encoder, foraging.feeder), - 
Device("Patch2", foraging.depletionFunction, stream.encoder, foraging.feeder) -]) +exp01 = DotMap( + [ + Device("SessionData", foraging.session), + Device("FrameTop", stream.video, stream.position), + Device("FrameEast", stream.video), + Device("FrameGate", stream.video), + Device("FrameNorth", stream.video), + Device("FramePatch1", stream.video), + Device("FramePatch2", stream.video), + Device("FrameSouth", stream.video), + Device("FrameWest", stream.video), + Device("Patch1", foraging.depletionFunction, stream.encoder, foraging.feeder), + Device("Patch2", foraging.depletionFunction, stream.encoder, foraging.feeder), + ] +) -octagon01 = DotMap([ - Device("Metadata", stream.metadata), - Device("CameraTop", stream.video, stream.position), - Device("CameraColorTop", stream.video), - Device("ExperimentalMetadata", stream.subject_state), - Device("Photodiode", octagon.photodiode), - Device("OSC", octagon.OSC), - Device("TaskLogic", octagon.TaskLogic), - Device("Wall1", octagon.Wall), - Device("Wall2", octagon.Wall), - Device("Wall3", octagon.Wall), - Device("Wall4", octagon.Wall), - Device("Wall5", octagon.Wall), - Device("Wall6", octagon.Wall), - Device("Wall7", octagon.Wall), - Device("Wall8", octagon.Wall) -]) +octagon01 = DotMap( + [ + Device("Metadata", stream.metadata), + Device("CameraTop", stream.video, stream.position), + Device("CameraColorTop", stream.video), + Device("ExperimentalMetadata", stream.subject_state), + Device("Photodiode", octagon.photodiode), + Device("OSC", octagon.OSC), + Device("TaskLogic", octagon.TaskLogic), + Device("Wall1", octagon.Wall), + Device("Wall2", octagon.Wall), + Device("Wall3", octagon.Wall), + Device("Wall4", octagon.Wall), + Device("Wall5", octagon.Wall), + Device("Wall6", octagon.Wall), + Device("Wall7", octagon.Wall), + Device("Wall8", octagon.Wall), + ] +) + +presocial = exp02 +presocial.Patch1.BeamBreak = reader.BitmaskEvent( + pattern="Patch1_32", value=0x22, tag="BeamBroken" +) +presocial.Patch2.BeamBreak = reader.BitmaskEvent( + pattern="Patch2_32", value=0x22, tag="BeamBroken" +) +presocial.Patch1.DeliverPellet = reader.BitmaskEvent( + pattern="Patch1_35", value=0x1, tag="TriggeredPellet" +) +presocial.Patch2.DeliverPellet = reader.BitmaskEvent( + pattern="Patch2_35", value=0x1, tag="TriggeredPellet" +) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a6d1868b..e2dce70a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -45,22 +45,26 @@ x-aeon-ingest-common: &aeon-ingest-common max-file: "5" services: - ingest_high: + acquisition_worker: <<: *aeon-ingest-common - command: ["aeon_ingest", "high_priority", "--duration=-1", "--sleep=1200"] + command: ["aeon_ingest", "acquisition_worker"] - ingest_mid: + streams_worker: <<: *aeon-ingest-common depends_on: - ingest_high: + acquisition_worker: condition: service_started deploy: mode: replicated - replicas: 2 - command: ["aeon_ingest", "mid_priority", "--duration=-1", "--sleep=3600"] + replicas: 3 + command: ["aeon_ingest", "streams_worker"] - dev: + ingest_mid: <<: *aeon-ingest-common + depends_on: + acquisition_worker: + condition: service_started deploy: mode: replicated - replicas: 0 + replicas: 2 + command: ["aeon_ingest", "mid_priority"]
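+
+# Example usage (illustrative; assumes the shared aeon-ingest-common config above is set up):
+#   docker-compose up --detach acquisition_worker streams_worker ingest_mid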