From 54cca8f82975b30e61077939a2ff8fb3e418dc43 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Mon, 17 Apr 2023 15:48:35 -0500 Subject: [PATCH 01/12] Update video.py --- aeon/dj_pipeline/utils/video.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/aeon/dj_pipeline/utils/video.py b/aeon/dj_pipeline/utils/video.py index 15cdbfb7..dc54a60d 100644 --- a/aeon/dj_pipeline/utils/video.py +++ b/aeon/dj_pipeline/utils/video.py @@ -10,9 +10,6 @@ import aeon.io.reader as io_reader -camera_name = "CameraTop" -start_time = datetime.datetime(2022, 4, 3, 13, 0, 0) -end_time = datetime.datetime(2022, 4, 3, 15, 0, 0) raw_data_dir = pathlib.Path("/ceph/aeon/aeon/data/raw/AEON2/experiment0.2") @@ -39,11 +36,6 @@ def retrieve_video_frames( framedata = videodata[start_frame : start_frame + chunk_size] - # downsample - # actual_fps = 1 / np.median(np.diff(videodata.index) / np.timedelta64(1, "s")) - # final_fps = min(desired_fps, actual_fps) - # ds_factor = int(np.around(actual_fps / final_fps)) - # framedata = videodata[::ds_factor] final_fps = desired_fps # read frames From 6e3096539bf3c895fc89147b8dba96cb8a217fae Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 25 Apr 2023 15:44:06 -0500 Subject: [PATCH 02/12] Update video.py --- aeon/dj_pipeline/utils/video.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/aeon/dj_pipeline/utils/video.py b/aeon/dj_pipeline/utils/video.py index dc54a60d..70d1e1c7 100644 --- a/aeon/dj_pipeline/utils/video.py +++ b/aeon/dj_pipeline/utils/video.py @@ -5,15 +5,15 @@ import datetime import cv2 +from aeon.dj_pipeline import acquisition + from aeon.io import api as io_api from aeon.io import video as io_video import aeon.io.reader as io_reader -raw_data_dir = pathlib.Path("/ceph/aeon/aeon/data/raw/AEON2/experiment0.2") - - def retrieve_video_frames( + experiment_name, camera_name, start_time, end_time, @@ -22,6 +22,10 @@ def retrieve_video_frames( chunk_size=50, **kwargs, ): + raw_data_dir = acquisition.Experiment.get_data_directory( + {"experiment_name": experiment_name} + ) + # do some data loading videodata = io_api.load( root=raw_data_dir.as_posix(), From 5dd508e040d969f97fdfad3f3595145b30365f4f Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 25 Apr 2023 15:44:13 -0500 Subject: [PATCH 03/12] update sciviz version --- .../webapps/sciviz/docker-compose-local.yaml | 36 ++++--------------- .../webapps/sciviz/docker-compose-remote.yaml | 16 ++++----- 2 files changed, 13 insertions(+), 39 deletions(-) diff --git a/aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml b/aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml index 597d2f3c..29bca674 100644 --- a/aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/docker-compose-local.yaml @@ -7,14 +7,14 @@ services: pharus: cpus: 2.0 mem_limit: 4g - image: jverswijver/pharus:0.8.0py3.9 + image: datajoint/pharus:0.8.4 environment: # - FLASK_ENV=development # enables logging to console from Flask - - PHARUS_SPEC_PATH=/main/specs/specsheet.yaml # for dynamic utils spec + - PHARUS_SPEC_PATH=/main/specsheet.yaml # for dynamic utils spec - PHARUS_MODE=DEV user: ${HOST_UID}:anaconda volumes: - - ./specsheet.yaml:/main/specs/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME + - ./specsheet.yaml:/main/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME - ./apk_requirements.txt:/tmp/apk_requirements.txt - /ceph/aeon/aeon:/ceph/aeon/aeon command: @@ -52,11 +52,11 @@ services: sci-viz: cpus: 2.0 mem_limit: 16g - 
image: jverswijver/sci-viz:1.1.1-bugfix2 + image: datajoint/sci-viz:2.3.0 environment: - CHOKIDAR_USEPOLLING=true - REACT_APP_DJSCIVIZ_BACKEND_PREFIX=/api - - DJSCIVIZ_SPEC_PATH=specsheet.yaml + - DJSCIVIZ_SPEC_PATH=/main/specsheet.yaml - DJSCIVIZ_MODE=DEV - NODE_OPTIONS="--max-old-space-size=12000" volumes: @@ -67,31 +67,7 @@ services: - sh - -c - | - sciviz_update() { - [ -z "$$NGINX_PID" ] || kill $$NGINX_PID - rm -R /usr/share/nginx/html - python frontend_gen.py - yarn build - mv ./build /usr/share/nginx/html - nginx -g "daemon off;" & - NGINX_PID=$$! - } - sciviz_update - echo "[$$(date -u '+%Y-%m-%d %H:%M:%S')][DataJoint]: Monitoring SciViz updates..." - INIT_TIME=$$(date +%s) - LAST_MOD_TIME=$$(date -r $$DJSCIVIZ_SPEC_PATH +%s) - DELTA=$$(expr $$LAST_MOD_TIME - $$INIT_TIME) - while true; do - CURR_LAST_MOD_TIME=$$(date -r $$DJSCIVIZ_SPEC_PATH +%s) - CURR_DELTA=$$(expr $$CURR_LAST_MOD_TIME - $$INIT_TIME) - if [ "$$DELTA" -lt "$$CURR_DELTA" ]; then - echo "[$$(date -u '+%Y-%m-%d %H:%M:%S')][DataJoint]: Reloading SciViz since \`$$DJSCIVIZ_SPEC_PATH\` changed." - sciviz_update - DELTA=$$CURR_DELTA - else - sleep 5 - fi - done + sh sci-viz-hotreload-dev.sh networks: - main fakeservices.datajoint.io: diff --git a/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml b/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml index 97ef5a67..4791edcc 100644 --- a/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml @@ -6,14 +6,15 @@ services: pharus: cpus: 2.0 mem_limit: 4g - image: jverswijver/pharus:0.8.0py3.9 + image: datajoint/pharus:0.8.4 environment: # - FLASK_ENV=development # enables logging to console from Flask - - PHARUS_SPEC_PATH=/main/specs/specsheet.yaml # for dynamic utils spec + - PHARUS_SPEC_PATH=/main/specsheet.yaml # for dynamic utils spec user: ${HOST_UID}:anaconda volumes: - - ./specsheet.yaml:/main/specs/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME + - ./specsheet.yaml:/main/specsheet.yaml #copy the spec over to /main/specs/YOUR_SPEC_NAME - ./apk_requirements.txt:/tmp/apk_requirements.txt + - /ceph/aeon/aeon:/ceph/aeon/aeon command: - sh - -c @@ -29,11 +30,11 @@ services: sci-viz: cpus: 2.0 mem_limit: 16g - image: jverswijver/sci-viz:1.1.1-bugfix2 + image: datajoint/sci-viz:2.3.0 environment: - CHOKIDAR_USEPOLLING=true - REACT_APP_DJSCIVIZ_BACKEND_PREFIX=/aeon/utils - - DJSCIVIZ_SPEC_PATH=specsheet.yaml + - DJSCIVIZ_SPEC_PATH=/main/specsheet.yaml - NODE_OPTIONS="--max-old-space-size=12000" volumes: - ./specsheet.yaml:/main/specsheet.yaml @@ -43,10 +44,7 @@ services: - sh - -c - | - python frontend_gen.py - npm run build - mv ./build /usr/share/nginx/html - nginx -g "daemon off;" + sh sci-viz-hotreload-prod.sh networks: - main reverse-proxy: From 20144c97c47c1b38d4c7a640ab5cb846e1e99f41 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 25 Apr 2023 17:19:53 -0500 Subject: [PATCH 04/12] Update specsheet.yaml --- aeon/dj_pipeline/webapps/sciviz/specsheet.yaml | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml index c666c44c..8b40263a 100644 --- a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml @@ -36,7 +36,7 @@ SciViz: pages: Colony: - route: /colony_entry + route: /colony_page grids: grid1: type: fixed @@ -68,6 +68,19 @@ SciViz: - type: attribute input: Note destination: note + Colony: + 
route: /colony_page_table + x: 0 + y: 0 + height: 1 + width: 1 + type: antd-table + restriction: > + def restriction(**kwargs): + return dict(**kwargs) + dj_query: > + def dj_query(aeon_lab): + return {'query': aeon_lab.Colony(), 'fetch_args': []} Subjects: route: /subjects @@ -422,7 +435,7 @@ SciViz: return dict(**kwargs) dj_query: > def dj_query(aeon_acquisition): - q = aeon_acquisition.ExperimentCamera.proj('camera_description') + q = dj.U('camera_description') & acquisition.ExperimentCamera return {'query': q, 'fetch_args': ['camera_description']} time_range_selector: x: 0 From 6d38d688d41ca67b40ce2223189bbbe10c543066 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 25 Apr 2023 17:28:35 -0500 Subject: [PATCH 05/12] pharus with python 3.9 --- aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml b/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml index 4791edcc..effb43c6 100644 --- a/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/docker-compose-remote.yaml @@ -6,7 +6,7 @@ services: pharus: cpus: 2.0 mem_limit: 4g - image: datajoint/pharus:0.8.4 + image: jverswijver/pharus:0.8.0py3.9 environment: # - FLASK_ENV=development # enables logging to console from Flask - PHARUS_SPEC_PATH=/main/specsheet.yaml # for dynamic utils spec From 833f013a035fa8cb7e40f564fa1a74020ab1bc7d Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 25 Apr 2023 17:35:24 -0500 Subject: [PATCH 06/12] Update video.py --- aeon/dj_pipeline/utils/video.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeon/dj_pipeline/utils/video.py b/aeon/dj_pipeline/utils/video.py index 70d1e1c7..c2ccdb13 100644 --- a/aeon/dj_pipeline/utils/video.py +++ b/aeon/dj_pipeline/utils/video.py @@ -5,8 +5,6 @@ import datetime import cv2 -from aeon.dj_pipeline import acquisition - from aeon.io import api as io_api from aeon.io import video as io_video import aeon.io.reader as io_reader @@ -22,6 +20,8 @@ def retrieve_video_frames( chunk_size=50, **kwargs, ): + from aeon.dj_pipeline import acquisition + raw_data_dir = acquisition.Experiment.get_data_directory( {"experiment_name": experiment_name} ) From 93d0f3d2c1b42d3610e864ed5d6f38fdd150028a Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Mon, 3 Jul 2023 16:30:41 -0500 Subject: [PATCH 07/12] refactor - explicit `streams_maker` --- aeon/dj_pipeline/acquisition.py | 16 +- aeon/dj_pipeline/populate/worker.py | 6 +- aeon/dj_pipeline/streams.py | 250 +----------------- aeon/dj_pipeline/streams_maker.py | 331 ++++++++++++++++++++++++ aeon/dj_pipeline/utils/load_metadata.py | 82 +++--- 5 files changed, 400 insertions(+), 285 deletions(-) create mode 100644 aeon/dj_pipeline/streams_maker.py diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 585cf2bc..9a503a3a 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -15,7 +15,6 @@ logger = dj.logger schema = dj.schema(get_schema_name("acquisition")) -# streams = dj.VirtualModule("streams", get_schema_name("streams")) # ------------------- Some Constants -------------------------- @@ -124,7 +123,7 @@ class Directory(dj.Part): @classmethod def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False): - + try: repo_name, dir_path = ( cls.Directory & experiment_key & {"directory_type": directory_type} @@ -135,6 +134,7 @@ def get_data_directory(cls, 
experiment_key, directory_type="raw", as_posix=False return data_directory.as_posix() if as_posix else data_directory except dj.errors.DataJointError: return + @classmethod def get_data_directories( cls, experiment_key, directory_types=["raw"], as_posix=False @@ -277,14 +277,14 @@ def ingest_epochs(cls, experiment_name, start=None, end=None): - if not specified, ingest all epochs Note: "start" and "end" are datetime specified a string in the format: "%Y-%m-%d %H:%M:%S" """ - from aeon.dj_pipeline import streams + from aeon.dj_pipeline import streams_maker from .utils.load_metadata import ( extract_epoch_config, ingest_epoch_metadata, insert_device_types, ) - + device_name = _ref_device_mapping.get(experiment_name, "CameraTop") all_chunks, raw_data_dirs = _get_all_chunks(experiment_name, device_name) @@ -363,15 +363,17 @@ def ingest_epochs(cls, experiment_name, start=None, end=None): if epoch_config: cls.Config.insert1(epoch_config) if metadata_yml_filepath and metadata_yml_filepath.exists(): - + try: - # Ingest streams.DeviceType, streams.Device and create device tables. + # Insert new entries for streams.DeviceType, streams.Device. insert_device_types( _device_schema_mapping[epoch_key["experiment_name"]], metadata_yml_filepath, ) - streams.main(context=streams.__dict__) # create device tables under streams schema + # Define and instantiate new devices/stream tables under `streams` schema + streams_maker.main() with cls.connection.transaction: + # Insert devices' installation/removal/settings ingest_epoch_metadata( experiment_name, metadata_yml_filepath ) diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 7b1678dc..395023b0 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -12,11 +12,13 @@ db_prefix, qc, report, - streams, + streams_maker, tracking, ) from aeon.dj_pipeline.utils import load_metadata +streams = streams_maker.main() + __all__ = [ "high_priority", "mid_priority", @@ -98,5 +100,5 @@ ) for attr in vars(streams).values(): - if is_djtable(attr) and hasattr(attr, "populate"): + if is_djtable(attr, dj.user_tables.AutoPopulate): streams_worker(attr) diff --git a/aeon/dj_pipeline/streams.py b/aeon/dj_pipeline/streams.py index fbdb93d8..f642a62f 100644 --- a/aeon/dj_pipeline/streams.py +++ b/aeon/dj_pipeline/streams.py @@ -1,5 +1,5 @@ -import inspect -import os +#---- DO NOT MODIFY ---- +#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ---- import datajoint as dj import pandas as pd @@ -8,17 +8,11 @@ from aeon.dj_pipeline import acquisition, get_schema_name from aeon.io import api as io_api -logger = dj.logger +schema_name = get_schema_name('streams') +schema = dj.Schema() -# schema_name = f'u_{dj.config["database.user"]}_streams' # for testing -schema_name = get_schema_name("streams") -schema = dj.schema(schema_name) - -schema.spawn_missing_classes() - - -@schema +@schema class StreamType(dj.Lookup): """ Catalog of all steam types for the different device types used across Project Aeon @@ -38,7 +32,7 @@ class StreamType(dj.Lookup): """ -@schema +@schema class DeviceType(dj.Lookup): """ Catalog of all device types used across Project Aeon @@ -57,7 +51,7 @@ class Stream(dj.Part): """ -@schema +@schema class Device(dj.Lookup): definition = """ # Physical devices, of a particular type, identified by unique serial number device_serial_number: varchar(12) @@ -66,233 +60,3 @@ class Device(dj.Lookup): """ -# region Helper functions for creating device tables. 
- - -def get_device_template(device_type: str): - """Returns table class template for ExperimentDevice""" - device_title = device_type - device_type = dj.utils.from_camel_case(device_type) - - class ExperimentDevice(dj.Manual): - definition = f""" - # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) - -> acquisition.Experiment - -> Device - {device_type}_install_time : datetime(6) # time of the {device_type} placed and started operation at this position - --- - {device_type}_name : varchar(36) - """ - - class Attribute(dj.Part): - definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device - -> master - attribute_name : varchar(32) - --- - attribute_value=null : longblob - """ - - class RemovalTime(dj.Part): - definition = f""" - -> master - --- - {device_type}_removal_time: datetime(6) # time of the {device_type} being removed - """ - - ExperimentDevice.__name__ = f"{device_title}" - - return ExperimentDevice - - -def get_device_stream_template(device_type: str, stream_type: str): - """Returns table class template for DeviceDataStream""" - - context = inspect.currentframe().f_back.f_locals["context"] - ExperimentDevice = context[device_type] - - # DeviceDataStream table(s) - stream_detail = ( - StreamType - & (DeviceType.Stream & {"device_type": device_type, "stream_type": stream_type}) - ).fetch1() - - for i, n in enumerate(stream_detail["stream_reader"].split(".")): - if i == 0: - reader = aeon - else: - reader = getattr(reader, n) - - stream = reader(**stream_detail["stream_reader_kwargs"]) - - table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) - -> {device_type} - -> acquisition.Chunk - --- - sample_count: int # number of data points acquired from this stream for a given chunk - timestamps: longblob # (datetime) timestamps of {stream_type} data - """ - - for col in stream.columns: - if col.startswith("_"): - continue - table_definition += f"{col}: longblob\n\t\t\t" - - class DeviceDataStream(dj.Imported): - definition = table_definition - _stream_reader = reader - _stream_detail = stream_detail - - @property - def key_source(self): - f""" - Only the combination of Chunk and {device_type} with overlapping time - + Chunk(s) that started after {device_type} install time and ended before {device_type} remove time - + Chunk(s) that started after {device_type} install time for {device_type} that are not yet removed - """ - return ( - acquisition.Chunk - * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) - & f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time" - & f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")' - ) - - def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory( - key, directory_type=dir_type - ) - - device_name = (ExperimentDevice & key).fetch1(f"{dj.utils.from_camel_case(device_type)}_name") - - stream = self._stream_reader( - **{ - k: v.format(**{k: device_name}) if k == "pattern" else v - for k, v in self._stream_detail["stream_reader_kwargs"].items() - } - ) - - stream_data = io_api.load( - root=raw_data_dir.as_posix(), - reader=stream, - start=pd.Timestamp(chunk_start), - 
end=pd.Timestamp(chunk_end), - ) - - self.insert1( - { - **key, - "sample_count": len(stream_data), - "timestamps": stream_data.index.values, - **{ - c: stream_data[c].values - for c in stream.columns - if not c.startswith("_") - }, - } - ) - - DeviceDataStream.__name__ = f"{device_type}{stream_type}" - - return DeviceDataStream - -# endregion - - -def main(context=None): - - import re - if context is None: - context = inspect.currentframe().f_back.f_locals - - # Create DeviceType tables. - for device_info in (DeviceType).fetch(as_dict=True): - if device_info["device_type"] not in locals(): - table_class = get_device_template(device_info["device_type"]) - context[table_class.__name__] = table_class - schema(table_class, context=context) - - device_table_def = inspect.getsource(table_class).lstrip() - replacements = { - "ExperimentDevice": device_info["device_type"], - "{device_title}": dj.utils.from_camel_case(device_info["device_type"]), - "{device_type}": dj.utils.from_camel_case(device_info["device_type"]), - "{aeon.__version__}": aeon.__version__ - } - for old, new in replacements.items(): - device_table_def = device_table_def.replace(old, new) - full_def = "@schema \n" + device_table_def + "\n\n" - if os.path.exists("existing_module.py"): - with open("existing_module.py", "r") as f: - existing_content = f.read() - - if full_def in existing_content: - continue - - with open("existing_module.py", "a") as f: - f.write(full_def) - else: - with open("existing_module.py", "w") as f: - full_def = """import datajoint as dj\nimport pandas as pd\n\nimport aeon\nfrom aeon.dj_pipeline import acquisition\nfrom aeon.io import api as io_api\n\n""" + full_def - f.write(full_def) - - # Create DeviceDataStream tables. - for device_info in (DeviceType.Stream).fetch(as_dict=True): - - device_type = device_info['device_type'] - stream_type = device_info['stream_type'] - table_name = f"{device_type}{stream_type}" - - if table_name not in locals(): - table_class = get_device_stream_template( - device_type, stream_type) - context[table_class.__name__] = table_class - schema(table_class, context=context) - - stream_obj = table_class.__dict__["_stream_reader"] - reader = stream_obj.__module__ + '.' 
+ stream_obj.__name__ - stream_detail = table_class.__dict__["_stream_detail"] - - device_stream_table_def = inspect.getsource(table_class).lstrip() - - old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) - -> {device_type} - -> acquisition.Chunk - --- - sample_count: int # number of data points acquired from this stream for a given chunk - timestamps: longblob # (datetime) timestamps of {stream_type} data - """ - - replacements = { - "DeviceDataStream": f"{device_type}{stream_type}","ExperimentDevice": device_type, - 'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'", - """f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""", - 'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'", - "{device_type}": device_type, - "{stream_type}": stream_type, - "{aeon.__version__}": aeon.__version__, - } - for old, new in replacements.items(): - new_definition = old_definition.replace(old, new) - - replacements["table_definition"] = '"""'+new_definition+'"""' - - for old, new in replacements.items(): - device_stream_table_def = device_stream_table_def.replace(old, new) - - device_stream_table_def = re.sub(r'_stream_reader\s*=\s*reader', f'_stream_reader = {reader}', device_stream_table_def) # insert reader - device_stream_table_def = re.sub(r'_stream_detail\s*=\s*stream_detail', f'_stream_detail = {stream_detail}', device_stream_table_def) # insert stream details - - full_def = "@schema \n" + device_stream_table_def + "\n\n" - - with open("existing_module.py", "r") as f: - existing_content = f.read() - - if full_def in existing_content: - continue - - with open("existing_module.py", "a") as f: - f.write(full_def) - -main() \ No newline at end of file diff --git a/aeon/dj_pipeline/streams_maker.py b/aeon/dj_pipeline/streams_maker.py new file mode 100644 index 00000000..0bc19ea5 --- /dev/null +++ b/aeon/dj_pipeline/streams_maker.py @@ -0,0 +1,331 @@ +import inspect +from pathlib import Path +import datajoint as dj +import pandas as pd +import re +import importlib + +import aeon +from aeon.dj_pipeline import acquisition, get_schema_name +from aeon.io import api as io_api + +logger = dj.logger + + +# schema_name = f'u_{dj.config["database.user"]}_streams' # for testing +schema_name = get_schema_name("streams") + +STREAMS_MODULE_NAME = "streams" +_STREAMS_MODULE_FILE = Path(__file__).parent / f"{STREAMS_MODULE_NAME}.py" + + +class StreamType(dj.Lookup): + """ + Catalog of all steam types for the different device types used across Project Aeon + One StreamType corresponds to one reader class in `aeon.io.reader` + The combination of `stream_reader` and `stream_reader_kwargs` should fully specify + the data loading routine for a particular device, using the `aeon.io.utils` + """ + + definition = """ # Catalog of all stream types used across Project Aeon + stream_type : varchar(20) + --- + stream_reader : varchar(256) # name of the reader class found in `aeon_mecha` package (e.g. 
aeon.io.reader.Video) + stream_reader_kwargs : longblob # keyword arguments to instantiate the reader class + stream_description='': varchar(256) + stream_hash : uuid # hash of dict(stream_reader_kwargs, stream_reader=stream_reader) + unique index (stream_hash) + """ + + +class DeviceType(dj.Lookup): + """ + Catalog of all device types used across Project Aeon + """ + + definition = """ # Catalog of all device types used across Project Aeon + device_type: varchar(36) + --- + device_description='': varchar(256) + """ + + class Stream(dj.Part): + definition = """ # Data stream(s) associated with a particular device type + -> master + -> StreamType + """ + + +class Device(dj.Lookup): + definition = """ # Physical devices, of a particular type, identified by unique serial number + device_serial_number: varchar(12) + --- + -> DeviceType + """ + + +# region Helper functions for creating device tables. + + +def get_device_template(device_type: str): + """Returns table class template for ExperimentDevice""" + device_title = device_type + device_type = dj.utils.from_camel_case(device_type) + + class ExperimentDevice(dj.Manual): + definition = f""" + # {device_title} placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-{aeon.__version__}) + -> acquisition.Experiment + -> Device + {device_type}_install_time : datetime(6) # time of the {device_type} placed and started operation at this position + --- + {device_type}_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + {device_type}_removal_time: datetime(6) # time of the {device_type} being removed + """ + + ExperimentDevice.__name__ = f"{device_title}" + + return ExperimentDevice + + +def get_device_stream_template(device_type: str, stream_type: str): + """Returns table class template for DeviceDataStream""" + + context = inspect.currentframe().f_back.f_locals["context"] + ExperimentDevice = context[device_type] + + # DeviceDataStream table(s) + stream_detail = ( + StreamType + & (DeviceType.Stream & {"device_type": device_type, "stream_type": stream_type}) + ).fetch1() + + for i, n in enumerate(stream_detail["stream_reader"].split(".")): + if i == 0: + reader = aeon + else: + reader = getattr(reader, n) + + stream = reader(**stream_detail["stream_reader_kwargs"]) + + table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + for col in stream.columns: + if col.startswith("_"): + continue + table_definition += f"{col}: longblob\n\t\t\t" + + class DeviceDataStream(dj.Imported): + definition = table_definition + _stream_reader = reader + _stream_detail = stream_detail + + @property + def key_source(self): + f""" + Only the combination of Chunk and {device_type} with overlapping time + + Chunk(s) that started after {device_type} install time and ended before {device_type} remove time + + Chunk(s) that started after {device_type} install time for {device_type} that are not yet removed + """ + return ( + 
acquisition.Chunk + * ExperimentDevice.join(ExperimentDevice.RemovalTime, left=True) + & f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time" + & f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (ExperimentDevice & key).fetch1( + f"{dj.utils.from_camel_case(device_type)}_name" + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + DeviceDataStream.__name__ = f"{device_type}{stream_type}" + + return DeviceDataStream + + +# endregion + + +def main(create_tables=True): + + if not _STREAMS_MODULE_FILE.exists(): + with open(_STREAMS_MODULE_FILE, "w") as f: + imports_str = ( + "#---- DO NOT MODIFY ----\n" + "#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ----\n\n" + "import datajoint as dj\n" + "import pandas as pd\n\n" + "import aeon\n" + "from aeon.dj_pipeline import acquisition, get_schema_name\n" + "from aeon.io import api as io_api\n\n" + "schema_name = get_schema_name('streams')\n" + "schema = dj.Schema()\n\n\n" + ) + f.write(imports_str) + for table_class in (StreamType, DeviceType, Device): + device_table_def = inspect.getsource(table_class).lstrip() + full_def = "@schema \n" + device_table_def + "\n\n" + f.write(full_def) + + streams = importlib.import_module(f"aeon.dj_pipeline.{STREAMS_MODULE_NAME}") + streams.schema.activate(schema_name) + + if create_tables: + # Create DeviceType tables. + for device_info in streams.DeviceType.fetch(as_dict=True): + if hasattr(streams, device_info["device_type"]): + continue + + table_class = get_device_template(device_info["device_type"]) + # context[table_class.__name__] = table_class + # schema(table_class, context=context) + + device_table_def = inspect.getsource(table_class).lstrip() + replacements = { + "ExperimentDevice": device_info["device_type"], + "{device_title}": dj.utils.from_camel_case(device_info["device_type"]), + "{device_type}": dj.utils.from_camel_case(device_info["device_type"]), + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + device_table_def = device_table_def.replace(old, new) + full_def = "@schema \n" + device_table_def + "\n\n" + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + # Create DeviceDataStream tables. 
+ for device_info in streams.DeviceType.Stream.fetch(as_dict=True): + + device_type = device_info["device_type"] + stream_type = device_info["stream_type"] + table_name = f"{device_type}{stream_type}" + + if hasattr(streams, table_name): + continue + + table_class = get_device_stream_template(device_type, stream_type) + # context[table_class.__name__] = table_class + # schema(table_class, context=context) + + stream_obj = table_class.__dict__["_stream_reader"] + reader = stream_obj.__module__ + "." + stream_obj.__name__ + stream_detail = table_class.__dict__["_stream_detail"] + + device_stream_table_def = inspect.getsource(table_class).lstrip() + + old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__}) + -> {device_type} + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of {stream_type} data + """ + + replacements = { + "DeviceDataStream": f"{device_type}{stream_type}", + "ExperimentDevice": device_type, + 'f"chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time"': f"'chunk_start >= {dj.utils.from_camel_case(device_type)}_install_time'", + """f'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""": f"""'chunk_start < IFNULL({dj.utils.from_camel_case(device_type)}_removal_time, "2200-01-01")'""", + 'f"{dj.utils.from_camel_case(device_type)}_name"': f"'{dj.utils.from_camel_case(device_type)}_name'", + "{device_type}": device_type, + "{stream_type}": stream_type, + "{aeon.__version__}": aeon.__version__, + } + for old, new in replacements.items(): + new_definition = old_definition.replace(old, new) + + replacements["table_definition"] = '"""' + new_definition + '"""' + + for old, new in replacements.items(): + device_stream_table_def = device_stream_table_def.replace(old, new) + + device_stream_table_def = re.sub( + r"_stream_reader\s*=\s*reader", + f"_stream_reader = {reader}", + device_stream_table_def, + ) # insert reader + device_stream_table_def = re.sub( + r"_stream_detail\s*=\s*stream_detail", + f"_stream_detail = {stream_detail}", + device_stream_table_def, + ) # insert stream details + + full_def = "@schema \n" + device_stream_table_def + "\n\n" + + with open(_STREAMS_MODULE_FILE, "r") as f: + existing_content = f.read() + + if full_def in existing_content: + continue + + with open(_STREAMS_MODULE_FILE, "a") as f: + f.write(full_def) + + importlib.reload(streams) + streams.schema.activate(schema_name) + + return streams + + +streams = main() diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py index 0650a8f7..090b6b19 100644 --- a/aeon/dj_pipeline/utils/load_metadata.py +++ b/aeon/dj_pipeline/utils/load_metadata.py @@ -2,7 +2,6 @@ import inspect import json import pathlib -import re from collections import defaultdict from pathlib import Path @@ -11,16 +10,10 @@ import pandas as pd from dotmap import DotMap -from aeon.dj_pipeline import ( - acquisition, - dict_to_uuid, - get_schema_name, - lab, - streams, - subject, -) +from aeon.dj_pipeline import acquisition, dict_to_uuid, subject, streams_maker from aeon.io import api as io_api + _weight_scale_rate = 100 _weight_scale_nest = 1 _colony_csv_path = pathlib.Path("/ceph/aeon/aeon/colony/colony.csv") @@ -43,6 +36,8 @@ def insert_stream_types(): """Insert into streams.streamType table all streams in the dataset schema.""" from aeon.schema import dataset + 
streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + schemas = [v for v in dataset.__dict__.values() if isinstance(v, DotMap)] for schema in schemas: @@ -64,6 +59,8 @@ def insert_stream_types(): def insert_device_types(schema: DotMap, metadata_yml_filepath: Path): """Use dataset.schema and metadata.yml to insert into streams.DeviceType and streams.Device. Only insert device types that were defined both in the device schema (e.g., exp02) and Metadata.yml. It then creates new device tables under streams schema.""" + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + device_info: dict[dict] = get_device_info(schema) device_type_mapper, device_sn = get_device_mapper(schema, metadata_yml_filepath) @@ -137,7 +134,7 @@ def extract_epoch_config(experiment_name: str, metadata_yml_filepath: str) -> di Returns: dict: epoch_config [dict] """ - + metadata_yml_filepath = pathlib.Path(metadata_yml_filepath) epoch_start = datetime.datetime.strptime( metadata_yml_filepath.parent.name, "%Y-%m-%dT%H-%M-%S" @@ -163,7 +160,9 @@ def extract_epoch_config(experiment_name: str, metadata_yml_filepath: str) -> di ) ) - if isinstance(devices, list): # In exp02, it is a list of dict. In presocial. It's a dict of dict. + if isinstance( + devices, list + ): # In exp02, it is a list of dict. In presocial. It's a dict of dict. devices: dict = { d.pop("Name"): d for d in devices } # {deivce_name: device_config} @@ -182,7 +181,8 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): """ Make entries into device tables """ - + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) + if experiment_name.startswith("oct"): ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath) return @@ -209,14 +209,14 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): schema = acquisition._device_schema_mapping[experiment_name] device_type_mapper, _ = get_device_mapper(schema, metadata_yml_filepath) - + # Insert into each device table device_list = [] device_removal_list = [] - + for device_name, device_config in epoch_config["metadata"].items(): if table := getattr(streams, device_type_mapper.get(device_name) or "", None): - + device_sn = device_config.get("SerialNumber", device_config.get("PortName")) device_key = {"device_serial_number": device_sn} @@ -263,38 +263,50 @@ def ingest_epoch_metadata(experiment_name, metadata_yml_filepath): for entry in table_attribute_entry ] - if dict_to_uuid({config["attribute_name"]: config["attribute_value"] for config in current_device_config}) == dict_to_uuid( - {config["attribute_name"]: config["attribute_value"] for config in new_device_config} + if dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in current_device_config + } + ) == dict_to_uuid( + { + config["attribute_name"]: config["attribute_value"] + for config in new_device_config + } ): # Skip if none of the configuration has changed. continue # Remove old device - device_removal_list.append({ - **current_device_query.fetch1("KEY"), - f"{dj.utils.from_camel_case(table.__name__)}_removal_time": epoch_config[ - "epoch_start" - ], - }) + device_removal_list.append( + { + **current_device_query.fetch1("KEY"), + f"{dj.utils.from_camel_case(table.__name__)}_removal_time": epoch_config[ + "epoch_start" + ], + } + ) # Insert into table. 
table.insert1(table_entry, skip_duplicates=True) table.Attribute.insert(table_attribute_entry, ignore_extra_fields=True) # Remove the currently installed devices that are absent in this config - device_removal = lambda device_type, device_entry: any(dj.utils. from_camel_case(device_type) in k for k in device_entry) # returns True if the device type is found in the attribute name - + device_removal = lambda device_type, device_entry: any( + dj.utils.from_camel_case(device_type) in k for k in device_entry + ) # returns True if the device type is found in the attribute name + for device_type in streams.DeviceType.fetch("device_type"): table = getattr(streams, device_type) device_removal_list.extend( - (table - table.RemovalTime - device_list & experiment_key - ).fetch("KEY")) # could be VideoSource or Patch - + (table - table.RemovalTime - device_list & experiment_key).fetch("KEY") + ) # could be VideoSource or Patch + for device_entry in device_removal_list: if device_removal(device_type, device_entry): table.RemovalTime.insert1(device_entry) - - + + # region Get stream & device information def get_stream_entries(schema: DotMap) -> list[dict]: """Returns a list of dictionaries containing the stream entries for a given device, @@ -474,10 +486,14 @@ def get_device_mapper(schema: DotMap, metadata_yml_filepath: Path): device_sn[item.Name] = ( item.SerialNumber or item.PortName or None ) # assign either the serial number (if it exists) or port name. If neither exists, assign None - elif isinstance(item, str): # presocial + elif isinstance(item, str): # presocial if meta_data.Devices[item].get("Type"): device_type_mapper[item] = meta_data.Devices[item].get("Type") - device_sn[item] = meta_data.Devices[item].get("SerialNumber") or meta_data.Devices[item].get("PortName") or None + device_sn[item] = ( + meta_data.Devices[item].get("SerialNumber") + or meta_data.Devices[item].get("PortName") + or None + ) with filename.open("w") as f: json.dump(device_type_mapper, f) @@ -491,7 +507,7 @@ def ingest_epoch_metadata_octagon(experiment_name, metadata_yml_filepath): """ Temporary ingestion routine to load devices' meta information for Octagon arena experiments """ - from aeon.dj_pipeline import streams + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) oct01_devices = [ ("Metadata", "Metadata"), From 37cdfa26f54060feb66fb8532589ccb1aa7fa741 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Mon, 3 Jul 2023 16:52:22 -0500 Subject: [PATCH 08/12] update streams for presocial --- aeon/dj_pipeline/streams.py | 808 ++++++++++++++++++++++++++++++ aeon/dj_pipeline/streams_maker.py | 24 +- 2 files changed, 821 insertions(+), 11 deletions(-) diff --git a/aeon/dj_pipeline/streams.py b/aeon/dj_pipeline/streams.py index f642a62f..120b41fb 100644 --- a/aeon/dj_pipeline/streams.py +++ b/aeon/dj_pipeline/streams.py @@ -3,6 +3,7 @@ import datajoint as dj import pandas as pd +from uuid import UUID import aeon from aeon.dj_pipeline import acquisition, get_schema_name @@ -60,3 +61,810 @@ class Device(dj.Lookup): """ +@schema +class Patch(dj.Manual): + definition = f""" + # patch placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + patch_install_time : datetime(6) # time of the patch placed and started operation at this position + --- + patch_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) 
associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + patch_removal_time: datetime(6) # time of the patch being removed + """ + + +@schema +class UndergroundFeeder(dj.Manual): + definition = f""" + # underground_feeder placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + underground_feeder_install_time : datetime(6) # time of the underground_feeder placed and started operation at this position + --- + underground_feeder_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + underground_feeder_removal_time: datetime(6) # time of the underground_feeder being removed + """ + + +@schema +class VideoSource(dj.Manual): + definition = f""" + # video_source placement and operation for a particular time period, at a certain location, for a given experiment (auto-generated with aeon_mecha-unknown) + -> acquisition.Experiment + -> Device + video_source_install_time : datetime(6) # time of the video_source placed and started operation at this position + --- + video_source_name : varchar(36) + """ + + class Attribute(dj.Part): + definition = """ # metadata/attributes (e.g. FPS, config, calibration, etc.) associated with this experimental device + -> master + attribute_name : varchar(32) + --- + attribute_value=null : longblob + """ + + class RemovalTime(dj.Part): + definition = f""" + -> master + --- + video_source_removal_time: datetime(6) # time of the video_source being removed + """ + + +@schema +class PatchBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in 
self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + 
def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class PatchEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from Patch (auto-generated with aeon_mecha-unknown) + -> Patch + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and Patch with overlapping time + + Chunk(s) that started after Patch install time and ended before Patch remove time + + Chunk(s) that started after Patch install time for Patch that are not yet removed + """ + return ( + acquisition.Chunk + * Patch.join(Patch.RemovalTime, left=True) + & 'chunk_start >= patch_install_time' + & 'chunk_start < IFNULL(patch_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (Patch & key).fetch1( + 'patch_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederBeamBreak(dj.Imported): + definition = """# Raw per-chunk BeamBreak data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of BeamBreak data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'BeamBreak', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_32', 'value': 34, 'tag': 'BeamBroken'}, 'stream_description': '', 'stream_hash': UUID('b14171e6-d27d-117a-ae73-a16c4b5fc8a2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and 
UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederDeliverPellet(dj.Imported): + definition = """# Raw per-chunk DeliverPellet data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DeliverPellet data + """ + _stream_reader = aeon.io.reader.BitmaskEvent + _stream_detail = {'stream_type': 'DeliverPellet', 'stream_reader': 'aeon.io.reader.BitmaskEvent', 'stream_reader_kwargs': {'pattern': '{pattern}_35', 'value': 1, 'tag': 'TriggeredPellet'}, 'stream_description': '', 'stream_hash': UUID('c49dda51-2e38-8b49-d1d8-2e54ea928e9c')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class 
UndergroundFeederDepletionState(dj.Imported): + definition = """# Raw per-chunk DepletionState data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of DepletionState data + """ + _stream_reader = aeon.schema.foraging._PatchState + _stream_detail = {'stream_type': 'DepletionState', 'stream_reader': 'aeon.schema.foraging._PatchState', 'stream_reader_kwargs': {'pattern': '{pattern}_State'}, 'stream_description': '', 'stream_hash': UUID('73025490-348c-18fd-d565-8e682b5b4bcd')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class UndergroundFeederEncoder(dj.Imported): + definition = """# Raw per-chunk Encoder data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) + -> UndergroundFeeder + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Encoder data + """ + _stream_reader = aeon.io.reader.Encoder + _stream_detail = {'stream_type': 'Encoder', 'stream_reader': 'aeon.io.reader.Encoder', 'stream_reader_kwargs': {'pattern': '{pattern}_90'}, 'stream_description': '', 'stream_hash': UUID('45002714-c31d-b2b8-a6e6-6ae624385cc1')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and UndergroundFeeder with overlapping time + + Chunk(s) that started after UndergroundFeeder install time and ended before UndergroundFeeder remove time + + Chunk(s) that started after UndergroundFeeder install time for UndergroundFeeder that are not yet removed + """ + return ( + acquisition.Chunk + * UndergroundFeeder.join(UndergroundFeeder.RemovalTime, left=True) + & 'chunk_start >= underground_feeder_install_time' + & 'chunk_start < IFNULL(underground_feeder_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = 
acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (UndergroundFeeder & key).fetch1( + 'underground_feeder_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourcePosition(dj.Imported): + definition = """# Raw per-chunk Position data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Position data + """ + _stream_reader = aeon.io.reader.Position + _stream_detail = {'stream_type': 'Position', 'stream_reader': 'aeon.io.reader.Position', 'stream_reader_kwargs': {'pattern': '{pattern}_200'}, 'stream_description': '', 'stream_hash': UUID('75f9f365-037a-1e9b-ad38-8b2b3783315d')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourceRegion(dj.Imported): + definition = """# Raw per-chunk Region data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Region data + """ + _stream_reader = aeon.schema.foraging._RegionReader + _stream_detail = {'stream_type': 'Region', 'stream_reader': 'aeon.schema.foraging._RegionReader', 'stream_reader_kwargs': {'pattern': '{pattern}_201'}, 'stream_description': '', 'stream_hash': UUID('6234a429-8ae5-d7dc-41c8-602ac76da029')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended 
before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + +@schema +class VideoSourceVideo(dj.Imported): + definition = """# Raw per-chunk Video data stream from VideoSource (auto-generated with aeon_mecha-unknown) + -> VideoSource + -> acquisition.Chunk + --- + sample_count: int # number of data points acquired from this stream for a given chunk + timestamps: longblob # (datetime) timestamps of Video data + """ + _stream_reader = aeon.io.reader.Video + _stream_detail = {'stream_type': 'Video', 'stream_reader': 'aeon.io.reader.Video', 'stream_reader_kwargs': {'pattern': '{pattern}'}, 'stream_description': '', 'stream_hash': UUID('4246295b-789f-206d-b413-7af25b7548b2')} + + @property + def key_source(self): + f""" + Only the combination of Chunk and VideoSource with overlapping time + + Chunk(s) that started after VideoSource install time and ended before VideoSource remove time + + Chunk(s) that started after VideoSource install time for VideoSource that are not yet removed + """ + return ( + acquisition.Chunk + * VideoSource.join(VideoSource.RemovalTime, left=True) + & 'chunk_start >= video_source_install_time' + & 'chunk_start < IFNULL(video_source_removal_time, "2200-01-01")' + ) + + def make(self, key): + chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( + "chunk_start", "chunk_end", "directory_type" + ) + raw_data_dir = acquisition.Experiment.get_data_directory( + key, directory_type=dir_type + ) + + device_name = (VideoSource & key).fetch1( + 'video_source_name' + ) + + stream = self._stream_reader( + **{ + k: v.format(**{k: device_name}) if k == "pattern" else v + for k, v in self._stream_detail["stream_reader_kwargs"].items() + } + ) + + stream_data = io_api.load( + root=raw_data_dir.as_posix(), + reader=stream, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + self.insert1( + { + **key, + "sample_count": len(stream_data), + "timestamps": stream_data.index.values, + **{ + c: stream_data[c].values + for c in stream.columns + if not c.startswith("_") + }, + } + ) + + diff --git a/aeon/dj_pipeline/streams_maker.py b/aeon/dj_pipeline/streams_maker.py index 0bc19ea5..8b78f3ff 100644 --- a/aeon/dj_pipeline/streams_maker.py +++ b/aeon/dj_pipeline/streams_maker.py @@ -102,16 +102,18 @@ class RemovalTime(dj.Part): return ExperimentDevice -def get_device_stream_template(device_type: str, stream_type: str): +def 
get_device_stream_template(device_type: str, stream_type: str, streams_module): """Returns table class template for DeviceDataStream""" - context = inspect.currentframe().f_back.f_locals["context"] - ExperimentDevice = context[device_type] + ExperimentDevice = getattr(streams_module, device_type) # DeviceDataStream table(s) stream_detail = ( - StreamType - & (DeviceType.Stream & {"device_type": device_type, "stream_type": stream_type}) + streams_module.StreamType + & ( + streams_module.DeviceType.Stream + & {"device_type": device_type, "stream_type": stream_type} + ) ).fetch1() for i, n in enumerate(stream_detail["stream_reader"].split(".")): @@ -209,7 +211,8 @@ def main(create_tables=True): "#---- DO NOT MODIFY ----\n" "#---- THIS FILE IS AUTO-GENERATED BY `streams_maker.py` ----\n\n" "import datajoint as dj\n" - "import pandas as pd\n\n" + "import pandas as pd\n" + "from uuid import UUID\n\n" "import aeon\n" "from aeon.dj_pipeline import acquisition, get_schema_name\n" "from aeon.io import api as io_api\n\n" @@ -232,8 +235,7 @@ def main(create_tables=True): continue table_class = get_device_template(device_info["device_type"]) - # context[table_class.__name__] = table_class - # schema(table_class, context=context) + streams.__dict__[table_class.__name__] = table_class device_table_def = inspect.getsource(table_class).lstrip() replacements = { @@ -264,9 +266,9 @@ def main(create_tables=True): if hasattr(streams, table_name): continue - table_class = get_device_stream_template(device_type, stream_type) - # context[table_class.__name__] = table_class - # schema(table_class, context=context) + table_class = get_device_stream_template( + device_type, stream_type, streams_module=streams + ) stream_obj = table_class.__dict__["_stream_reader"] reader = stream_obj.__module__ + "." 
+ stream_obj.__name__ From 5678b4a2d6e1a2f2c64c55ac2c04950d942053d9 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 5 Jul 2023 17:04:51 -0500 Subject: [PATCH 09/12] update workers and epochs/chunks ingestion --- aeon/dj_pipeline/populate/process.py | 4 +- aeon/dj_pipeline/populate/worker.py | 72 +++++++++++++++++++--------- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/aeon/dj_pipeline/populate/process.py b/aeon/dj_pipeline/populate/process.py index c83347cb..023425ad 100644 --- a/aeon/dj_pipeline/populate/process.py +++ b/aeon/dj_pipeline/populate/process.py @@ -36,7 +36,7 @@ from datajoint_utilities.dj_worker import parse_args from aeon.dj_pipeline.populate.worker import ( - high_priority, + acquisition_worker, mid_priority, streams_worker, logger, @@ -46,7 +46,7 @@ # ---- some wrappers to support execution as script or CLI configured_workers = { - "high_priority": high_priority, + "high_priority": acquisition_worker, "mid_priority": mid_priority, "streams_worker": streams_worker, } diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 395023b0..1ca198a0 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -20,7 +20,7 @@ streams = streams_maker.main() __all__ = [ - "high_priority", + "acquisition_worker", "mid_priority", "streams_worker", "WorkerLog", @@ -30,34 +30,61 @@ # ---- Some constants ---- logger = dj.logger -_current_experiment = "exp0.2-r0" -worker_schema_name = db_prefix + "workerlog" +worker_schema_name = db_prefix + "worker" load_metadata.insert_stream_types() +# ---- Manage experiments for automated ingestion ---- + +schema = dj.Schema(worker_schema_name) + + +@schema +class AutomatedExperimentIngestion(dj.Manual): + definition = """ # experiments to undergo automated ingestion + -> acquisition.Experiment + """ + + +def ingest_colony_epochs_chunks(): + """ + Load and insert subjects from colony.csv + Ingest epochs and chunks + for experiments specified in AutomatedExperimentIngestion + """ + load_metadata.ingest_subject() + experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") + for experiment_name in experiment_names: + acquisition.Epoch.ingest_epochs(experiment_name) + acquisition.Chunk.ingest_chunks(experiment_name) + + +def ingest_environment_visits(): + """ + Extract and insert complete visits + for experiments specified in AutomatedExperimentIngestion + """ + experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") + analysis.ingest_environment_visits(experiment_names) + + # ---- Define worker(s) ---- -# configure a worker to process high-priority tasks -high_priority = DataJointWorker( - "high_priority", +# configure a worker to process `acquisition`-related tasks +acquisition_worker = DataJointWorker( + "acquisition_worker", worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, sleep_duration=600, ) -high_priority(load_metadata.ingest_subject) -high_priority(acquisition.Epoch.ingest_epochs, experiment_name=_current_experiment) -high_priority(acquisition.Chunk.ingest_chunks, experiment_name=_current_experiment) -high_priority(acquisition.ExperimentLog) -high_priority(acquisition.SubjectEnterExit) -high_priority(acquisition.SubjectWeight) -high_priority(acquisition.FoodPatchEvent) -high_priority(acquisition.WheelState) -high_priority(acquisition.WeightMeasurement) -high_priority(acquisition.WeightMeasurementFiltered) - -high_priority( - analysis.ingest_environment_visits, 
experiment_names=[_current_experiment] -) +acquisition_worker(ingest_colony_epochs_chunks) +acquisition_worker(acquisition.ExperimentLog) +acquisition_worker(acquisition.SubjectEnterExit) +acquisition_worker(acquisition.SubjectWeight) +acquisition_worker(acquisition.FoodPatchEvent) +acquisition_worker(acquisition.WheelState) + +acquisition_worker(ingest_environment_visits) # configure a worker to process mid-priority tasks mid_priority = DataJointWorker( @@ -71,10 +98,9 @@ mid_priority(qc.CameraQC) mid_priority(tracking.CameraTracking) mid_priority(acquisition.FoodPatchWheel) +mid_priority(acquisition.WeightMeasurement) +mid_priority(acquisition.WeightMeasurementFiltered) -mid_priority( - analysis.visit.ingest_environment_visits, experiment_names=[_current_experiment] -) mid_priority(analysis.OverlapVisit) mid_priority(analysis.VisitSubjectPosition) From 4e162310933ec92b71b6908a5d5506a9c2743081 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 6 Jul 2023 16:35:14 -0500 Subject: [PATCH 10/12] update worker containers and config --- aeon/dj_pipeline/acquisition.py | 5 ++--- aeon/dj_pipeline/populate/worker.py | 14 ++++++------- aeon/dj_pipeline/utils/load_metadata.py | 3 ++- aeon/dj_pipeline/{ => utils}/streams_maker.py | 0 docker/docker-compose.yml | 20 +++++++++++-------- 5 files changed, 23 insertions(+), 19 deletions(-) rename aeon/dj_pipeline/{ => utils}/streams_maker.py (100%) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 9a503a3a..03a9a0f4 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -10,7 +10,7 @@ from aeon.io import reader as io_reader from aeon.schema import dataset as aeon_schema -from . import get_schema_name, lab, subject +from . import get_schema_name from .utils import paths logger = dj.logger @@ -277,8 +277,7 @@ def ingest_epochs(cls, experiment_name, start=None, end=None): - if not specified, ingest all epochs Note: "start" and "end" are datetime specified a string in the format: "%Y-%m-%d %H:%M:%S" """ - from aeon.dj_pipeline import streams_maker - + from .utils import streams_maker from .utils.load_metadata import ( extract_epoch_config, ingest_epoch_metadata, diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 1ca198a0..b57c4c4e 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -12,10 +12,10 @@ db_prefix, qc, report, - streams_maker, tracking, ) -from aeon.dj_pipeline.utils import load_metadata +from aeon.dj_pipeline.utils import load_metadata, streams_maker + streams = streams_maker.main() @@ -75,7 +75,7 @@ def ingest_environment_visits(): worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, - sleep_duration=600, + sleep_duration=1200, ) acquisition_worker(ingest_colony_epochs_chunks) acquisition_worker(acquisition.ExperimentLog) @@ -92,7 +92,7 @@ def ingest_environment_visits(): worker_schema_name=worker_schema_name, db_prefix=db_prefix, run_duration=-1, - sleep_duration=120, + sleep_duration=3600, ) mid_priority(qc.CameraQC) @@ -121,10 +121,10 @@ def ingest_environment_visits(): "streams_worker", worker_schema_name=worker_schema_name, db_prefix=db_prefix, - run_duration=1, - sleep_duration=600, + run_duration=-1, + sleep_duration=1200, ) for attr in vars(streams).values(): if is_djtable(attr, dj.user_tables.AutoPopulate): - streams_worker(attr) + streams_worker(attr, max_calls=10) diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py 
index 090b6b19..84de0a85 100644 --- a/aeon/dj_pipeline/utils/load_metadata.py +++ b/aeon/dj_pipeline/utils/load_metadata.py @@ -10,7 +10,8 @@ import pandas as pd from dotmap import DotMap -from aeon.dj_pipeline import acquisition, dict_to_uuid, subject, streams_maker +from aeon.dj_pipeline import acquisition, dict_to_uuid, subject +from aeon.dj_pipeline.utils import streams_maker from aeon.io import api as io_api diff --git a/aeon/dj_pipeline/streams_maker.py b/aeon/dj_pipeline/utils/streams_maker.py similarity index 100% rename from aeon/dj_pipeline/streams_maker.py rename to aeon/dj_pipeline/utils/streams_maker.py diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a6d1868b..e2dce70a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -45,22 +45,26 @@ x-aeon-ingest-common: &aeon-ingest-common max-file: "5" services: - ingest_high: + acquisition_worker: <<: *aeon-ingest-common - command: ["aeon_ingest", "high_priority", "--duration=-1", "--sleep=1200"] + command: ["aeon_ingest", "acquisition_worker"] - ingest_mid: + streams_worker: <<: *aeon-ingest-common depends_on: - ingest_high: + acquisition_worker: condition: service_started deploy: mode: replicated - replicas: 2 - command: ["aeon_ingest", "mid_priority", "--duration=-1", "--sleep=3600"] + replicas: 3 + command: ["aeon_ingest", "streams_worker"] - dev: + ingest_mid: <<: *aeon-ingest-common + depends_on: + acquisition_worker: + condition: service_started deploy: mode: replicated - replicas: 0 + replicas: 2 + command: ["aeon_ingest", "mid_priority"] From 68cb397ba39b90e60a52824aaef8e8869047c4a0 Mon Sep 17 00:00:00 2001 From: JaerongA Date: Fri, 7 Jul 2023 17:28:23 -0500 Subject: [PATCH 11/12] enable import streams from aeon.dj_pipeline --- aeon/dj_pipeline/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/aeon/dj_pipeline/__init__.py b/aeon/dj_pipeline/__init__.py index 4664cb93..ea805b57 100644 --- a/aeon/dj_pipeline/__init__.py +++ b/aeon/dj_pipeline/__init__.py @@ -16,6 +16,12 @@ repository_config = dj.config['custom'].get('repository_config', _default_repository_config) +try: + from .utils import streams_maker + streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME) +except: + pass + def get_schema_name(name): return db_prefix + name From 3d66f67c5de2ab7c5be7b4d3ccf75899f28aecac Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Mon, 10 Jul 2023 12:50:54 -0500 Subject: [PATCH 12/12] Apply suggestions from code review Co-authored-by: JaerongA --- aeon/dj_pipeline/utils/streams_maker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeon/dj_pipeline/utils/streams_maker.py b/aeon/dj_pipeline/utils/streams_maker.py index 8b78f3ff..46043e94 100644 --- a/aeon/dj_pipeline/utils/streams_maker.py +++ b/aeon/dj_pipeline/utils/streams_maker.py @@ -16,7 +16,7 @@ schema_name = get_schema_name("streams") STREAMS_MODULE_NAME = "streams" -_STREAMS_MODULE_FILE = Path(__file__).parent / f"{STREAMS_MODULE_NAME}.py" +_STREAMS_MODULE_FILE = Path(__file__).parent.parent / f"{STREAMS_MODULE_NAME}.py" class StreamType(dj.Lookup):
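
Usage sketch (illustrative only, not part of the patches above): once this series is applied, experiments are no longer hard-coded into the workers — they must be registered in the new AutomatedExperimentIngestion table introduced in PATCH 09, and the auto-generated per-device stream tables become importable as `streams` from `aeon.dj_pipeline` (PATCH 11). The experiment name, the `skip_duplicates` flag, and the `worker.acquisition_worker.run()` call below are assumptions for illustration based on the table and worker definitions shown in the diffs, not calls confirmed by the patch series itself.

    # Illustrative sketch only -- assumes the patch series above is applied and
    # that "exp0.2-r0" already exists in acquisition.Experiment.
    from aeon.dj_pipeline import streams
    from aeon.dj_pipeline.populate import worker

    # Register the experiment so ingest_colony_epochs_chunks() and
    # ingest_environment_visits() pick it up (replaces the removed
    # hard-coded _current_experiment = "exp0.2-r0").
    worker.AutomatedExperimentIngestion.insert1(
        {"experiment_name": "exp0.2-r0"}, skip_duplicates=True
    )

    # Run one pass of the acquisition worker outside docker; the containers in
    # docker/docker-compose.yml invoke the same workers via `aeon_ingest`.
    # (run() is assumed from datajoint_utilities.dj_worker.DataJointWorker.)
    worker.acquisition_worker.run()

    # The generated stream tables (e.g. UndergroundFeederDeliverPellet) are
    # reachable through the virtual `streams` module and populate per chunk,
    # restricted by each device's install/removal time in key_source.
    streams.UndergroundFeederDeliverPellet.populate(display_progress=True)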