Skip to content

Commit

Permalink
Merge branch 'edge' into EXEC-655-store-commands-error-list-in-db
Browse files Browse the repository at this point in the history
  • Loading branch information
TamarZanzouri authored Nov 6, 2024
2 parents e29d95f + d0af9a1 commit c275c35
Show file tree
Hide file tree
Showing 150 changed files with 4,487 additions and 739 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,4 @@ def run(
folder_name = args.folder_name[0]
google_sheet_name = args.google_sheet_name[0]
email = args.email[0]
run(storage_directory, folder_name, google_sheet_name, email)
15 changes: 10 additions & 5 deletions abr-testing/abr_testing/data_collection/read_robot_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def identify_labware_ids(
file_results: Dict[str, Any], labware_name: Optional[str]
) -> List[str]:
"""Determine what type of labware is being picked up."""
list_of_labware_ids: List[str] = []
if labware_name:
labwares = file_results.get("labware", "")
list_of_labware_ids = []
Expand Down Expand Up @@ -341,8 +342,9 @@ def hs_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
)
if temp_time is not None and deactivate_time is None:
# If heater shaker module is not deactivated, protocol completedAt time stamp used.
default = commandData[len(commandData) - 1].get("completedAt")
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z"
)
temp_duration = (protocol_end - temp_time).total_seconds()
hs_temps[hs_temp] = hs_temps.get(hs_temp, 0.0) + temp_duration
Expand Down Expand Up @@ -389,8 +391,9 @@ def temperature_module_commands(file_results: Dict[str, Any]) -> Dict[str, Any]:
tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration
if temp_time is not None and deactivate_time is None:
# If temperature module is not deactivated, protocol completedAt time stamp used.
default = commandData[len(commandData) - 1].get("completedAt")
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z"
)
temp_duration = (protocol_end - temp_time).total_seconds()
tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration
Expand Down Expand Up @@ -473,15 +476,17 @@ def thermocycler_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
block_temps[block_temp] = block_temps.get(block_temp, 0.0) + block_time
if block_on_time is not None and block_off_time is None:
# If thermocycler block not deactivated protocol completedAt time stamp used
default = commandData[len(commandData) - 1].get("completedAt")
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z"
)
temp_duration = (protocol_end - block_on_time).total_seconds()
block_temps[block_temp] = block_temps.get(block_temp, 0.0) + temp_duration

if lid_on_time is not None and lid_off_time is None:
# If thermocycler lid not deactivated protocol completedAt time stamp used
default = commandData[len(commandData) - 1].get("completedAt")
protocol_end = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
file_results.get("completedAt", default), "%Y-%m-%dT%H:%M:%S.%f%z"
)
temp_duration = (protocol_end - lid_on_time).total_seconds()
lid_temps[lid_temp] = block_temps.get(lid_temp, 0.0) + temp_duration
Expand Down
8 changes: 4 additions & 4 deletions abr-testing/abr_testing/protocol_simulation/abr_sim_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from pathlib import Path


def run(file_to_simulate: str) -> None:
def run(file_to_simulate: Path) -> None:
"""Simulate protocol and raise errors."""
protocol_name = Path(file_to_simulate).stem
protocol_name = file_to_simulate.stem
try:
simulation_metrics.main(file_to_simulate, False)
except Exception:
Expand All @@ -29,6 +29,6 @@ def run(file_to_simulate: str) -> None:
if file.endswith(".py"): # If it's a Python file
if file in exclude:
continue
file_path = os.path.join(root, file)
print(f"Simulating protocol: {file_path}")
file_path = Path(os.path.join(root, file))
print(f"Simulating protocol: {file_path.stem}")
run(file_path)
103 changes: 75 additions & 28 deletions abr-testing/abr_testing/protocol_simulation/simulation_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def parse_results_volume(
"Right Pipette Total Aspirates",
"Right Pipette Total Dispenses",
"Gripper Pick Ups",
"Gripper Pick Ups of opentrons_tough_pcr_auto_sealing_lid",
"Total Liquid Probes",
"Average Liquid Probe Time (sec)",
]
Expand Down Expand Up @@ -302,6 +303,7 @@ def parse_results_volume(
total_time_row.append(str(end_time - start_time))

for metric in metrics:
print(f"Dictionary: {metric}\n\n")
for cmd in metric.keys():
values_row.append(str(metric[cmd]))
return (
Expand All @@ -320,15 +322,16 @@ def parse_results_volume(


def main(
protocol_file_path_name: str,
protocol_file_path: Path,
save: bool,
storage_directory: str = os.curdir,
google_sheet_name: str = "",
parameters: List[str] = [],
) -> None:
"""Main module control."""
sys.exit = mock_exit # Replace sys.exit with the mock function
# Read file path from arguments
protocol_file_path = Path(protocol_file_path_name)
# protocol_file_path = Path(protocol_file_path_name)
protocol_name = protocol_file_path.stem
print("Simulating", protocol_name)
file_date = datetime.now()
Expand All @@ -344,26 +347,58 @@ def main(
)
json_file_output = open(json_file_path, "wb+")
# log_output_file = f"{protocol_name}_log"
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)
if parameters:
print(f"Parameter: {parameters[0]}\n")
csv_params = {}
csv_params["parameters_csv"] = parameters[0]
rtp_json = json.dumps(csv_params)
ctx.invoke(
analyze,
files=[protocol_file_path],
rtp_files=rtp_json,
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)

else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=json_file_output,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=False,
)
json_file_output.close()
else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)
if parameters:
csv_params = {}
csv_params["parameters_csv"] = parameters[0]
rtp_json = json.dumps(csv_params)
ctx.invoke(
analyze,
files=[protocol_file_path],
rtp_files=rtp_json,
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)
else:
ctx.invoke(
analyze,
files=[protocol_file_path],
json_output=None,
human_json_output=None,
log_output=error_output,
log_level="ERROR",
check=True,
)

