From 0321b3339b87dd42177132d6ef60195b5e435234 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 4 Apr 2024 13:06:22 -0500 Subject: [PATCH 1/2] feat(dj): ingest data by searching multiple directories in specified order --- aeon/dj_pipeline/acquisition.py | 14 ++- aeon/dj_pipeline/analysis/block_analysis.py | 1 + aeon/dj_pipeline/qc.py | 9 +- aeon/dj_pipeline/streams.py | 119 +++++++++----------- aeon/dj_pipeline/tracking.py | 11 +- aeon/dj_pipeline/utils/streams_maker.py | 8 +- 6 files changed, 76 insertions(+), 86 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 52dab8dc..2816b899 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -84,7 +84,7 @@ class DirectoryType(dj.Lookup): directory_type: varchar(16) """ - contents = zip(["raw", "preprocessing", "analysis", "quality-control"]) + contents = zip(["raw", "processed", "qc"]) # ------------------- GENERAL INFORMATION ABOUT AN EXPERIMENT -------------------- @@ -115,6 +115,7 @@ class Directory(dj.Part): --- -> PipelineRepository directory_path: varchar(255) + load_order=1: int # order of priority to load the directory """ class DevicesSchema(dj.Part): @@ -155,10 +156,13 @@ def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False @classmethod def get_data_directories(cls, experiment_key, directory_types=None, as_posix=False): if directory_types is None: - directory_types = ["raw"] + directory_types = (cls.Directory & experiment_key).fetch( + "directory_type", order_by="load_order" + ) return [ - cls.get_data_directory(experiment_key, dir_type, as_posix=as_posix) + d for dir_type in directory_types + if (d := cls.get_data_directory(experiment_key, dir_type, as_posix=as_posix)) is not None ] @@ -542,7 +546,7 @@ def make(self, key): chunk_start, chunk_end = (Chunk & key).fetch1("chunk_start", "chunk_end") # Populate the part table - raw_data_dir = Experiment.get_data_directory(key) + data_dirs = Experiment.get_data_directories(key) devices_schema = getattr( aeon_schemas, (Experiment.DevicesSchema & {"experiment_name": key["experiment_name"]}).fetch1( @@ -565,7 +569,7 @@ def make(self, key): stream_reader = getattr(device, stream_type) stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index d7c134b8..6d120805 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -253,6 +253,7 @@ def make(self, key): subject_names = [s["subject_name"] for s in block_subjects] # Construct subject position dataframe subjects_positions_df = pd.concat( + [ pd.DataFrame( {"subject_name": [s["subject_name"]] * len(s["position_timestamps"])} diff --git a/aeon/dj_pipeline/qc.py b/aeon/dj_pipeline/qc.py index b33a030b..0a9bd4e9 100644 --- a/aeon/dj_pipeline/qc.py +++ b/aeon/dj_pipeline/qc.py @@ -66,11 +66,10 @@ def key_source(self): ) # CameraTop def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + device_name = (streams.SpinnakerVideoSource & key).fetch1("spinnaker_video_source_name") - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + data_dirs = 
acquisition.Experiment.get_data_directories(key) devices_schema = getattr( acquisition.aeon_schemas, @@ -81,7 +80,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "Video") videodata = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), diff --git a/aeon/dj_pipeline/streams.py b/aeon/dj_pipeline/streams.py index 49558b03..4cd482a0 100644 --- a/aeon/dj_pipeline/streams.py +++ b/aeon/dj_pipeline/streams.py @@ -188,10 +188,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (RfidReader & key).fetch1('rfid_reader_name') @@ -204,7 +203,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "RfidEvents") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -225,7 +224,7 @@ def make(self, key): ) -@schema +@schema class SpinnakerVideoSourceVideo(dj.Imported): definition = """ # Raw per-chunk Video data stream from SpinnakerVideoSource (auto-generated with aeon_mecha-unknown) -> SpinnakerVideoSource @@ -251,10 +250,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (SpinnakerVideoSource & key).fetch1('spinnaker_video_source_name') @@ -267,7 +265,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "Video") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -288,7 +286,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederBeamBreak(dj.Imported): definition = """ # Raw per-chunk BeamBreak data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -313,10 +311,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -329,7 +326,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "BeamBreak") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -350,7 +347,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederDeliverPellet(dj.Imported): definition = """ # Raw per-chunk DeliverPellet data stream from 
UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -375,10 +372,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -391,7 +387,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "DeliverPellet") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -412,7 +408,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederDepletionState(dj.Imported): definition = """ # Raw per-chunk DepletionState data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -439,10 +435,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -455,7 +450,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "DepletionState") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -476,7 +471,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederEncoder(dj.Imported): definition = """ # Raw per-chunk Encoder data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -502,10 +497,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -518,7 +512,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "Encoder") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -539,7 +533,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederManualDelivery(dj.Imported): definition = """ # Raw per-chunk ManualDelivery data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -564,10 +558,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") 
+ + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -580,7 +573,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "ManualDelivery") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -601,7 +594,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederMissedPellet(dj.Imported): definition = """ # Raw per-chunk MissedPellet data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -626,10 +619,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -642,7 +634,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "MissedPellet") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -663,7 +655,7 @@ def make(self, key): ) -@schema +@schema class UndergroundFeederRetriedDelivery(dj.Imported): definition = """ # Raw per-chunk RetriedDelivery data stream from UndergroundFeeder (auto-generated with aeon_mecha-unknown) -> UndergroundFeeder @@ -688,10 +680,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (UndergroundFeeder & key).fetch1('underground_feeder_name') @@ -704,7 +695,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "RetriedDelivery") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -725,7 +716,7 @@ def make(self, key): ) -@schema +@schema class WeightScaleWeightFiltered(dj.Imported): definition = """ # Raw per-chunk WeightFiltered data stream from WeightScale (auto-generated with aeon_mecha-unknown) -> WeightScale @@ -751,10 +742,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (WeightScale & key).fetch1('weight_scale_name') @@ -767,7 +757,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "WeightFiltered") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -788,7 +778,7 @@ def make(self, 
key): ) -@schema +@schema class WeightScaleWeightRaw(dj.Imported): definition = """ # Raw per-chunk WeightRaw data stream from WeightScale (auto-generated with aeon_mecha-unknown) -> WeightScale @@ -814,10 +804,9 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (WeightScale & key).fetch1('weight_scale_name') @@ -830,7 +819,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "WeightRaw") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), diff --git a/aeon/dj_pipeline/tracking.py b/aeon/dj_pipeline/tracking.py index 225430fa..6d1ed307 100644 --- a/aeon/dj_pipeline/tracking.py +++ b/aeon/dj_pipeline/tracking.py @@ -150,10 +150,9 @@ def key_source(self): ) # SLEAP & CameraTop def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (streams.SpinnakerVideoSource & key).fetch1("spinnaker_video_source_name") @@ -166,7 +165,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "Pose") pose_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), @@ -179,7 +178,7 @@ def make(self, key): # Find the config file for the SLEAP model try: f = next( - raw_data_dir.glob( + data_dirs.glob( f"**/**/{stream_reader.pattern}{io_api.chunk(chunk_start).strftime('%Y-%m-%dT%H-%M-%S')}*.{stream_reader.extension}" ) ) diff --git a/aeon/dj_pipeline/utils/streams_maker.py b/aeon/dj_pipeline/utils/streams_maker.py index d333387e..3e5acafc 100644 --- a/aeon/dj_pipeline/utils/streams_maker.py +++ b/aeon/dj_pipeline/utils/streams_maker.py @@ -146,10 +146,8 @@ def key_source(self): ) def make(self, key): - chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1( - "chunk_start", "chunk_end", "directory_type" - ) - raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type) + chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end") + data_dirs = acquisition.Experiment.get_data_directories(key) device_name = (ExperimentDevice & key).fetch1(f"{dj.utils.from_camel_case(device_type)}_name") @@ -162,7 +160,7 @@ def make(self, key): stream_reader = getattr(getattr(devices_schema, device_name), "{stream_type}") stream_data = io_api.load( - root=raw_data_dir.as_posix(), + root=data_dirs, reader=stream_reader, start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end), From 52e7bba37489795691eddbc4fdd0dd3be53639e4 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 4 Apr 2024 13:21:08 -0500 Subject: [PATCH 2/2] feat(sciviz): add `load_order` in Directory input page --- aeon/dj_pipeline/webapps/sciviz/specsheet.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml index ea2ef70b..dceab461 100644 --- a/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml +++ b/aeon/dj_pipeline/webapps/sciviz/specsheet.yaml @@ -198,6 +198,9 @@ SciViz: - type: attribute input: Directory Path destination: directory_path + - type: attribute + input: Loading Order + destination: load_order New Experiment Type: route: /exp_type_form
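
Note on the directory-resolution change: the core of the first patch is the new `load_order` attribute on `Experiment.Directory` and the reworked `get_data_directories`, which now returns every configured directory for an experiment in ascending `load_order`, silently dropping any that cannot be resolved. The sketch below restates that lookup outside DataJoint, assuming a hypothetical list of rows (with made-up paths) in place of the fetched `Directory` part table; it illustrates the priority-ordered search list, not the pipeline's actual implementation.

    from pathlib import Path

    # Hypothetical stand-in for rows fetched from Experiment.Directory,
    # already carrying the new load_order attribute (paths are examples only).
    DIRECTORY_ROWS = [
        {"load_order": 1, "directory_type": "raw", "directory_path": "/ceph/aeon/raw"},
        {"load_order": 2, "directory_type": "processed", "directory_path": "/ceph/aeon/processed"},
        {"load_order": 3, "directory_type": "qc", "directory_path": "/ceph/aeon/qc"},
    ]

    def resolve_data_directories(rows, directory_types=None, as_posix=False):
        """Mirror the patched Experiment.get_data_directories(): return the
        configured directories in ascending load_order, dropping any type
        that cannot be resolved (the patch filters on `(d := ...) is not None`)."""
        if directory_types is None:
            # Default to every configured directory, lowest load_order first.
            directory_types = [
                r["directory_type"] for r in sorted(rows, key=lambda r: r["load_order"])
            ]
        by_type = {r["directory_type"]: Path(r["directory_path"]) for r in rows}
        found = []
        for dir_type in directory_types:
            path = by_type.get(dir_type)  # None if this type is not configured
            if path is not None:
                found.append(path.as_posix() if as_posix else path)
        return found

    # e.g. resolve_data_directories(DIRECTORY_ROWS, ["processed", "raw"])
    # returns the processed directory first, then raw.

Keeping the skip-on-missing behaviour inside the lookup means callers never have to special-case experiments that only define a subset of directory types.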
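
Note on the consumer side: every per-chunk `make()` touched by the patch (acquisition, qc, tracking, and the auto-generated stream tables) now passes the full ordered list to `io_api.load(root=data_dirs, ...)` instead of a single `raw_data_dir.as_posix()`, so loading can fall back from one root to the next. The wrapper below is a minimal sketch of that first-match search over multiple roots, assuming a generic single-root loader callable and pandas DataFrame results; the real multi-root handling lives inside `aeon.io.api.load`.

    from pathlib import Path

    import pandas as pd

    def load_from_roots(roots, load_one, reader, start, end):
        """Try each data root in priority order and return the first non-empty
        result; an empty DataFrame means no root had data for this chunk.

        `load_one` stands in for a loader restricted to a single directory
        (an assumption made for this sketch)."""
        for root in roots:
            data = load_one(root=Path(root).as_posix(), reader=reader, start=start, end=end)
            if not data.empty:
                return data
        return pd.DataFrame()

    # Usage sketch, keeping the names used in the patch:
    # data_dirs = acquisition.Experiment.get_data_directories(key)
    # stream_data = load_from_roots(
    #     data_dirs, io_api.load, stream_reader,
    #     start=pd.Timestamp(chunk_start), end=pd.Timestamp(chunk_end),
    # )

Whether the loader stops at the first root with data or merges across roots is a design choice left to `io_api.load`; the sketch shows only the ordered-fallback idea that the `load_order` attribute is meant to drive.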