Merge pull request #212 from ttngu207/datajoint_presocial

refactor - explicit `streams_maker`

JaerongA authored Jul 10, 2023
2 parents 6b34e43 + 153c8e0 commit 0399603
Showing 10 changed files with 1,228 additions and 259 deletions.
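
This commit replaces the implicit `dj.VirtualModule`-based access to the streams schema with an explicit `streams_maker` module that builds the device/stream tables on demand. A minimal sketch of the resulting access pattern, assuming `streams_maker.main()` returns the generated streams module as the updated worker.py below uses it (illustrative only, not part of the diff):

import datajoint as dj

from aeon.dj_pipeline.utils import streams_maker

# Explicitly (re)create the device/stream tables and get the generated module back.
streams = streams_maker.main()

# The tables are ordinary attributes of that module, so callers can introspect them,
# e.g. to find the auto-populated stream tables that streams_worker registers below.
for attr in vars(streams).values():
    if isinstance(attr, type) and issubclass(attr, dj.user_tables.AutoPopulate):
        print(attr.__name__)
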
6 changes: 6 additions & 0 deletions aeon/dj_pipeline/__init__.py
@@ -16,6 +16,12 @@
repository_config = dj.config['custom'].get('repository_config',
                                             _default_repository_config)

try:
    from .utils import streams_maker
    streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME)
except:
    pass


def get_schema_name(name):
return db_prefix + name
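
For context, a minimal sketch of what the guarded import added to aeon/dj_pipeline/__init__.py above provides; the rationale in the comments is an assumption, not stated in the commit:

import datajoint as dj

from aeon.dj_pipeline.utils import streams_maker

# dj.VirtualModule(module_name, schema_name) reflects an existing schema's tables into a
# module object, so `streams.<Table>` is usable without importing generated source code.
# Wrapping it in try/except presumably keeps `aeon.dj_pipeline` importable when the
# streams schema has not been created yet (e.g. on a fresh database).
streams = dj.VirtualModule("streams", streams_maker.STREAMS_MODULE_NAME)
print(dir(streams))  # lists the reflected device/stream table classes
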
19 changes: 10 additions & 9 deletions aeon/dj_pipeline/acquisition.py
@@ -10,12 +10,11 @@
from aeon.io import reader as io_reader
from aeon.schema import dataset as aeon_schema

from . import get_schema_name, lab, subject
from . import get_schema_name
from .utils import paths

logger = dj.logger
schema = dj.schema(get_schema_name("acquisition"))
# streams = dj.VirtualModule("streams", get_schema_name("streams"))

# ------------------- Some Constants --------------------------

@@ -124,7 +123,7 @@ class Directory(dj.Part):

    @classmethod
    def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False):

        try:
            repo_name, dir_path = (
                cls.Directory & experiment_key & {"directory_type": directory_type}
@@ -135,6 +134,7 @@ def get_data_directory(cls, experiment_key, directory_type="raw", as_posix=False
            return data_directory.as_posix() if as_posix else data_directory
        except dj.errors.DataJointError:
            return

    @classmethod
    def get_data_directories(
        cls, experiment_key, directory_types=["raw"], as_posix=False
@@ -277,14 +277,13 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
            - if not specified, ingest all epochs
        Note: "start" and "end" are datetimes specified as strings in the format: "%Y-%m-%d %H:%M:%S"
        """
        from aeon.dj_pipeline import streams

        from .utils import streams_maker
        from .utils.load_metadata import (
            extract_epoch_config,
            ingest_epoch_metadata,
            insert_device_types,
        )

        device_name = _ref_device_mapping.get(experiment_name, "CameraTop")

        all_chunks, raw_data_dirs = _get_all_chunks(experiment_name, device_name)
@@ -363,15 +362,17 @@ def ingest_epochs(cls, experiment_name, start=None, end=None):
            if epoch_config:
                cls.Config.insert1(epoch_config)
            if metadata_yml_filepath and metadata_yml_filepath.exists():

                try:
                    # Ingest streams.DeviceType, streams.Device and create device tables.
                    # Insert new entries for streams.DeviceType, streams.Device.
                    insert_device_types(
                        _device_schema_mapping[epoch_key["experiment_name"]],
                        metadata_yml_filepath,
                    )
                    streams.main(context=streams.__dict__)  # create device tables under streams schema
                    # Define and instantiate new devices/stream tables under `streams` schema
                    streams_maker.main()
                    with cls.connection.transaction:
                        # Insert devices' installation/removal/settings
                        ingest_epoch_metadata(
                            experiment_name, metadata_yml_filepath
                        )
4 changes: 2 additions & 2 deletions aeon/dj_pipeline/populate/process.py
@@ -36,7 +36,7 @@
from datajoint_utilities.dj_worker import parse_args

from aeon.dj_pipeline.populate.worker import (
    high_priority,
    acquisition_worker,
    mid_priority,
    streams_worker,
    logger,
@@ -46,7 +46,7 @@
# ---- some wrappers to support execution as script or CLI

configured_workers = {
"high_priority": high_priority,
"high_priority": acquisition_worker,
"mid_priority": mid_priority,
"streams_worker": streams_worker,
}
90 changes: 59 additions & 31 deletions aeon/dj_pipeline/populate/worker.py
@@ -12,13 +12,15 @@
    db_prefix,
    qc,
    report,
    streams,
    tracking,
)
from aeon.dj_pipeline.utils import load_metadata
from aeon.dj_pipeline.utils import load_metadata, streams_maker


streams = streams_maker.main()

__all__ = [
"high_priority",
"acquisition_worker",
"mid_priority",
"streams_worker",
"WorkerLog",
@@ -28,51 +30,77 @@

# ---- Some constants ----
logger = dj.logger
_current_experiment = "exp0.2-r0"
worker_schema_name = db_prefix + "workerlog"
worker_schema_name = db_prefix + "worker"
load_metadata.insert_stream_types()


# ---- Manage experiments for automated ingestion ----

schema = dj.Schema(worker_schema_name)


@schema
class AutomatedExperimentIngestion(dj.Manual):
    definition = """ # experiments to undergo automated ingestion
    -> acquisition.Experiment
    """


def ingest_colony_epochs_chunks():
"""
Load and insert subjects from colony.csv
Ingest epochs and chunks
for experiments specified in AutomatedExperimentIngestion
"""
load_metadata.ingest_subject()
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
for experiment_name in experiment_names:
acquisition.Epoch.ingest_epochs(experiment_name)
acquisition.Chunk.ingest_chunks(experiment_name)


def ingest_environment_visits():
"""
Extract and insert complete visits
for experiments specified in AutomatedExperimentIngestion
"""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
analysis.ingest_environment_visits(experiment_names)


# ---- Define worker(s) ----
# configure a worker to process high-priority tasks
high_priority = DataJointWorker(
    "high_priority",
# configure a worker to process `acquisition`-related tasks
acquisition_worker = DataJointWorker(
    "acquisition_worker",
    worker_schema_name=worker_schema_name,
    db_prefix=db_prefix,
    run_duration=-1,
    sleep_duration=600,
)
high_priority(load_metadata.ingest_subject)
high_priority(acquisition.Epoch.ingest_epochs, experiment_name=_current_experiment)
high_priority(acquisition.Chunk.ingest_chunks, experiment_name=_current_experiment)
high_priority(acquisition.ExperimentLog)
high_priority(acquisition.SubjectEnterExit)
high_priority(acquisition.SubjectWeight)
high_priority(acquisition.FoodPatchEvent)
high_priority(acquisition.WheelState)
high_priority(acquisition.WeightMeasurement)
high_priority(acquisition.WeightMeasurementFiltered)

high_priority(
    analysis.ingest_environment_visits, experiment_names=[_current_experiment]
    sleep_duration=1200,
)
acquisition_worker(ingest_colony_epochs_chunks)
acquisition_worker(acquisition.ExperimentLog)
acquisition_worker(acquisition.SubjectEnterExit)
acquisition_worker(acquisition.SubjectWeight)
acquisition_worker(acquisition.FoodPatchEvent)
acquisition_worker(acquisition.WheelState)

acquisition_worker(ingest_environment_visits)

# configure a worker to process mid-priority tasks
mid_priority = DataJointWorker(
"mid_priority",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=-1,
sleep_duration=120,
sleep_duration=3600,
)

mid_priority(qc.CameraQC)
mid_priority(tracking.CameraTracking)
mid_priority(acquisition.FoodPatchWheel)
mid_priority(acquisition.WeightMeasurement)
mid_priority(acquisition.WeightMeasurementFiltered)

mid_priority(
    analysis.visit.ingest_environment_visits, experiment_names=[_current_experiment]
)
mid_priority(analysis.OverlapVisit)

mid_priority(analysis.VisitSubjectPosition)
@@ -93,10 +121,10 @@
"streams_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=1,
sleep_duration=600,
run_duration=-1,
sleep_duration=1200,
)

for attr in vars(streams).values():
    if is_djtable(attr) and hasattr(attr, "populate"):
        streams_worker(attr)
    if is_djtable(attr, dj.user_tables.AutoPopulate):
        streams_worker(attr, max_calls=10)
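
A condensed sketch of the worker-registration pattern used throughout this file. The constructor arguments and the `max_calls` keyword are taken from the diff; the import path and the scheduling semantics noted in the comments are assumptions:

from datajoint_utilities.dj_worker import DataJointWorker

from aeon.dj_pipeline import acquisition, db_prefix, tracking

# A named worker bound to a worker-log schema; run_duration=-1 presumably loops
# indefinitely, sleeping sleep_duration seconds between passes.
example_worker = DataJointWorker(
    "example_worker",  # hypothetical worker name
    worker_schema_name=db_prefix + "worker",
    db_prefix=db_prefix,
    run_duration=-1,
    sleep_duration=1200,
)

# Registered plain callables are invoked each pass; registered DataJoint tables are
# populated, with max_calls capping populate calls per pass (assumed semantics).
example_worker(acquisition.Epoch.ingest_epochs, experiment_name="exp0.2-r0")
example_worker(acquisition.ExperimentLog)
example_worker(tracking.CameraTracking, max_calls=10)
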