except SystemExit as e:
print(f"SystemExit caught with code: {e}")
Expand Down Expand Up @@ -395,6 +430,7 @@ def main(
credentials_path, google_sheet_name, 0
)
google_sheet.write_to_row([])

for row in parse_results_volume(
json_file_path,
protocol_name,
Expand Down Expand Up @@ -428,13 +464,15 @@ def main(
"protocol_file_path",
metavar="PROTOCOL_FILE_PATH",
type=str,
nargs=1,
nargs="*",
help="Path to protocol file",
)
args = parser.parse_args()
storage_directory = args.storage_directory[0]
sheet_name = args.sheet_name[0]
protocol_file_path: str = args.protocol_file_path[0]
parameters: List[str] = args.protocol_file_path[1:]
print(parameters)
SETUP = True
while SETUP:
print(
Expand All @@ -445,7 +483,7 @@ def main(
choice = ""
while not choice:
choice = input(
"Remove air_gap commands to ensure accurate results? (Y/N): "
"Remove air_gap commands to ensure accurate results: (continue)? (Y/N): "
)
if choice.upper() == "Y":
SETUP = False
Expand All @@ -462,11 +500,20 @@ def main(
# Change api level
if CLEAN_PROTOCOL:
set_api_level(protocol_file_path)
main(
protocol_file_path,
True,
storage_directory,
sheet_name,
)
if parameters:
main(
Path(protocol_file_path),
True,
storage_directory,
sheet_name,
parameters=parameters,
)
else:
main(
protocol_file_path=Path(protocol_file_path),
save=True,
storage_directory=storage_directory,
google_sheet_name=sheet_name,
)
else:
sys.exit(0)
73 changes: 24 additions & 49 deletions api/src/opentrons/protocol_api/_liquid.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Sequence
from typing import Optional, Dict

from opentrons_shared_data.liquid_classes.liquid_class_definition import (
LiquidClassSchemaV1,
AspirateProperties,
SingleDispenseProperties,
MultiDispenseProperties,
ByPipetteSetting,
ByTipTypeSetting,
)

from ._liquid_properties import (
TransferProperties,
build_transfer_properties,
)


Expand All @@ -29,46 +31,29 @@ class Liquid:
display_color: Optional[str]


# TODO (spp, 2024-10-17): create PAPI-equivalent types for all the properties
# and have validation on value updates with user-facing error messages
@dataclass
class TransferProperties:
_aspirate: AspirateProperties
_dispense: SingleDispenseProperties
_multi_dispense: Optional[MultiDispenseProperties]

@property
def aspirate(self) -> AspirateProperties:
"""Aspirate properties."""
return self._aspirate

@property
def dispense(self) -> SingleDispenseProperties:
"""Single dispense properties."""
return self._dispense

@property
def multi_dispense(self) -> Optional[MultiDispenseProperties]:
"""Multi dispense properties."""
return self._multi_dispense


@dataclass
class LiquidClass:
"""A data class that contains properties of a specific class of liquids."""

_name: str
_display_name: str
_by_pipette_setting: Sequence[ByPipetteSetting]
_by_pipette_setting: Dict[str, Dict[str, TransferProperties]]

@classmethod
def create(cls, liquid_class_definition: LiquidClassSchemaV1) -> "LiquidClass":
"""Liquid class factory method."""

by_pipette_settings: Dict[str, Dict[str, TransferProperties]] = {}
for by_pipette in liquid_class_definition.byPipette:
tip_settings: Dict[str, TransferProperties] = {}
for tip_type in by_pipette.byTipType:
tip_settings[tip_type.tiprack] = build_transfer_properties(tip_type)
by_pipette_settings[by_pipette.pipetteModel] = tip_settings

return cls(
_name=liquid_class_definition.liquidClassName,
_display_name=liquid_class_definition.displayName,
_by_pipette_setting=liquid_class_definition.byPipette,
_by_pipette_setting=by_pipette_settings,
)

@property
Expand All @@ -81,26 +66,16 @@ def display_name(self) -> str:

def get_for(self, pipette: str, tiprack: str) -> TransferProperties:
"""Get liquid class transfer properties for the specified pipette and tip."""
settings_for_pipette: Sequence[ByPipetteSetting] = [
pip_setting
for pip_setting in self._by_pipette_setting
if pip_setting.pipetteModel == pipette
]
if len(settings_for_pipette) == 0:
try:
settings_for_pipette = self._by_pipette_setting[pipette]
except KeyError:
raise ValueError(
f"No properties found for {pipette} in {self._name} liquid class"
)
settings_for_tip: Sequence[ByTipTypeSetting] = [
tip_setting
for tip_setting in settings_for_pipette[0].byTipType
if tip_setting.tiprack == tiprack
]
if len(settings_for_tip) == 0:
try:
transfer_properties = settings_for_pipette[tiprack]
except KeyError:
raise ValueError(
f"No properties found for {tiprack} in {self._name} liquid class"
)
return TransferProperties(
_aspirate=settings_for_tip[0].aspirate,
_dispense=settings_for_tip[0].singleDispense,
_multi_dispense=settings_for_tip[0].multiDispense,
)
return transfer_properties
Loading

0 comments on commit c275c35

Please sign in to comment.