Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (abr-testing): simulate protocols with csv parameter #16685

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions abr-testing/abr_testing/data_collection/read_robot_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
import csv
import subprocess
from datetime import datetime
from datetime import datetime, timezone
import os
from abr_testing.data_collection.error_levels import ERROR_LEVELS_PATH
from typing import List, Dict, Any, Tuple, Set, Optional
Expand Down Expand Up @@ -110,6 +110,7 @@ def identify_labware_ids(
file_results: Dict[str, Any], labware_name: Optional[str]
) -> List[str]:
"""Determine what type of labware is being picked up."""
list_of_labware_ids = []
if labware_name:
labwares = file_results.get("labware", "")
list_of_labware_ids = []
Expand Down Expand Up @@ -341,9 +342,9 @@ def hs_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
)
if temp_time is not None and deactivate_time is None:
# If heater shaker module is not deactivated, protocol completedAt time stamp used.
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
commands = file_results.get("commands")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commands is already defined on line 292 of this function. remove line 345 and use variable commandData

default = commands[len(commands) - 1].get("completedAt")
protocol_end = datetime.strptime(file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z")
temp_duration = (protocol_end - temp_time).total_seconds()
hs_temps[hs_temp] = hs_temps.get(hs_temp, 0.0) + temp_duration
hs_latch_sets = hs_latch_count / 2 # one set of open/close
Expand Down Expand Up @@ -389,9 +390,9 @@ def temperature_module_commands(file_results: Dict[str, Any]) -> Dict[str, Any]:
tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration
if temp_time is not None and deactivate_time is None:
# If temperature module is not deactivated, protocol completedAt time stamp used.
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
commands = file_results.get("commands")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line 393 since commands is defined as commandData on line 370

default = commands[len(commands) - 1].get("completedAt")
protocol_end = datetime.strptime(file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z")
temp_duration = (protocol_end - temp_time).total_seconds()
tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration
tm_total_temp_time = sum(tm_temps.values())
Expand Down Expand Up @@ -473,16 +474,16 @@ def thermocycler_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
block_temps[block_temp] = block_temps.get(block_temp, 0.0) + block_time
if block_on_time is not None and block_off_time is None:
# If thermocycler block not deactivated protocol completedAt time stamp used
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
commands = file_results.get("commands")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line 477 since commands is defined as commandData on line 410

default = commands[len(commands) - 1].get("completedAt")
protocol_end = datetime.strptime(file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z")
temp_duration = (protocol_end - block_on_time).total_seconds()
block_temps[block_temp] = block_temps.get(block_temp, 0.0) + temp_duration

if lid_on_time is not None and lid_off_time is None:
# If thermocycler lid not deactivated protocol completedAt time stamp used
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
commands = file_results.get("commands")
default = commands[len(commands) - 1].get("completedAt")
protocol_end = datetime.strptime(file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z")
temp_duration = (protocol_end - lid_on_time).total_seconds()
lid_temps[lid_temp] = block_temps.get(lid_temp, 0.0) + temp_duration

Expand Down
107 changes: 77 additions & 30 deletions abr-testing/abr_testing/protocol_simulation/simulation_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import datetime
from abr_testing.automation import google_sheets_tool
from abr_testing.data_collection import read_robot_logs
from typing import Any, Tuple, List, Dict, Union, NoReturn
from typing import Any, Tuple, List, Dict, Union, NoReturn, Optional
from abr_testing.tools import plate_reader


Expand Down Expand Up @@ -241,6 +241,7 @@ def parse_results_volume(
"Right Pipette Total Aspirates",
"Right Pipette Total Dispenses",
"Gripper Pick Ups",
"Gripper Pick Ups of None",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is supposed to be Gripper Pick Ups of opentrons_tough_pcr_auto_sealing_lid

"Total Liquid Probes",
"Average Liquid Probe Time (sec)",
]
Expand Down Expand Up @@ -302,6 +303,7 @@ def parse_results_volume(
total_time_row.append(str(end_time - start_time))

for metric in metrics:
print(f"Dictionary: {metric}\n\n")
for cmd in metric.keys():
values_row.append(str(metric[cmd]))
return (
Expand All @@ -320,15 +322,16 @@ def parse_results_volume(


def main(
protocol_file_path_name: str,
protocol_file_path: Path,
save: bool,
storage_directory: str = os.curdir,
google_sheet_name: str = "",
parameters: List[str] = None
) -> None:
"""Main module control."""
sys.exit = mock_exit # Replace sys.exit with the mock function
# Read file path from arguments
protocol_file_path = Path(protocol_file_path_name)
# protocol_file_path = Path(protocol_file_path_name)
protocol_name = protocol_file_path.stem
print("Simulating", protocol_name)
file_date = datetime.now()
Expand All @@ -344,26 +347,58 @@ def main(
)
json_file_output = open(json_file_path, "wb+")
# log_output_file = f"{protocol_name}_log"
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)
if parameters:
print(f"Parameter: {parameters[0]}\n")
csv_params = {}
csv_params['parameters_csv'] = parameters[0]
rtp_json = json.dumps(csv_params)
ctx.invoke(
analyze,
files=[protocol_file_path],
rtp_files=rtp_json,
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)

else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)
json_file_output.close()
else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)
if parameters:
csv_params = {}
csv_params['parameters_csv'] = parameters[0]
rtp_json = json.dumps(csv_params)
ctx.invoke(
analyze,
files=[protocol_file_path],
rtp_files = rtp_json,
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)
else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)

except SystemExit as e:
print(f"SystemExit caught with code: {e}")
Expand Down Expand Up @@ -395,6 +430,7 @@ def main(
credentials_path, google_sheet_name, 0
)
google_sheet.write_to_row([])

for row in parse_results_volume(
json_file_path,
protocol_name,
Expand Down Expand Up @@ -428,13 +464,15 @@ def main(
"protocol_file_path",
metavar="PROTOCOL_FILE_PATH",
type=str,
nargs=1,
nargs='*',
help="Path to protocol file",
)
args = parser.parse_args()
storage_directory = args.storage_directory[0]
sheet_name = args.sheet_name[0]
protocol_file_path: str = args.protocol_file_path[0]
protocol_file_path: Path = Path(args.protocol_file_path[0])
parameters: List[str] = args.protocol_file_path[1:]
print(parameters)
SETUP = True
while SETUP:
print(
Expand All @@ -445,7 +483,7 @@ def main(
choice = ""
while not choice:
choice = input(
"Remove air_gap commands to ensure accurate results? (Y/N): "
"Remove air_gap commands to ensure accurate results: (continue)? (Y/N): "
)
if choice.upper() == "Y":
SETUP = False
Expand All @@ -462,11 +500,20 @@ def main(
# Change api level
if CLEAN_PROTOCOL:
set_api_level(protocol_file_path)
main(
protocol_file_path,
True,
storage_directory,
sheet_name,
)
if parameters:
main(
protocol_file_path,
True,
storage_directory,
sheet_name,
parameters=parameters
)
else:
main(
protocol_file_path=protocol_file_path,
save=True,
storage_directory=storage_directory,
google_sheet_name=sheet_name,
)
else:
sys.exit(0)
Loading