Skip to content

Commit

Permalink
Merge pull request #348 from ttngu207/datajoint_pipeline
Browse files Browse the repository at this point in the history
feat(dj): ingest data by searching multiple directories in specified order
  • Loading branch information
ttngu207 authored Apr 4, 2024
2 parents 5287aa6 + 52e7bba commit fc79be9
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 86 deletions.
14 changes: 9 additions & 5 deletions aeon/dj_pipeline/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DirectoryType(dj.Lookup):
directory_type: varchar(16)
"""

contents = zip(["raw", "preprocessing", "analysis", "quality-control"])
contents = zip(["raw", "processed", "qc"])


# ------------------- GENERAL INFORMATION ABOUT AN EXPERIMENT --------------------
Expand Down Expand Up @@ -115,6 +115,7 @@ class Directory(dj.Part):
---
-> PipelineRepository
directory_path: varchar(255)
load_order=1: int # order of priority to load the directory
"""

class DevicesSchema(dj.Part):
Expand Down Expand Up @@ -155,10 +156,13 @@ def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False
@classmethod
def get_data_directories(cls, experiment_key, directory_types=None, as_posix=False):
    """Return the existing data directories for an experiment, in load-priority order.

    Args:
        experiment_key: restriction identifying a single experiment.
        directory_types: optional sequence of directory-type names to look up.
            When None, all directory types registered in ``cls.Directory`` for
            this experiment are used, ordered by their ``load_order`` attribute
            (the priority in which directories should be searched).
        as_posix: forwarded to ``get_data_directory`` — presumably selects POSIX
            string paths over path objects; confirm against that method.

    Returns:
        list: one entry per resolvable directory type; types for which
        ``get_data_directory`` returns None (directory not found) are dropped.
    """
    if directory_types is None:
        # Default to every registered directory type, honoring load priority.
        directory_types = (cls.Directory & experiment_key).fetch(
            "directory_type", order_by="load_order"
        )
    return [
        d
        for dir_type in directory_types
        if (d := cls.get_data_directory(experiment_key, dir_type, as_posix=as_posix))
        is not None
    ]


Expand Down Expand Up @@ -542,7 +546,7 @@ def make(self, key):
chunk_start, chunk_end = (Chunk & key).fetch1("chunk_start", "chunk_end")

# Populate the part table
raw_data_dir = Experiment.get_data_directory(key)
data_dirs = Experiment.get_data_directories(key)
devices_schema = getattr(
aeon_schemas,
(Experiment.DevicesSchema & {"experiment_name": key["experiment_name"]}).fetch1(
Expand All @@ -565,7 +569,7 @@ def make(self, key):
stream_reader = getattr(device, stream_type)

stream_data = io_api.load(
root=raw_data_dir.as_posix(),
root=data_dirs,
reader=stream_reader,
start=pd.Timestamp(chunk_start),
end=pd.Timestamp(chunk_end),
Expand Down
1 change: 1 addition & 0 deletions aeon/dj_pipeline/analysis/block_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def make(self, key):
subject_names = [s["subject_name"] for s in block_subjects]
# Construct subject position dataframe
subjects_positions_df = pd.concat(

[
pd.DataFrame(
{"subject_name": [s["subject_name"]] * len(s["position_timestamps"])}
Expand Down
9 changes: 4 additions & 5 deletions aeon/dj_pipeline/qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ def key_source(self):
) # CameraTop

def make(self, key):
chunk_start, chunk_end, dir_type = (acquisition.Chunk & key).fetch1(
"chunk_start", "chunk_end", "directory_type"
)
chunk_start, chunk_end = (acquisition.Chunk & key).fetch1("chunk_start", "chunk_end")

device_name = (streams.SpinnakerVideoSource & key).fetch1("spinnaker_video_source_name")
raw_data_dir = acquisition.Experiment.get_data_directory(key, directory_type=dir_type)
data_dirs = acquisition.Experiment.get_data_directories(key)

devices_schema = getattr(
acquisition.aeon_schemas,
Expand All @@ -81,7 +80,7 @@ def make(self, key):
stream_reader = getattr(getattr(devices_schema, device_name), "Video")

videodata = io_api.load(
root=raw_data_dir.as_posix(),
root=data_dirs,
reader=stream_reader,
start=pd.Timestamp(chunk_start),
end=pd.Timestamp(chunk_end),
Expand Down
Loading

0 comments on commit fc79be9

Please sign in to comment.