From 6bf6f03f299de34da8f9f6d421dc314e6d0ff8a4 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 25 Apr 2024 08:54:59 -0500 Subject: [PATCH 1/4] fix: add `.sort_index()` on the streams dataframe --- aeon/dj_pipeline/__init__.py | 1 + aeon/dj_pipeline/analysis/block_analysis.py | 3 +-- aeon/dj_pipeline/utils/load_metadata.py | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/aeon/dj_pipeline/__init__.py b/aeon/dj_pipeline/__init__.py index b319f55b..72e57718 100644 --- a/aeon/dj_pipeline/__init__.py +++ b/aeon/dj_pipeline/__init__.py @@ -44,6 +44,7 @@ def fetch_stream(query, drop_pk=True): df.drop(columns=cols2drop, inplace=True, errors="ignore") df.rename(columns={"timestamps": "time"}, inplace=True) df.set_index("time", inplace=True) + df.sort_index(inplace=True) return df diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 517ff373..16e78872 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -58,7 +58,7 @@ def make(self, key): ) block_query = acquisition.Environment.BlockState & chunk_restriction - block_df = fetch_stream(block_query).sort_index()[previous_block_start:chunk_end] + block_df = fetch_stream(block_query)[previous_block_start:chunk_end] block_ends = block_df[block_df.pellet_ct.diff() < 0] @@ -323,7 +323,6 @@ def make(self, key): subject_names = [s["subject_name"] for s in block_subjects] # Construct subject position dataframe subjects_positions_df = pd.concat( - [ pd.DataFrame( {"subject_name": [s["subject_name"]] * len(s["position_timestamps"])} diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py index 56993880..f2639c22 100644 --- a/aeon/dj_pipeline/utils/load_metadata.py +++ b/aeon/dj_pipeline/utils/load_metadata.py @@ -201,10 +201,9 @@ def ingest_epoch_metadata(experiment_name, devices_schema, metadata_yml_filepath if not (streams.Device & device_key): logger.warning( - f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device. Skipping..." + f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device.\nThis should not happen - check if metadata.yml and schemas dotmap are consistent. Skipping..." ) # skip if this device (with a serial number) is not yet inserted in streams.Device - # this should not happen - check if metadata.yml and schemas dotmap are consistent continue device_list.append(device_key) From 193f0472e49c67680d5bb708d26be2a594c7d504 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 25 Apr 2024 08:59:43 -0500 Subject: [PATCH 2/4] fix: more robust automated analysis - skip `BlockAnalysis` that are not ready --- aeon/dj_pipeline/analysis/block_analysis.py | 5 ++--- aeon/dj_pipeline/populate/worker.py | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 16e78872..83cf62a1 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -155,10 +155,9 @@ def make(self, key): ) for streams_table in streams_tables: if len(streams_table & chunk_keys) < len(streams_table.key_source & chunk_keys): - logger.info( - f"{streams_table.__name__} not yet fully ingested for block: {key}. Skip BlockAnalysis (to retry later)..." + raise ValueError( + f"BlockAnalysis Not Ready - {streams_table.__name__} not yet fully ingested for block: {key}. Skipping (to retry later)..." ) - return self.insert1({**key, "block_duration": (block_end - block_start).total_seconds() / 3600}) diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index c1a8f86c..18f9e60c 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -103,6 +103,7 @@ def ingest_environment_visits(): db_prefix=db_prefix, max_idled_cycle=6, sleep_duration=1200, + autoclear_error_patterns=["%BlockAnalysis Not Ready%"], ) analysis_worker(block_analysis.BlockAnalysis, max_calls=6) From 9c0c8c6536f6a246d27f842b4512cc219e0404da Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 25 Apr 2024 11:50:16 -0500 Subject: [PATCH 3/4] fix(chunk): fix first chunk starting before epoch start --- aeon/dj_pipeline/acquisition.py | 7 +------ aeon/dj_pipeline/analysis/block_analysis.py | 1 - aeon/dj_pipeline/populate/worker.py | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index 9823435d..b20c1a0c 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -385,18 +385,13 @@ def ingest_chunks(cls, experiment_name): continue chunk_start = chunk.name + chunk_start = max(chunk_start, epoch_start) # first chunk of the epoch starts at epoch_start chunk_end = chunk_start + datetime.timedelta(hours=io_api.CHUNK_DURATION) if EpochEnd & epoch_key: epoch_end = (EpochEnd & epoch_key).fetch1("epoch_end") chunk_end = min(chunk_end, epoch_end) - if chunk_start in chunk_starts: - # handle cases where two chunks with identical start_time - # (starts in the same hour) but from 2 consecutive epochs - # using epoch_start as chunk_start in this case - chunk_start = epoch_start - # --- insert to Chunk --- chunk_key = {"experiment_name": experiment_name, "chunk_start": chunk_start} diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 83cf62a1..17927918 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -1,5 +1,4 @@ import json - import datajoint as dj import numpy as np import pandas as pd diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 18f9e60c..3153ffe6 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -87,6 +87,7 @@ def ingest_environment_visits(): db_prefix=db_prefix, max_idled_cycle=50, sleep_duration=60, + autoclear_error_patterns=["%BlockAnalysis Not Ready%"], ) for attr in vars(streams).values(): @@ -103,7 +104,6 @@ def ingest_environment_visits(): db_prefix=db_prefix, max_idled_cycle=6, sleep_duration=1200, - autoclear_error_patterns=["%BlockAnalysis Not Ready%"], ) analysis_worker(block_analysis.BlockAnalysis, max_calls=6) From dba170aa9a7d0a5993c2f1c65afa0c7e6e896fc7 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 25 Apr 2024 12:26:24 -0500 Subject: [PATCH 4/4] disable automatic BlockDetection --- aeon/dj_pipeline/analysis/block_analysis.py | 2 +- aeon/dj_pipeline/populate/worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 17927918..e49cf71d 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -86,7 +86,7 @@ def make(self, key): block_entries[-1]["block_end"] = block_end block_entries.append({**exp_key, "block_start": block_end, "block_end": None}) - Block.insert(block_entries) + Block.insert(block_entries, skip_duplicates=True) self.insert1(key) diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py index 3153ffe6..e538c8cb 100644 --- a/aeon/dj_pipeline/populate/worker.py +++ b/aeon/dj_pipeline/populate/worker.py @@ -64,7 +64,7 @@ def ingest_environment_visits(): acquisition_worker(acquisition.EpochConfig) acquisition_worker(acquisition.Environment) # acquisition_worker(ingest_environment_visits) -acquisition_worker(block_analysis.BlockDetection) +# acquisition_worker(block_analysis.BlockDetection) # configure a worker to handle pyrat sync pyrat_worker = DataJointWorker(