Skip to content

Commit

Permalink
add stream object
Browse files Browse the repository at this point in the history
  • Loading branch information
JaerongA committed Jun 29, 2023
1 parent ef71dc8 commit 0c5ec8c
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions aeon/dj_pipeline/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def get_device_stream_template(device_type: str, stream_type: str):
stream = reader(**stream_detail["stream_reader_kwargs"])

table_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__})
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""

for col in stream.columns:
if col.startswith("_"):
Expand Down Expand Up @@ -213,7 +213,7 @@ def main(context=None):
context[table_class.__name__] = table_class
schema(table_class, context=context)

device_table_def = inspect.getsource(table_class)
device_table_def = inspect.getsource(table_class).lstrip()
replacements = {
"ExperimentDevice": device_info["device_type"],
"{device_title}": dj.utils.from_camel_case(device_info["device_type"]),
Expand All @@ -222,7 +222,7 @@ def main(context=None):
}
for old, new in replacements.items():
device_table_def = device_table_def.replace(old, new)
full_def = "\t@schema \n" + device_table_def + "\n\n"
full_def = "@schema \n" + device_table_def + "\n\n"
if os.path.exists("existing_module.py"):
with open("existing_module.py", "r") as f:
existing_content = f.read()
Expand All @@ -234,7 +234,7 @@ def main(context=None):
f.write(full_def)
else:
with open("existing_module.py", "w") as f:
full_def = """import datajoint as dj\nimport pandas as pd\nfrom aeon.dj_pipeline import acquisition\nfrom aeon.io import api as io_api\n\n""" + full_def
full_def = """import datajoint as dj\nimport pandas as pd\n\nimport aeon\nfrom aeon.dj_pipeline import acquisition\nfrom aeon.io import api as io_api\n\n""" + full_def
f.write(full_def)

# Create DeviceDataStream tables.
Expand All @@ -250,15 +250,19 @@ def main(context=None):
context[table_class.__name__] = table_class
schema(table_class, context=context)

device_stream_table_def = inspect.getsource(table_class)
stream_obj = table_class.__dict__["_stream_reader"]
reader = stream_obj.__module__ + '.' + stream_obj.__name__
stream_detail = table_class.__dict__["_stream_detail"]

device_stream_table_def = inspect.getsource(table_class).lstrip()

old_definition = f""" # Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__})
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""
old_definition = f"""# Raw per-chunk {stream_type} data stream from {device_type} (auto-generated with aeon_mecha-{aeon.__version__})
-> {device_type}
-> acquisition.Chunk
---
sample_count: int # number of data points acquired from this stream for a given chunk
timestamps: longblob # (datetime) timestamps of {stream_type} data
"""

replacements = {
"DeviceDataStream": f"{device_type}{stream_type}","ExperimentDevice": device_type,
Expand All @@ -276,8 +280,11 @@ def main(context=None):

for old, new in replacements.items():
device_stream_table_def = device_stream_table_def.replace(old, new)

device_stream_table_def = re.sub(r'_stream_reader\s*=\s*reader', f'_stream_reader = {reader}', device_stream_table_def) # insert reader
device_stream_table_def = re.sub(r'_stream_detail\s*=\s*stream_detail', f'_stream_detail = {stream_detail}', device_stream_table_def) # insert stream details

full_def = "\t@schema \n" + device_stream_table_def + "\n\n"
full_def = "@schema \n" + device_stream_table_def + "\n\n"

with open("existing_module.py", "r") as f:
existing_content = f.read()
Expand Down

0 comments on commit 0c5ec8c

Please sign in to comment.