Merge pull request #360 from ttngu207/datajoint_pipeline
fix first chunk starting before epoch start & disable automatic BlockDetection
ttngu207 authored Apr 25, 2024
2 parents df1cc61 + dba170a commit 5276cc1
Showing 5 changed files with 9 additions and 16 deletions.
1 change: 1 addition & 0 deletions aeon/dj_pipeline/__init__.py
@@ -44,6 +44,7 @@ def fetch_stream(query, drop_pk=True):
 df.drop(columns=cols2drop, inplace=True, errors="ignore")
 df.rename(columns={"timestamps": "time"}, inplace=True)
 df.set_index("time", inplace=True)
+df.sort_index(inplace=True)
 return df


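Note on the sort_index line added above: downstream code slices the streamed DataFrame by time labels (e.g. block_df[previous_block_start:chunk_end] in block_analysis.py), and pandas only supports that kind of partial label slicing reliably on a monotonically sorted DatetimeIndex. A minimal, self-contained sketch with made-up timestamps and values:

import pandas as pd

# Out-of-order timestamps, as can happen when a stream is fetched across epochs.
idx = pd.to_datetime(["2024-04-25 10:02", "2024-04-25 10:00", "2024-04-25 10:05"])
df = pd.DataFrame({"pellet_ct": [2, 1, 3]}, index=idx)

df = df.sort_index()  # what fetch_stream now guarantees before returning
# Partial label-based slicing is only well-defined on a monotonic index;
# on the unsorted frame the same slice can raise a KeyError.
print(df["2024-04-25 10:01":"2024-04-25 10:06"])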
7 changes: 1 addition & 6 deletions aeon/dj_pipeline/acquisition.py
@@ -385,18 +385,13 @@ def ingest_chunks(cls, experiment_name):
 continue

 chunk_start = chunk.name
+chunk_start = max(chunk_start, epoch_start)  # first chunk of the epoch starts at epoch_start
 chunk_end = chunk_start + datetime.timedelta(hours=io_api.CHUNK_DURATION)

 if EpochEnd & epoch_key:
     epoch_end = (EpochEnd & epoch_key).fetch1("epoch_end")
     chunk_end = min(chunk_end, epoch_end)

-if chunk_start in chunk_starts:
-    # handle cases where two chunks with identical start_time
-    # (starts in the same hour) but from 2 consecutive epochs
-    # using epoch_start as chunk_start in this case
-    chunk_start = epoch_start
-
 # --- insert to Chunk ---
 chunk_key = {"experiment_name": experiment_name, "chunk_start": chunk_start}

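The clamping added above replaces the old special-case handling of duplicate chunk starts: the first chunk of an epoch is simply moved up to the epoch start, and (as before) the last chunk is cut off at the epoch end. A small worked example with made-up timestamps, assuming hour-long chunks (the constant below stands in for io_api.CHUNK_DURATION in this sketch):

import datetime

CHUNK_DURATION = 1  # hours; stands in for io_api.CHUNK_DURATION here

epoch_start = datetime.datetime(2024, 4, 25, 10, 30)
epoch_end = datetime.datetime(2024, 4, 25, 11, 45)

chunk_start = datetime.datetime(2024, 4, 25, 10, 0)  # hourly chunk containing epoch_start

chunk_start = max(chunk_start, epoch_start)  # -> 10:30, never before the epoch itself
chunk_end = chunk_start + datetime.timedelta(hours=CHUNK_DURATION)  # -> 11:30
chunk_end = min(chunk_end, epoch_end)  # stays 11:30 here; would be truncated if the epoch ended earlier

print(chunk_start, chunk_end)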
11 changes: 4 additions & 7 deletions aeon/dj_pipeline/analysis/block_analysis.py
@@ -1,5 +1,4 @@
import json

import datajoint as dj
import numpy as np
import pandas as pd
@@ -58,7 +57,7 @@ def make(self, key):
 )

 block_query = acquisition.Environment.BlockState & chunk_restriction
-block_df = fetch_stream(block_query).sort_index()[previous_block_start:chunk_end]
+block_df = fetch_stream(block_query)[previous_block_start:chunk_end]

 block_ends = block_df[block_df.pellet_ct.diff() < 0]

Expand Down Expand Up @@ -87,7 +86,7 @@ def make(self, key):
block_entries[-1]["block_end"] = block_end
block_entries.append({**exp_key, "block_start": block_end, "block_end": None})

Block.insert(block_entries)
Block.insert(block_entries, skip_duplicates=True)
self.insert1(key)


Expand Down Expand Up @@ -155,10 +154,9 @@ def make(self, key):
)
for streams_table in streams_tables:
if len(streams_table & chunk_keys) < len(streams_table.key_source & chunk_keys):
logger.info(
f"{streams_table.__name__} not yet fully ingested for block: {key}. Skip BlockAnalysis (to retry later)..."
raise ValueError(
f"BlockAnalysis Not Ready - {streams_table.__name__} not yet fully ingested for block: {key}. Skipping (to retry later)..."
)
return

self.insert1({**key, "block_duration": (block_end - block_start).total_seconds() / 3600})
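Note on raising here instead of logging and returning: when BlockAnalysis is populated with job reservation (as the pipeline workers typically do), the raised error is recorded against the key in the schema's jobs table rather than the key being silently skipped. Combined with the autoclear pattern added to worker.py below, such blocks are retried automatically once the upstream streams finish ingesting. A sketch of how this surfaces (the populate call is illustrative; scheduling is normally left to the workers):

from aeon.dj_pipeline.analysis import block_analysis

# With job reservation, errors from make() are logged per key instead of aborting.
block_analysis.BlockAnalysis.populate(reserve_jobs=True, suppress_errors=True)
# Blocks whose upstream streams are not fully ingested now appear as errored
# jobs with a "BlockAnalysis Not Ready - ..." message, ready to be retried
# once that error is cleared.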

@@ -323,7 +321,6 @@ def make(self, key):
 subject_names = [s["subject_name"] for s in block_subjects]
 # Construct subject position dataframe
 subjects_positions_df = pd.concat(
-
     [
         pd.DataFrame(
             {"subject_name": [s["subject_name"]] * len(s["position_timestamps"])}
3 changes: 2 additions & 1 deletion aeon/dj_pipeline/populate/worker.py
@@ -64,7 +64,7 @@ def ingest_environment_visits():
 acquisition_worker(acquisition.EpochConfig)
 acquisition_worker(acquisition.Environment)
 # acquisition_worker(ingest_environment_visits)
-acquisition_worker(block_analysis.BlockDetection)
+# acquisition_worker(block_analysis.BlockDetection)

 # configure a worker to handle pyrat sync
 pyrat_worker = DataJointWorker(
@@ -87,6 +87,7 @@ def ingest_environment_visits():
     db_prefix=db_prefix,
     max_idled_cycle=50,
     sleep_duration=60,
+    autoclear_error_patterns=["%BlockAnalysis Not Ready%"],
 )

 for attr in vars(streams).values():
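The autoclear_error_patterns entry presumably instructs the worker to clear errored jobs whose error message matches the SQL LIKE pattern, so keys that failed with the "BlockAnalysis Not Ready" error raised in block_analysis.py above are re-attempted on later cycles. A hypothetical manual equivalent against the DataJoint jobs table, assuming the block_analysis module exposes its schema object as schema:

from aeon.dj_pipeline.analysis import block_analysis

# Errored jobs whose message matches the "not ready" pattern.
not_ready = block_analysis.schema.jobs & 'error_message LIKE "%BlockAnalysis Not Ready%"'
print(len(not_ready))  # how many blocks are still waiting on upstream ingestion
not_ready.delete()     # clearing them lets the next populate cycle retry these blocks
                       # (DataJoint may ask for confirmation unless safemode is off)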
3 changes: 1 addition & 2 deletions aeon/dj_pipeline/utils/load_metadata.py
@@ -201,10 +201,9 @@ def ingest_epoch_metadata(experiment_name, devices_schema, metadata_yml_filepath

 if not (streams.Device & device_key):
     logger.warning(
-        f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device. Skipping..."
+        f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device.\nThis should not happen - check if metadata.yml and schemas dotmap are consistent. Skipping..."
     )
     # skip if this device (with a serial number) is not yet inserted in streams.Device
-    # this should not happen - check if metadata.yml and schemas dotmap are consistent
     continue

 device_list.append(device_key)
