diff --git a/aeon/dj_pipeline/__init__.py b/aeon/dj_pipeline/__init__.py index 4664cb93..ea805b57 100644 --- a/aeon/dj_pipeline/__init__.py +++ b/aeon/dj_pipeline/__init__.py @@ -16,6 +16,12 @@ repository_config = dj.config['custom'].get('repository_config', _default_repository_config) +try: + from .utils import streams_maker + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) +except: + pass + def get_schema_name(name): return db_prefix + name diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 585cf2bc..03a9a0f4 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -10,12 +10,11 @@ from aeon.io import reader as io_reader from aeon.schema import dataset as aeon_schema -from . import get_schema_name, lab, subject +from . import get_schema_name from .utils import paths logger = dj.logger schema = dj.schema(get_schema_name("acquisition")) -# streams = dj.VirtualModule("streams", get_schema_name("streams")) # ------------------- Some Constants -------------------------- @@ -124,7 +123,7 @@ class Directory(dj.Part): @classmethod def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False): - + try: repo_name, dir_path = ( cls.Directory & experiment_key & {"directory_type": directory_type} @@ -135,6 +134,7 @@ def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False return data_directory.as_posix() if as_posix else data_directory except dj.errors.DataJointError: return + @classmethod def get_data_directories( cls, experiment_key, directory_types=["raw"], as_posix=False @@ -277,14 +277,13 @@ def ingest_epochs(cls, experiment_name, start=None, end=None): - if not specified, ingest all epochs Note: "start" and "end" are datetime specified a string in the format: "%Y-%m-%d %H:%M:%S" """ - from aeon.dj_pipeline import streams - + from .utils import streams_maker from .utils.load_metadata import ( extract_epoch_config, ingest_epoch_metadata, insert_device_types, ) - + device_name = _ref_device_mapping.get(experiment_name, "CameraTop") all_chunks, raw_data_dirs = _get_all_chunks(experiment_name, device_name) @@ -363,15 +362,17 @@ def ingest_epochs(cls, experiment_name, start=None, end=None): if epoch_config: cls.Config.insert1(epoch_config) if metadata_yml_filepath and metadata_yml_filepath.exists(): - + try: - # Ingest streams.DeviceType, streams.Device and create device tables. + # Insert new entries for streams.DeviceType, streams.Device. 
insert_device_types( _device_schema_mapping[epoch_key["experiment_name"]], metadata_yml_filepath, ) - streams.main(context=streams.__dict__) # create device tables under streams schema + # Define and instantiate new devices/stream tables under `streams` schema + streams_maker.main() with cls.connection.transaction: + # Insert devices' installation/removal/settings ingest_epoch_metadata( experiment_name, metadata_yml_filepath ) diff --git a/aeon/dj_pipeline/populate/process.py b/aeon/dj_pipeline/populate/process.py index c83347cb..023425ad 100644 --- a/aeon/dj_pipeline/populate/process.py +++ b/aeon/dj_pipeline/populate/process.py @@ -36,7 +36,7 @@ from datajoint_utilities.dj_worker import parse_args from aeon.dj_pipeline.populate.worker import ( - high_priority, + acquisition_worker, mid_priority, streams_worker, logger, @@ -46,7 +46,7 @@ # ---- some wrappers to support execution as script or CLI configured_workers = { - "high_priority": high_priority, + "high_priority": acquisition_worker, "mid_priority": mid_priority, "streams_worker": streams_worker, } diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 7b1678dc..b57c4c4e 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -12,13 +12,15 @@ db_prefix, qc, report, - streams, tracking, ) -from aeon.dj_pipeline.utils import load_metadata +from aeon.dj_pipeline.utils import load_metadata, streams_maker + + +streams = streams_maker.main() __all__ = [ - "high_priority", + "acquisition_worker", "mid_priority", "streams_worker", "WorkerLog", @@ -28,34 +30,61 @@ # ---- Some constants ---- logger = dj.logger -_current_experiment = "exp0.2-r0" -worker_schema_name = db_prefix + "workerlog" +worker_schema_name = db_prefix + "worker" load_metadata.insert_stream_types() +# ---- Manage experiments for automated ingestion ---- + +schema = dj.Schema(worker_schema_name) + + +@schema +class AutomatedExperimentIngestion(dj.Manual): + definition = """ # experiments to undergo automated ingestion + -> acquisition.Experiment + """ + + +def ingest_colony_epochs_chunks(): + """ + Load and insert subjects from colony.csv + Ingest epochs and chunks + for experiments specified in AutomatedExperimentIngestion + """ + load_metadata.ingest_subject() + experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") + for experiment_name in experiment_names: + acquisition.Epoch.ingest_epochs(experiment_name) + acquisition.Chunk.ingest_chunks(experiment_name) + + +def ingest_environment_visits(): + """ + Extract and insert complete visits + for experiments specified in AutomatedExperimentIngestion + """ + experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") + analysis.ingest_environment_visits(experiment_names) + + # ---- Define worker(s) ---- -# configure a worker to process high-priority tasks -high_priority = DataJointWorker( - "high_priority", +# configure a worker to process `acquisition`-related tasks +acquisition_worker = DataJointWorker( + "acquisition_worker", worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, - sleep_duration=600, -) -high_priority(load_metadata.ingest_subject) -high_priority(acquisition.Epoch.ingest_epochs, experiment_name=_current_experiment) -high_priority(acquisition.Chunk.ingest_chunks, experiment_name=_current_experiment) -high_priority(acquisition.ExperimentLog) -high_priority(acquisition.SubjectEnterExit) -high_priority(acquisition.SubjectWeight) -high_priority(acquisition.FoodPatchEvent) 
-high_priority(acquisition.WheelState) -high_priority(acquisition.WeightMeasurement) -high_priority(acquisition.WeightMeasurementFiltered) - -high_priority( - analysis.ingest_environment_visits, experiment_names=[_current_experiment] + sleep_duration=1200, ) +acquisition_worker(ingest_colony_epochs_chunks) +acquisition_worker(acquisition.ExperimentLog) +acquisition_worker(acquisition.SubjectEnterExit) +acquisition_worker(acquisition.SubjectWeight) +acquisition_worker(acquisition.FoodPatchEvent) +acquisition_worker(acquisition.WheelState) + +acquisition_worker(ingest_environment_visits) # configure a worker to process mid-priority tasks mid_priority = DataJointWorker( @@ -63,16 +92,15 @@ worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, - sleep_duration=120, + sleep_duration=3600, ) mid_priority(qc.CameraQC) mid_priority(tracking.CameraTracking) mid_priority(acquisition.FoodPatchWheel) +mid_priority(acquisition.WeightMeasurement) +mid_priority(acquisition.WeightMeasurementFiltered) -mid_priority( - analysis.visit.ingest_environment_visits, experiment_names=[_current_experiment] -) mid_priority(analysis.OverlapVisit) mid_priority(analysis.VisitSubjectPosition) @@ -93,10 +121,10 @@ "streams_worker", worker_schema_name=worker_schema_name, db_prefix=db_prefix, - run_duration=1, - sleep_duration=600, + run_duration=-1, + sleep_duration=1200, ) for attr in vars(streams).values(): - if is_djtable(attr) and hasattr(attr, "populate"): - streams_worker(attr) + if is_djtable(attr, dj.user_tables.AutoPopulate): + streams_worker(attr, max_calls=10) diff --git a/aeon/dj_pipeline/streams.py b/aeon/dj_pipeline/streams.py index fbdb93d8..120b41fb 100644 --- a/aeon/dj_pipeline/streams.py +++ b/aeon/dj_pipeline/streams.py @@ -1,24 +1,19 @@ -import inspect -import os +#---- DO NOT MODIFY ---- +#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ---- import datajoint as dj import pandas as pd +from uuid import UUID import aeon from aeon.dj_pipeline import acquisition, get_schema_name from aeon.io import api as io_api -logger = dj.logger +schema_name = get_schema_name('streams') +schema = dj.Schema() -# schema_name = f'u_{dj.config["database.user"]}_streams' # for testing -schema_name = get_schema_name("streams") -schema = dj.schema(schema_name) - -schema.spawn_missing_classes() - - -@schema +@schema class StreamType(dj.Lookup): """ Catalog of all steam types for the different device types used across Project Aeon @@ -38,7 +33,7 @@ class StreamType(dj.Lookup): """ -@schema +@schema class DeviceType(dj.Lookup): """ Catalog of all device types used across Project Aeon @@ -57,7 +52,7 @@ class Stream(dj.Part): """ -@schema +@schema class Device(dj.Lookup): definition = """ # Physical devices, of a particular type, identified by unique serial number device_serial_number: varchar(12) @@ -66,22 +61,42 @@ class Device(dj.Lookup): """ -# region Helper functions for creating device tables. +@schema +class Patch(dj.Manual): + definition = f""" + # patch placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + patch_install_time : datetime(6) # time of the patch placed and started operation at this position + --- + patch_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) 
associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + patch_removal_time: datetime(6) # time of the patch being removed + """ -def get_device_template(device_type: str): - """Returns table class template for ExperimentDevice""" - device_title = device_type - device_type = dj.utils.from_camel_case(device_type) - class ExperimentDevice(dj.Manual): +@schema +class UndergroundFeeder(dj.Manual): definition = f""" - # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) + # underground_feeder placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) -> acquisition.Experiment -> Device - {device_type}_install_time : datetime(6) # time of the {device_type} placed and started operation at this position + underground_feeder_install_time : datetime(6) # time of the underground_feeder placed and started operation at this position --- - {device_type}_name : varchar(36) + underground_feeder_name : varchar(36) """ class Attribute(dj.Part): @@ -96,64 +111,325 @@ class RemovalTime(dj.Part): definition = f""" -> master --- - {device_type}_removal_time: datetime(6) # time of the {device_type} being removed + underground_feeder_removal_time: datetime(6) # time of the underground_feeder being removed + """ + + +@schema +class VideoSource(dj.Manual): + definition = f""" + # video_source placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + video_source_install_time : datetime(6) # time of the video_source placed and started operation at this position + --- + video_source_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) 
associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob """ - ExperimentDevice.__name__ = f"{device_title}" + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + video_source_removal_time: datetime(6) # time of the video_source being removed + """ - return ExperimentDevice +@schema +class PatchBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} -def get_device_stream_template(device_type: str, stream_type: str): - """Returns table class template for DeviceDataStream""" - - context = inspect.currentframe().f_back.f_locals["context"] - ExperimentDevice = context[device_type] - - # DeviceDataStream table(s) - stream_detail = ( - StreamType - & (DeviceType.Stream & {"device_type": device_type, "stream_type": stream_type}) - ).fetch1() + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) - for i, n in enumerate(stream_detail["stream_reader"].split(".")): - if i == 0: - reader = aeon - else: - reader = getattr(reader, n) + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) - stream = reader(**stream_detail["stream_reader_kwargs"]) + device_name = (Patch & key).fetch1( + 'patch_name' + ) - table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) - -> {device_type} - -> acquisition.Chunk - --- - sample_count: int # number of data points acquired from this stream for a given chunk - timestamps: longblob # (datetime) timestamps of {stream_type} data - """ + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + 
timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": 
stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) - for col in stream.columns: - if col.startswith("_"): - continue - table_definition += f"{col}: longblob\n\t\t\t" - class DeviceDataStream(dj.Imported): - definition = table_definition - _stream_reader = reader - _stream_detail = stream_detail +@schema +class UndergroundFeederBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} @property def key_source(self): f""" - Only the combination of Chunk and {device_type} with overlapping time - + Chunk(s) that started after {device_type} install time and ended before {device_type} remove time - + Chunk(s) that started after {device_type} install time for {device_type} that are not yet removed + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install 
time for UndergroundFeeder that are not yet removed """ return ( acquisition.Chunk - * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) - & f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time" - & f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")' + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' ) def make(self, key): @@ -164,7 +440,9 @@ def make(self, key): key, directory_type=dir_type ) - device_name = (ExperimentDevice & key).fetch1(f"{dj.utils.from_camel_case(device_type)}_name") + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) stream = self._stream_reader( **{ @@ -193,106 +471,400 @@ def make(self, key): } ) - DeviceDataStream.__name__ = f"{device_type}{stream_type}" - - return DeviceDataStream - -# endregion - - -def main(context=None): - - import re - if context is None: - context = inspect.currentframe().f_back.f_locals - - # Create DeviceType tables. - for device_info in (DeviceType).fetch(as_dict=True): - if device_info["device_type"] not in locals(): - table_class = get_device_template(device_info["device_type"]) - context[table_class.__name__] = table_class - schema(table_class, context=context) - - device_table_def = inspect.getsource(table_class).lstrip() - replacements = { - "ExperimentDevice": device_info["device_type"], - "{device_title}": dj.utils.from_camel_case(device_info["device_type"]), - "{device_type}": dj.utils.from_camel_case(device_info["device_type"]), - "{aeon.__version__}": aeon.__version__ - } - for old, new in replacements.items(): - device_table_def = device_table_def.replace(old, new) - full_def = "@schema \n" + device_table_def + "\n\n" - if os.path.exists("existing_module.py"): - with open("existing_module.py", "r") as f: - existing_content = f.read() - - if full_def in existing_content: - continue - - with open("existing_module.py", "a") as f: - f.write(full_def) - else: - with open("existing_module.py", "w") as f: - full_def = """import datajoint as dj\nimport pandas as pd\n\nimport aeon\nfrom aeon.dj_pipeline import acquisition\nfrom aeon.io import api as io_api\n\n""" + full_def - f.write(full_def) - - # Create DeviceDataStream tables. - for device_info in (DeviceType.Stream).fetch(as_dict=True): - - device_type = device_info['device_type'] - stream_type = device_info['stream_type'] - table_name = f"{device_type}{stream_type}" - - if table_name not in locals(): - table_class = get_device_stream_template( - device_type, stream_type) - context[table_class.__name__] = table_class - schema(table_class, context=context) - - stream_obj = table_class.__dict__["_stream_reader"] - reader = stream_obj.__module__ + '.' 
+ stream_obj.__name__ - stream_detail = table_class.__dict__["_stream_detail"] - - device_stream_table_def = inspect.getsource(table_class).lstrip() - - old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) - -> {device_type} + +@schema +class UndergroundFeederDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder -> acquisition.Chunk --- sample_count: int # number of data points acquired from this stream for a given chunk - timestamps: longblob # (datetime) timestamps of {stream_type} data + timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder 
that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourcePosition(dj.Imported): + definition = """# Raw per-chunk Position data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) 
timestamps of Position data + """ + _stream_reader = aeon.io.reader.Position + _stream_detail = {'stream_type': 'Position', 'stream_reader': 'aeon.io.reader.Position', 'stream_reader_kwargs': {'pattern': '{pattern}_200'}, 'stream_description': '', 'stream_hash': UUID('75f9f365-037a-1e9b-ad38-8b2b3783315d')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourceRegion(dj.Imported): + definition = """# Raw per-chunk Region data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Region data + """ + _stream_reader = aeon.schema.foraging._RegionReader + _stream_detail = {'stream_type': 'Region', 'stream_reader': 'aeon.schema.foraging._RegionReader', 'stream_reader_kwargs': {'pattern': '{pattern}_201'}, 'stream_description': '', 'stream_hash': UUID('6234a429-8ae5-d7dc-41c8-602ac76da029')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": 
len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourceVideo(dj.Imported): + definition = """# Raw per-chunk Video data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Video data + """ + _stream_reader = aeon.io.reader.Video + _stream_detail = {'stream_type': 'Video', 'stream_reader': 'aeon.io.reader.Video', 'stream_reader_kwargs': {'pattern': '{pattern}'}, 'stream_description': '', 'stream_hash': UUID('4246295b-789f-206d-b413-7af25b7548b2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + - replacements = { - "DeviceDataStream": f"{device_type}{stream_type}","ExperimentDevice": device_type, - 'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'", - """f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""", - 'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'", - "{device_type}": device_type, - "{stream_type}": stream_type, - "{aeon.__version__}": aeon.__version__, - } - for old, new in replacements.items(): - new_definition = old_definition.replace(old, new) - - replacements["table_definition"] = '"""'+new_definition+'"""' - - for old, new in replacements.items(): - device_stream_table_def = device_stream_table_def.replace(old, new) - - device_stream_table_def = re.sub(r'_stream_reader\s*=\s*reader', f'_stream_reader = {reader}', device_stream_table_def) # insert reader - device_stream_table_def = re.sub(r'_stream_detail\s*=\s*stream_detail', f'_stream_detail = {stream_detail}', device_stream_table_def) # insert stream details - - full_def = "@schema \n" + device_stream_table_def + "\n\n" - - with open("existing_module.py", "r") as f: - existing_content = f.read() - - if full_def in 
existing_content: - continue - - with open("existing_module.py", "a") as f: - f.write(full_def) - -main() \ No newline at end of file diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py index 0650a8f7..84de0a85 100644 --- a/aeon/dj_pipeline/utils/load_metadata.py +++ b/aeon/dj_pipeline/utils/load_metadata.py @@ -2,7 +2,6 @@ import inspect import json import pathlib -import re from collections import defaultdict from pathlib import Path @@ -11,16 +10,11 @@ import pandas as pd from dotmap import DotMap -from aeon.dj_pipeline import ( - acquisition, - dict_to_uuid, - get_schema_name, - lab, - streams, - subject, -) +from aeon.dj_pipeline import acquisition, dict_to_uuid, subject +from aeon.dj_pipeline.utils import streams_maker from aeon.io import api as io_api + _weight_scale_rate = 100 _weight_scale_nest = 1 _colony_csv_path = pathlib.Path("/ceph/aeon/aeon/colony/colony.csv") @@ -43,6 +37,8 @@ def insert_stream_types(): """Insert into streams.streamType table all streams in the dataset schema.""" from aeon.schema import dataset + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + schemas = [v for v in dataset.__dict__.values() if isinstance(v, DotMap)] for schema in schemas: @@ -64,6 +60,8 @@ def insert_stream_types(): def insert_device_types(schema: DotMap, metadata_yml_filepath: Path): """Use dataset.schema and metadata.yml to insert into streams.DeviceType and streams.Device. Only insert device types that were defined both in the device schema (e.g., exp02) and Metadata.yml. It then creates new device tables under streams schema.""" + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + device_info: dict[dict] = get_device_info(schema) device_type_mapper, device_sn = get_device_mapper(schema, metadata_yml_filepath) @@ -137,7 +135,7 @@ def extract_epoch_config(experiment_name: str, metadata_yml_filepath: str) -> di Returns: dict: epoch_config [dict] """ - + metadata_yml_filepath = pathlib.Path(metadata_yml_filepath) epoch_start = datetime.datetime.strptime( metadata_yml_filepath.parent.name, "%Y-%m-%dT%H-%M-%S" @@ -163,7 +161,9 @@ def extract_epoch_config(experiment_name: str, metadata_yml_filepath: str) -> di ) ) - if isinstance(devices, list): # In exp02, it is a list of dict. In presocial. It's a dict of dict. + if isinstance( + devices, list + ): # In exp02, it is a list of dict. In presocial. It's a dict of dict. 
devices: dict = { d.pop("Name"): d for d in devices } # {deivce_name: device_config} @@ -182,7 +182,8 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): """ Make entries into device tables """ - + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + if experiment_name.startswith("oct"): ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath) return @@ -209,14 +210,14 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): schema = acquisition._device_schema_mapping[experiment_name] device_type_mapper, _ = get_device_mapper(schema, metadata_yml_filepath) - + # Insert into each device table device_list = [] device_removal_list = [] - + for device_name, device_config in epoch_config["metadata"].items(): if table := getattr(streams, device_type_mapper.get(device_name) or "", None): - + device_sn = device_config.get("SerialNumber", device_config.get("PortName")) device_key = {"device_serial_number": device_sn} @@ -263,38 +264,50 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): for entry in table_attribute_entry ] - if dict_to_uuid({config["attribute_name"]: config["attribute_value"] for config in current_device_config}) == dict_to_uuid( - {config["attribute_name"]: config["attribute_value"] for config in new_device_config} + if dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in current_device_config + } + ) == dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in new_device_config + } ): # Skip if none of the configuration has changed. continue # Remove old device - device_removal_list.append({ - **current_device_query.fetch1("KEY"), - f"{dj.utils.from_camel_case(table.__name__)}_removal_time": epoch_config[ - "epoch_start" - ], - }) + device_removal_list.append( + { + **current_device_query.fetch1("KEY"), + f"{dj.utils.from_camel_case(table.__name__)}_removal_time": epoch_config[ + "epoch_start" + ], + } + ) # Insert into table. table.insert1(table_entry, skip_duplicates=True) table.Attribute.insert(table_attribute_entry, ignore_extra_fields=True) # Remove the currently installed devices that are absent in this config - device_removal = lambda device_type, device_entry: any(dj.utils. from_camel_case(device_type) in k for k in device_entry) # returns True if the device type is found in the attribute name - + device_removal = lambda device_type, device_entry: any( + dj.utils.from_camel_case(device_type) in k for k in device_entry + ) # returns True if the device type is found in the attribute name + for device_type in streams.DeviceType.fetch("device_type"): table = getattr(streams, device_type) device_removal_list.extend( - (table - table.RemovalTime - device_list & experiment_key - ).fetch("KEY")) # could be VideoSource or Patch - + (table - table.RemovalTime - device_list & experiment_key).fetch("KEY") + ) # could be VideoSource or Patch + for device_entry in device_removal_list: if device_removal(device_type, device_entry): table.RemovalTime.insert1(device_entry) - - + + # region Get stream & device information def get_stream_entries(schema: DotMap) -> list[dict]: """Returns a list of dictionaries containing the stream entries for a given device, @@ -474,10 +487,14 @@ def get_device_mapper(schema: DotMap, metadata_yml_filepath: Path): device_sn[item.Name] = ( item.SerialNumber or item.PortName or None ) # assign either the serial number (if it exists) or port name. 
If neither exists, assign None - elif isinstance(item, str): # presocial + elif isinstance(item, str): # presocial if meta_data.Devices[item].get("Type"): device_type_mapper[item] = meta_data.Devices[item].get("Type") - device_sn[item] = meta_data.Devices[item].get("SerialNumber") or meta_data.Devices[item].get("PortName") or None + device_sn[item] = ( + meta_data.Devices[item].get("SerialNumber") + or meta_data.Devices[item].get("PortName") + or None + ) with filename.open("w") as f: json.dump(device_type_mapper, f) @@ -491,7 +508,7 @@ def ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath): """ Temporary ingestion routine to load devices' meta information for Octagon arena experiments """ - from aeon.dj_pipeline import streams + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) oct01_devices = [ ("Metadata", "Metadata"), diff --git a/aeon/dj_pipeline/utils/streams_maker.py b/aeon/dj_pipeline/utils/streams_maker.py new file mode 100644 index 00000000..46043e94 --- /dev/null +++ b/aeon/dj_pipeline/utils/streams_maker.py @@ -0,0 +1,333 @@ +import inspect +from pathlib import Path +import datajoint as dj +import pandas as pd +import re +import importlib + +import aeon +from aeon.dj_pipeline import acquisition, get_schema_name +from aeon.io import api as io_api + +logger = dj.logger + + +# schema_name = f'u_{dj.config["database.user"]}_streams' # for testing +schema_name = get_schema_name("streams") + +STREAMS_MODULE_NAME = "streams" +_STREAMS_MODULE_FILE = Path(__file__).parent.parent / f"{STREAMS_MODULE_NAME}.py" + + +class StreamType(dj.Lookup): + """ + Catalog of all steam types for the different device types used across Project Aeon + One StreamType corresponds to one reader class in `aeon.io.reader` + The combination of `stream_reader` and `stream_reader_kwargs` should fully specify + the data loading routine for a particular device, using the `aeon.io.utils` + """ + + definition = """ # Catalog of all stream types used across Project Aeon + stream_type : varchar(20) + --- + stream_reader : varchar(256) # name of the reader class found in `aeon_mecha` package (e.g. aeon.io.reader.Video) + stream_reader_kwargs : longblob # keyword arguments to instantiate the reader class + stream_description='': varchar(256) + stream_hash : uuid # hash of dict(stream_reader_kwargs, stream_reader=stream_reader) + unique index (stream_hash) + """ + + +class DeviceType(dj.Lookup): + """ + Catalog of all device types used across Project Aeon + """ + + definition = """ # Catalog of all device types used across Project Aeon + device_type: varchar(36) + --- + device_description='': varchar(256) + """ + + class Stream(dj.Part): + definition = """ # Data stream(s) associated with a particular device type + -> master + -> StreamType + """ + + +class Device(dj.Lookup): + definition = """ # Physical devices, of a particular type, identified by unique serial number + device_serial_number: varchar(12) + --- + -> DeviceType + """ + + +# region Helper functions for creating device tables. 
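+# Note: the helper functions below derive snake_case attribute prefixes from the CamelCase
+# device type with dj.utils.from_camel_case (e.g., "VideoSource" -> "video_source"), which is
+# what produces attributes such as "video_source_install_time" in the generated streams module.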
+ + +def get_device_template(device_type: str): + """Returns table class template for ExperimentDevice""" + device_title = device_type + device_type = dj.utils.from_camel_case(device_type) + + class ExperimentDevice(dj.Manual): + definition = f""" + # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) + -> acquisition.Experiment + -> Device + {device_type}_install_time : datetime(6) # time of the {device_type} placed and started operation at this position + --- + {device_type}_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + {device_type}_removal_time: datetime(6) # time of the {device_type} being removed + """ + + ExperimentDevice.__name__ = f"{device_title}" + + return ExperimentDevice + + +def get_device_stream_template(device_type: str, stream_type: str, streams_module): + """Returns table class template for DeviceDataStream""" + + ExperimentDevice = getattr(streams_module, device_type) + + # DeviceDataStream table(s) + stream_detail = ( + streams_module.StreamType + & ( + streams_module.DeviceType.Stream + & {"device_type": device_type, "stream_type": stream_type} + ) + ).fetch1() + + for i, n in enumerate(stream_detail["stream_reader"].split(".")): + if i == 0: + reader = aeon + else: + reader = getattr(reader, n) + + stream = reader(**stream_detail["stream_reader_kwargs"]) + + table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + for col in stream.columns: + if col.startswith("_"): + continue + table_definition += f"{col}: longblob\n\t\t\t" + + class DeviceDataStream(dj.Imported): + definition = table_definition + _stream_reader = reader + _stream_detail = stream_detail + + @property + def key_source(self): + f""" + Only the combination of Chunk and {device_type} with overlapping time + + Chunk(s) that started after {device_type} install time and ended before {device_type} remove time + + Chunk(s) that started after {device_type} install time for {device_type} that are not yet removed + """ + return ( + acquisition.Chunk + * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) + & f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time" + & f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (ExperimentDevice & key).fetch1( + f"{dj.utils.from_camel_case(device_type)}_name" + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + 
end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + DeviceDataStream.__name__ = f"{device_type}{stream_type}" + + return DeviceDataStream + + +# endregion + + +def main(create_tables=True): + + if not _STREAMS_MODULE_FILE.exists(): + with open(_STREAMS_MODULE_FILE, "w") as f: + imports_str = ( + "#---- DO NOT MODIFY ----\n" + "#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ----\n\n" + "import datajoint as dj\n" + "import pandas as pd\n" + "from uuid import UUID\n\n" + "import aeon\n" + "from aeon.dj_pipeline import acquisition, get_schema_name\n" + "from aeon.io import api as io_api\n\n" + "schema_name = get_schema_name('streams')\n" + "schema = dj.Schema()\n\n\n" + ) + f.write(imports_str) + for table_class in (StreamType, DeviceType, Device): + device_table_def = inspect.getsource(table_class).lstrip() + full_def = "@schema \n" + device_table_def + "\n\n" + f.write(full_def) + + streams = importlib.import_module(f"aeon.dj_pipeline.{STREAMS_MODULE_NAME}") + streams.schema.activate(schema_name) + + if create_tables: + # Create DeviceType tables. + for device_info in streams.DeviceType.fetch(as_dict=True): + if hasattr(streams, device_info["device_type"]): + continue + + table_class = get_device_template(device_info["device_type"]) + streams.__dict__[table_class.__name__] = table_class + + device_table_def = inspect.getsource(table_class).lstrip() + replacements = { + "ExperimentDevice": device_info["device_type"], + "{device_title}": dj.utils.from_camel_case(device_info["device_type"]), + "{device_type}": dj.utils.from_camel_case(device_info["device_type"]), + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + device_table_def = device_table_def.replace(old, new) + full_def = "@schema \n" + device_table_def + "\n\n" + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + # Create DeviceDataStream tables. + for device_info in streams.DeviceType.Stream.fetch(as_dict=True): + + device_type = device_info["device_type"] + stream_type = device_info["stream_type"] + table_name = f"{device_type}{stream_type}" + + if hasattr(streams, table_name): + continue + + table_class = get_device_stream_template( + device_type, stream_type, streams_module=streams + ) + + stream_obj = table_class.__dict__["_stream_reader"] + reader = stream_obj.__module__ + "." 
+ stream_obj.__name__ + stream_detail = table_class.__dict__["_stream_detail"] + + device_stream_table_def = inspect.getsource(table_class).lstrip() + + old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + replacements = { + "DeviceDataStream": f"{device_type}{stream_type}", + "ExperimentDevice": device_type, + 'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'", + """f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""", + 'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'", + "{device_type}": device_type, + "{stream_type}": stream_type, + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + new_definition = old_definition.replace(old, new) + + replacements["table_definition"] = '"""' + new_definition + '"""' + + for old, new in replacements.items(): + device_stream_table_def = device_stream_table_def.replace(old, new) + + device_stream_table_def = re.sub( + r"_stream_reader\s*=\s*reader", + f"_stream_reader = {reader}", + device_stream_table_def, + ) # insert reader + device_stream_table_def = re.sub( + r"_stream_detail\s*=\s*stream_detail", + f"_stream_detail = {stream_detail}", + device_stream_table_def, + ) # insert stream details + + full_def = "@schema \n" + device_stream_table_def + "\n\n" + + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + importlib.reload(streams) + streams.schema.activate(schema_name) + + return streams + + +streams = main() diff --git a/aeon/dj_pipeline/utils/video.py b/aeon/dj_pipeline/utils/video.py index 60434419..06b811a5 100644 --- a/aeon/dj_pipeline/utils/video.py +++ b/aeon/dj_pipeline/utils/video.py @@ -10,10 +10,9 @@ from aeon.io import api as io_api from aeon.io import video as io_video -raw_data_dir = pathlib.Path("/ceph/aeon/aeon/data/raw/AEON2/experiment0.2") - def retrieve_video_frames( + experiment_name, camera_name, start_time, end_time, @@ -22,6 +21,12 @@ def retrieve_video_frames( chunk_size=50, **kwargs, ): + from aeon.dj_pipeline import acquisition + + raw_data_dir = acquisition.Experiment.get_data_directory( + {"experiment_name": experiment_name} + ) + # do some data loading videodata = io_api.load( root=raw_data_dir.as_posix(), diff --git a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml index 3dee1e19..29a54cec 100644 --- a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml @@ -38,7 +38,7 @@ SciViz: pages: Colony: - route: /colony_entry + route: /colony_page grids: grid1: type: fixed @@ -70,16 +70,19 @@ SciViz: - type: attribute input: Note destination: note - # component_templates: - # Colony Table: - # route: /colony_meta - # type: metadata - # restriction: > - # def restriction(**kwargs): - # return dict(**kwargs) - # dj_query: > - # def dj_query(aeon_lab): - # return dict(query=aeon_lab.Colony(), 
fetch_args=[]) + Colony: + route: /colony_page_table + x: 1 + y: 0 + height: 1 + width: 1 + type: antd-table + restriction: > + def restriction(**kwargs): + return dict(**kwargs) + dj_query: > + def dj_query(aeon_lab): + return {'query': aeon_lab.Colony(), 'fetch_args': []} Subjects: route: /subjects @@ -434,7 +437,7 @@ return dict(**kwargs) dj_query: > def dj_query(aeon_acquisition): - q = aeon_acquisition.ExperimentCamera.proj('camera_description') + q = dj.U('camera_description') & aeon_acquisition.ExperimentCamera return {'query': q, 'fetch_args': ['camera_description']} time_range_selector: x: 0 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a6d1868b..e2dce70a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -45,22 +45,26 @@ x-aeon-ingest-common: &aeon-ingest-common max-file: "5" services: - ingest_high: + acquisition_worker: <<: *aeon-ingest-common - command: ["aeon_ingest", "high_priority", "--duration=-1", "--sleep=1200"] + command: ["aeon_ingest", "acquisition_worker"] - ingest_mid: + streams_worker: <<: *aeon-ingest-common depends_on: - ingest_high: + acquisition_worker: condition: service_started deploy: mode: replicated - replicas: 2 + replicas: 3 + command: ["aeon_ingest", "streams_worker"] - dev: + ingest_mid: <<: *aeon-ingest-common + depends_on: + acquisition_worker: + condition: service_started deploy: mode: replicated - replicas: 0 + replicas: 2 + command: ["aeon_ingest", "mid_priority"]
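A minimal usage sketch of the reworked worker flow (assuming a configured DataJoint connection; the experiment name reuses the value of the removed `_current_experiment` constant, and the CLI invocations mirror the docker-compose commands above):

from aeon.dj_pipeline.populate import worker

# Register an experiment for automated ingestion; the acquisition worker then picks it up
# in ingest_colony_epochs_chunks() and ingest_environment_visits().
worker.AutomatedExperimentIngestion.insert1(
    {"experiment_name": "exp0.2-r0"}, skip_duplicates=True
)

# Each container runs its worker through the aeon_ingest entry point, e.g.:
#   aeon_ingest acquisition_worker
#   aeon_ingest streams_worker
#   aeon_ingest mid_priority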