From a2e8ac74439657d4421655ae33a57ee43d5b057f Mon Sep 17 00:00:00 2001 From: rclarke0 Date: Tue, 1 Oct 2024 17:10:52 -0400 Subject: [PATCH] Adding counting for tc lid and plate reader multi read --- .../automation/google_sheets_tool.py | 10 +- .../data_collection/abr_google_drive.py | 6 +- .../data_collection/read_robot_logs.py | 145 ++++++++++++------ 3 files changed, 114 insertions(+), 47 deletions(-) diff --git a/abr-testing/abr_testing/automation/google_sheets_tool.py b/abr-testing/abr_testing/automation/google_sheets_tool.py index beeb141dc9a..3ca3bd38f9b 100644 --- a/abr-testing/abr_testing/automation/google_sheets_tool.py +++ b/abr-testing/abr_testing/automation/google_sheets_tool.py @@ -172,7 +172,15 @@ def update_cell( self, sheet_title: str, row: int, column: int, single_data: Any ) -> Tuple[int, int, Any]: """Update ONE individual cell according to a row and column.""" - self.spread_sheet.worksheet(sheet_title).update_cell(row, column, single_data) + try: + self.spread_sheet.worksheet(sheet_title).update_cell( + row, column, single_data + ) + except gspread.exceptions.APIError: + t.sleep(30) + self.spread_sheet.worksheet(sheet_title).update_cell( + row, column, single_data + ) return row, column, single_data def get_all_data( diff --git a/abr-testing/abr_testing/data_collection/abr_google_drive.py b/abr-testing/abr_testing/data_collection/abr_google_drive.py index 4556b62800b..e1924e3c53e 100644 --- a/abr-testing/abr_testing/data_collection/abr_google_drive.py +++ b/abr-testing/abr_testing/data_collection/abr_google_drive.py @@ -60,7 +60,7 @@ def create_data_dictionary( print(f"Run {run_id} is incomplete. Skipping run.") continue if run_id in runs_to_save: - print("started reading run.") + print(f"started reading run {run_id}.") robot = file_results.get("robot_name") protocol_name = file_results["protocol"]["metadata"].get("protocolName", "") software_version = file_results.get("API_Version", "") @@ -114,7 +114,9 @@ def create_data_dictionary( tc_dict = read_robot_logs.thermocycler_commands(file_results) hs_dict = read_robot_logs.hs_commands(file_results) tm_dict = read_robot_logs.temperature_module_commands(file_results) - pipette_dict = read_robot_logs.instrument_commands(file_results) + pipette_dict = read_robot_logs.instrument_commands( + file_results, labware_name="opentrons_tough_pcr_auto_sealing_lid" + ) plate_reader_dict = read_robot_logs.plate_reader_commands( file_results, hellma_plate_standards ) diff --git a/abr-testing/abr_testing/data_collection/read_robot_logs.py b/abr-testing/abr_testing/data_collection/read_robot_logs.py index 3501a330a70..224dc58a22b 100644 --- a/abr-testing/abr_testing/data_collection/read_robot_logs.py +++ b/abr-testing/abr_testing/data_collection/read_robot_logs.py @@ -9,7 +9,7 @@ from datetime import datetime import os from abr_testing.data_collection.error_levels import ERROR_LEVELS_PATH -from typing import List, Dict, Any, Tuple, Set +from typing import List, Dict, Any, Tuple, Set, Optional import time as t import json import requests @@ -107,7 +107,44 @@ def count_command_in_run_data( return total_command, avg_time -def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]: +def identify_labware_ids( + file_results: Dict[str, Any], labware_name: Optional[str] +) -> List[str]: + """Determine what type of labware is being picked up.""" + if labware_name: + labwares = file_results.get("labware", "") + list_of_labware_ids = [] + if len(labwares) > 1: + for labware in labwares: + load_name = labware["loadName"] + if load_name == labware_name: + labware_id = labware["id"] + list_of_labware_ids.append(labware_id) + return list_of_labware_ids + + +def match_pipette_to_action( + command_dict: Dict[str, Any], + commandTypes: List[str], + right_pipette: Optional[str], + left_pipette: Optional[str], +) -> Tuple[int, int]: + """Match pipette id to id in command.""" + right_pipette_add = 0 + left_pipette_add = 0 + for command in commandTypes: + command_type = command_dict["commandType"] + command_pipette = command_dict.get("pipetteId", "") + if command_type == command and command_pipette == right_pipette: + right_pipette_add = 1 + elif command_type == command and command_pipette == left_pipette: + left_pipette_add = 1 + return left_pipette_add, right_pipette_add + + +def instrument_commands( + file_results: Dict[str, Any], labware_name: Optional[str] +) -> Dict[str, float]: """Count number of pipette and gripper commands per run.""" pipettes = file_results.get("pipettes", "") commandData = file_results.get("commands", "") @@ -120,7 +157,9 @@ def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]: right_pipette_id = "" left_pipette_id = "" gripper_pickups = 0.0 + gripper_labware_of_interest = 0.0 avg_liquid_probe_time_sec = 0.0 + list_of_labware_ids = identify_labware_ids(file_results, labware_name) # Match pipette mount to id for pipette in pipettes: if pipette["mount"] == "right": @@ -128,30 +167,34 @@ def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]: elif pipette["mount"] == "left": left_pipette_id = pipette["id"] for command in commandData: - commandType = command["commandType"] - # Count tip pick ups - if commandType == "pickUpTip": - if command["params"].get("pipetteId", "") == right_pipette_id: - right_tip_pick_up += 1 - elif command["params"].get("pipetteId", "") == left_pipette_id: - left_tip_pick_up += 1 + # Count pick ups + single_left_pickup, single_right_pickup = match_pipette_to_action( + command, ["pickUpTip"], right_pipette_id, left_pipette_id + ) + right_tip_pick_up += single_right_pickup + left_tip_pick_up += single_left_pickup # Count aspirates - elif commandType == "aspirate": - if command["params"].get("pipetteId", "") == right_pipette_id: - right_aspirate += 1 - elif command["params"].get("pipetteId", "") == left_pipette_id: - left_aspirate += 1 + single_left_aspirate, single_right_aspirate = match_pipette_to_action( + command, ["aspirate"], right_pipette_id, left_pipette_id + ) + right_aspirate += single_right_aspirate + left_aspirate += single_left_aspirate # count dispenses/blowouts - elif commandType == "dispense" or commandType == "blowout": - if command["params"].get("pipetteId", "") == right_pipette_id: - right_dispense += 1 - elif command["params"].get("pipetteId", "") == left_pipette_id: - left_dispense += 1 - elif ( + single_left_dispense, single_right_dispense = match_pipette_to_action( + command, ["blowOut", "dispense"], right_pipette_id, left_pipette_id + ) + right_dispense += single_right_dispense + left_dispense += single_left_dispense + # count gripper actions + commandType = command["commandType"] + if ( commandType == "moveLabware" and command["params"]["strategy"] == "usingGripper" ): gripper_pickups += 1 + labware_moving = command["params"]["labwareId"] + if labware_moving in list_of_labware_ids: + gripper_labware_of_interest += 1 liquid_probes, avg_liquid_probe_time_sec = count_command_in_run_data( commandData, "liquidProbe", True ) @@ -163,6 +206,7 @@ def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]: "Right Pipette Total Aspirates": right_aspirate, "Right Pipette Total Dispenses": right_dispense, "Gripper Pick Ups": gripper_pickups, + f"Gripper Pick Ups of {labware_name}": gripper_labware_of_interest, "Total Liquid Probes": liquid_probes, "Average Liquid Probe Time (sec)": avg_liquid_probe_time_sec, } @@ -178,11 +222,12 @@ def plate_reader_commands( initialize_count: int = 0 read = "no" final_result = {} - # Count Number of Reads + read_num = 0 + # Count Number of Reads per measure mode read_count, avg_read_time = count_command_in_run_data( commandData, "absorbanceReader/read", True ) - # Count Number of Initializations + # Count Number of Initializations per measure mode initialize_count, avg_initialize_time = count_command_in_run_data( commandData, "absorbanceReader/initialize", True ) @@ -198,28 +243,37 @@ def plate_reader_commands( read = "yes" elif read == "yes" and commandType == "comment": result = command["params"].get("message", "") - wavelength = result.split("result: {")[1].split(":")[0] - wavelength_str = wavelength + ": " - rest_of_string = result.split(wavelength_str)[1][:-1] - result_dict = eval(rest_of_string) - result_ndarray = plate_reader.convert_read_dictionary_to_array(result_dict) - for item in hellma_plate_standards: - wavelength_of_interest = item["wavelength"] - if str(wavelength) == str(wavelength_of_interest): - error_cells = plate_reader.check_byonoy_data_accuracy( - result_ndarray, item, False - ) - if len(error_cells[0]) > 0: - percent = (96 - len(error_cells)) / 96 * 100 - for cell in error_cells: - print("FAIL: Cell " + str(cell) + " out of accuracy spec.") - else: - percent = 100 - print( - f"PASS: {wavelength_of_interest} meet accuracy specification" + formatted_result = result.split("result: ")[1] + result_dict = eval(formatted_result) + result_dict_keys = list(result_dict.keys()) + if len(result_dict_keys) > 1: + read_type = "multi" + else: + read_type = "single" + for wavelength in result_dict_keys: + one_wavelength_dict = result_dict.get(wavelength) + result_ndarray = plate_reader.convert_read_dictionary_to_array( + one_wavelength_dict + ) + for item in hellma_plate_standards: + wavelength_of_interest = item["wavelength"] + if str(wavelength) == str(wavelength_of_interest): + error_cells = plate_reader.check_byonoy_data_accuracy( + result_ndarray, item, False ) - final_result[wavelength] = percent - input("###########################") + if len(error_cells[0]) > 0: + percent = (96 - len(error_cells)) / 96 * 100 + for cell in error_cells: + print( + "FAIL: Cell " + str(cell) + " out of accuracy spec." + ) + else: + percent = 100 + print( + f"PASS: {wavelength_of_interest} meet accuracy specification" + ) + final_result[read_type, wavelength, read_num] = percent + read_num += 1 read = "no" plate_dict = { "Plate Reader # of Reads": read_count, @@ -502,7 +556,10 @@ def get_error_info(file_results: Dict[str, Any]) -> Dict[str, Any]: error_code = error_details.get("errorCode", "") error_instrument = error_details.get("detail", "") # Determine error level - error_level = error_levels.get(error_code, "4") + if end_run_errors > 0: + error_level = error_levels.get(error_code, "4") + else: + error_level = "" # Create dictionary with all error descriptions error_dict = { "Total Recoverable Error(s)": total_recoverable_errors,