From eb39c230a8612a9893ad72f7300f6662672c56e8 Mon Sep 17 00:00:00 2001 From: Ryan Howard Date: Tue, 25 Jul 2023 10:48:54 -0400 Subject: [PATCH] chore(refactor): Refactor and cleanup volumetric scripts (#13071) * Revert "Merge latest volumetric changes (#13100)" This reverts commit 773b8a4a59c43971ce0c9e3763971d0784566765. * Revert "fix(hardware-testing): Remove tips using them for finding liquid height (#13064)" This reverts commit cc0f509b7551b511b8bf608fa37def47bb3729f0. * Revert "support resupplying tipracks during a 96ch increment test run (#13065)" This reverts commit 965f02d1f1a06e36adb498119109e3f9b650910c. * Pull out all of the common functions between gravimetric and photometric * move trial generation out of excecute * combine the load pipette logic * pull gantry speed out of the photometric config * refactor out load tipracks * remove unused method * fix issues from rebasing * fixups from rebase and formating * refactor out trial building from the photometric script * add additional test targets * pull out the common elements of trials into a seperate dataclass * break out even more common code * break building pm report out of the run method * break the run trials loop out of the run method * format/lint * break building test report out of the run method in gravimetric run method * add the ui header to the build_report methods * move all of the dye prep into the dye prep method * improve LiquidTracker initialization * move the homing to execute_trials * refactor out common end script code * refactor out common get_tips * fixup things that broke during rebase * fix the photometric test target * remove evaporation measurement out of run for readablity * break out load scale for readablity * redo: Support resupplying tipracks during a 96ch increment tests * redo: Remove tips using them for finding liquid height * fix the tests * replace all print statements with ui * move the if-else out of the function to decrease the indent * make a base class config * fixup things that were lost in the rebase * construct the trial directory correctly * make sure 96 pipette doesn't collide during gravimetric tests * gracefully handle when the scale is beyond max * reimplement: Merge latest volumetric changes (#13100) * add stable flag to the blank gravimetric trials * Convert the default test volumes to the QC volumes * add default number of trials and correct volumes to match calculator sheet * make blank the default * update makefile to test the correct args for each tip * update increment test values * make single channle QC test work with all tips in one run * last few changes * fixup typing error * forgot a docstring for the linter * support the enviornmental sensor in the volumetric tests * make the asair prompt for port instead of auto finding * fix the tests --- hardware-testing/.flake8 | 4 +- hardware-testing/Makefile | 29 +- hardware-testing/hardware_testing/data/ui.py | 15 +- .../hardware_testing/drivers/__init__.py | 1 - .../hardware_testing/drivers/asair_sensor.py | 22 +- .../drivers/radwag/responses.py | 19 +- .../hardware_testing/gravimetric/__main__.py | 250 +++-- .../hardware_testing/gravimetric/config.py | 146 ++- .../hardware_testing/gravimetric/execute.py | 866 +++++++----------- .../gravimetric/execute_photometric.py | 660 +++++-------- .../hardware_testing/gravimetric/helpers.py | 305 +++++- .../gravimetric/increments.py | 191 ++-- .../gravimetric/liquid_class/defaults.py | 24 +- .../gravimetric/liquid_class/pipetting.py | 4 +- .../gravimetric/liquid_height/height.py | 13 +- .../gravimetric/measurement/__init__.py | 5 +- .../gravimetric/measurement/environment.py | 11 +- .../hardware_testing/gravimetric/tips.py | 12 +- .../hardware_testing/gravimetric/trial.py | 224 +++++ .../drivers/test_asair_sensor.py | 3 +- 20 files changed, 1522 insertions(+), 1282 deletions(-) create mode 100644 hardware-testing/hardware_testing/gravimetric/trial.py diff --git a/hardware-testing/.flake8 b/hardware-testing/.flake8 index a098f87d7e12..058358cad0a7 100644 --- a/hardware-testing/.flake8 +++ b/hardware-testing/.flake8 @@ -5,8 +5,8 @@ max-line-length = 100 # max cyclomatic complexity -# NOTE: (andy s) increasing this from 9 to 20 b/c test scripts often handle all logic in main -max-complexity = 20 +# NOTE: (andy s) increasing this from 9 to 15 b/c test scripts often handle all logic in main +max-complexity = 15 extend-ignore = # ignore E203 because black might reformat it diff --git a/hardware-testing/Makefile b/hardware-testing/Makefile index c06e857033c4..ab41d0b76f86 100755 --- a/hardware-testing/Makefile +++ b/hardware-testing/Makefile @@ -88,24 +88,29 @@ test-cov: .PHONY: test-photometric test-photometric: - $(python) -m hardware_testing.gravimetric --photometric --simulate --pipette 1000 --channels 96 --tip 50 --trials 5 - $(python) -m hardware_testing.gravimetric --photometric --simulate --pipette 1000 --channels 96 --tip 200 --trials 5 + -$(MAKE) apply-patches-gravimetric + $(python) -m hardware_testing.gravimetric --photometric --simulate --pipette 1000 --channels 96 --tip 50 --trials 1 + $(python) -m hardware_testing.gravimetric --photometric --simulate --pipette 1000 --channels 96 --tip 200 --trials 1 + -$(MAKE) remove-patches-gravimetric .PHONY: test-gravimetric-single test-gravimetric-single: - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --tip 1000 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --tip 200 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --tip 50 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 1 --tip 50 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --tip 1000 --trials 1 --increment + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --trials 1 + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --extra --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 1 --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 1 --trials 1 --increment --no-blank .PHONY: test-gravimetric-multi test-gravimetric-multi: - $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 8 --tip 50 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 8 --tip 50 --trials 1 --increment - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 1000 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 200 --trials 1 - $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 50 --trials 1 + $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 8 --tip 50 --trials 1 --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 50 --channels 8 --tip 50 --trials 1 --increment --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 1000 --trials 1 --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 200 --trials 1 --extra --no-blank + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 8 --tip 50 --trials 1 --no-blank + +.PHONY: test-gravimetric-96 +test-gravimetric-96: + $(python) -m hardware_testing.gravimetric --simulate --pipette 1000 --channels 96 --tip 1000 --trials 1 --no-blank .PHONY: test-gravimetric test-gravimetric: diff --git a/hardware-testing/hardware_testing/data/ui.py b/hardware-testing/hardware_testing/data/ui.py index f1e81708afee..3b33c18b52c1 100644 --- a/hardware-testing/hardware_testing/data/ui.py +++ b/hardware-testing/hardware_testing/data/ui.py @@ -1,5 +1,6 @@ """Production QC User Interface.""" - +from opentrons.hardware_control import SyncHardwareAPI +from opentrons.hardware_control.types import StatusBarState PRINT_HEADER_NUM_SPACES = 4 PRINT_HEADER_DASHES = "-" * PRINT_HEADER_NUM_SPACES @@ -25,6 +26,13 @@ def get_user_ready(message: str) -> None: input(f"WAIT: {message}, press ENTER when ready: ") +def alert_user_ready(message: str, hw: SyncHardwareAPI) -> None: + """Flash the ui lights on the ot3 and then use the get_user_ready.""" + hw.set_status_bar_state(StatusBarState.PAUSED) + get_user_ready(message) + hw.set_status_bar_state(StatusBarState.CONFIRMATION) + + def print_title(title: str) -> None: """Print title.""" """ @@ -54,3 +62,8 @@ def print_header(header: str) -> None: def print_error(message: str) -> None: """Print error.""" print(f"ERROR: {message}") + + +def print_info(message: str) -> None: + """Print information.""" + print(message) diff --git a/hardware-testing/hardware_testing/drivers/__init__.py b/hardware-testing/hardware_testing/drivers/__init__.py index a34e04ae8b23..080746c79529 100644 --- a/hardware-testing/hardware_testing/drivers/__init__.py +++ b/hardware-testing/hardware_testing/drivers/__init__.py @@ -2,7 +2,6 @@ from serial.tools.list_ports import comports # type: ignore[import] from .radwag import RadwagScaleBase, RadwagScale, SimRadwagScale -from .asair_sensor import AsairSensor, AsairSensorError def list_ports_and_select(device_name: str = "") -> str: diff --git a/hardware-testing/hardware_testing/drivers/asair_sensor.py b/hardware-testing/hardware_testing/drivers/asair_sensor.py index bf7a9aca4d48..be1d99376671 100644 --- a/hardware-testing/hardware_testing/drivers/asair_sensor.py +++ b/hardware-testing/hardware_testing/drivers/asair_sensor.py @@ -7,14 +7,14 @@ import abc import codecs import logging -import random import time from typing import Tuple from abc import ABC from dataclasses import dataclass - +from . import list_ports_and_select import serial # type: ignore[import] from serial.serialutil import SerialException # type: ignore[import] +from hardware_testing.data import ui log = logging.getLogger(__name__) @@ -66,6 +66,20 @@ def get_reading(self) -> Reading: ... +def BuildAsairSensor(simulate: bool) -> AsairSensorBase: + """Try to find and return an Asair sensor, if not found return a simulator.""" + ui.print_title("Connecting to Environmental sensor") + if not simulate: + port = list_ports_and_select(device_name="Asair environmental sensor") + try: + sensor = AsairSensor.connect(port) + ui.print_info(f"Found sensor on port {port}") + return sensor + except SerialException: + pass + return SimAsairSensor() + + class AsairSensor(AsairSensorBase): """Asair sensor driver.""" @@ -152,6 +166,6 @@ class SimAsairSensor(AsairSensorBase): def get_reading(self) -> Reading: """Get a reading.""" - temp = random.uniform(24.5, 25) - relative_hum = random.uniform(45, 40) + temp = 25.0 + relative_hum = 50.0 return Reading(temperature=temp, relative_humidity=relative_hum) diff --git a/hardware-testing/hardware_testing/drivers/radwag/responses.py b/hardware-testing/hardware_testing/drivers/radwag/responses.py index 5f5348977a58..50af16688ca1 100644 --- a/hardware-testing/hardware_testing/drivers/radwag/responses.py +++ b/hardware-testing/hardware_testing/drivers/radwag/responses.py @@ -74,13 +74,18 @@ def _on_unstable_measurement( data = RadwagResponse.build(command, response_list) # SI ? - 0.00020 g # TODO: we could accept more unit types if we wanted... - assert response_list[-1] == "g", ( - f'Expected units to be grams ("g"), ' f'instead got "{response_list[-1]}"' - ) - data.stable = "?" not in response_list - data.measurement = float(response_list[-2]) - if "-" in response_list: - data.measurement *= -1 + if RadwagResponseCodes.MAX_THRESHOLD_EXCEEDED in response_list: + print("Warning: Scale maximum exceeded returning infinity") + data.stable = False + data.measurement = float("inf") + else: + assert response_list[-1] == "g", ( + f'Expected units to be grams ("g"), ' f'instead got "{response_list[-1]}"' + ) + data.stable = "?" not in response_list + data.measurement = float(response_list[-2]) + if "-" in response_list: + data.measurement *= -1 return data diff --git a/hardware-testing/hardware_testing/gravimetric/__main__.py b/hardware-testing/hardware_testing/gravimetric/__main__.py index bed12dccd0d0..07609e72da47 100644 --- a/hardware-testing/hardware_testing/gravimetric/__main__.py +++ b/hardware-testing/hardware_testing/gravimetric/__main__.py @@ -2,11 +2,11 @@ from json import load as json_load from pathlib import Path import argparse -from typing import List +from typing import List, Union from opentrons.protocol_api import ProtocolContext -from hardware_testing.data import ui +from hardware_testing.data import create_run_id_and_start_time, ui, get_git_description from hardware_testing.protocols import ( gravimetric_ot3_p50_single, gravimetric_ot3_p50_multi_50ul_tip, @@ -26,8 +26,17 @@ ) from . import execute, helpers, workarounds, execute_photometric -from .config import GravimetricConfig, GANTRY_MAX_SPEED, PhotometricConfig +from .config import ( + GravimetricConfig, + GANTRY_MAX_SPEED, + PhotometricConfig, + ConfigType, + get_tip_volumes_for_qc, +) from .measurement import DELAY_FOR_MEASUREMENT +from .trial import TestResources +from .tips import get_tips +from hardware_testing.drivers import asair_sensor # FIXME: bump to v2.15 to utilize protocol engine API_LEVEL = "2.13" @@ -89,7 +98,7 @@ } -def run_gravimetric( +def build_gravimetric_cfg( protocol: ProtocolContext, pipette_volume: int, pipette_channels: int, @@ -104,7 +113,8 @@ def run_gravimetric( gantry_speed: int, scale_delay: int, isolate_channels: List[int], -) -> None: + extra: bool, +) -> GravimetricConfig: """Run.""" if increment: protocol_cfg = GRAVIMETRIC_CFG_INCREMENT[pipette_volume][pipette_channels][ @@ -112,33 +122,32 @@ def run_gravimetric( ] else: protocol_cfg = GRAVIMETRIC_CFG[pipette_volume][pipette_channels][tip_volume] - execute.run( - protocol, - GravimetricConfig( - name=protocol_cfg.metadata["protocolName"], # type: ignore[attr-defined] - pipette_mount="left", - pipette_volume=pipette_volume, - pipette_channels=pipette_channels, - tip_volume=tip_volume, - trials=trials, - labware_offsets=LABWARE_OFFSETS, - labware_on_scale=protocol_cfg.LABWARE_ON_SCALE, # type: ignore[attr-defined] - slot_scale=protocol_cfg.SLOT_SCALE, # type: ignore[attr-defined] - slots_tiprack=protocol_cfg.SLOTS_TIPRACK[tip_volume], # type: ignore[attr-defined] - increment=increment, - return_tip=return_tip, - blank=blank, - mix=mix, - inspect=inspect, - user_volumes=user_volumes, - gantry_speed=gantry_speed, - scale_delay=scale_delay, - isolate_channels=isolate_channels, - ), + return GravimetricConfig( + name=protocol_cfg.metadata["protocolName"], # type: ignore[attr-defined] + pipette_mount="left", + pipette_volume=pipette_volume, + pipette_channels=pipette_channels, + tip_volume=tip_volume, + trials=trials, + labware_offsets=LABWARE_OFFSETS, + labware_on_scale=protocol_cfg.LABWARE_ON_SCALE, # type: ignore[attr-defined] + slot_scale=protocol_cfg.SLOT_SCALE, # type: ignore[attr-defined] + slots_tiprack=protocol_cfg.SLOTS_TIPRACK[tip_volume], # type: ignore[attr-defined] + increment=increment, + return_tip=return_tip, + blank=blank, + mix=mix, + inspect=inspect, + user_volumes=user_volumes, + gantry_speed=gantry_speed, + scale_delay=scale_delay, + isolate_channels=isolate_channels, + kind=ConfigType.gravimetric, + extra=args.extra, ) -def run_photometric( +def build_photometric_cfg( protocol: ProtocolContext, pipette_volume: int, tip_volume: int, @@ -147,48 +156,122 @@ def run_photometric( mix: bool, inspect: bool, user_volumes: bool, - gantry_speed: int, touch_tip: bool, refill: bool, -) -> None: + extra: bool, +) -> PhotometricConfig: """Run.""" protocol_cfg = PHOTOMETRIC_CFG[tip_volume] - execute_photometric.run( - protocol, - PhotometricConfig( - name=protocol_cfg.metadata["protocolName"], # type: ignore[attr-defined] - pipette_mount="left", - pipette_volume=pipette_volume, - tip_volume=tip_volume, - trials=trials, - labware_offsets=LABWARE_OFFSETS, - photoplate=protocol_cfg.PHOTOPLATE_LABWARE, # type: ignore[attr-defined] - photoplate_slot=protocol_cfg.SLOT_PLATE, # type: ignore[attr-defined] - reservoir=protocol_cfg.RESERVOIR_LABWARE, # type: ignore[attr-defined] - reservoir_slot=protocol_cfg.SLOT_RESERVOIR, # type: ignore[attr-defined] - slots_tiprack=protocol_cfg.SLOTS_TIPRACK[tip_volume], # type: ignore[attr-defined] - return_tip=return_tip, - mix=mix, - inspect=inspect, - user_volumes=user_volumes, - gantry_speed=gantry_speed, - touch_tip=touch_tip, - refill=refill, + return PhotometricConfig( + name=protocol_cfg.metadata["protocolName"], # type: ignore[attr-defined] + pipette_mount="left", + pipette_volume=pipette_volume, + pipette_channels=96, + increment=False, + tip_volume=tip_volume, + trials=trials, + labware_offsets=LABWARE_OFFSETS, + photoplate=protocol_cfg.PHOTOPLATE_LABWARE, # type: ignore[attr-defined] + photoplate_slot=protocol_cfg.SLOT_PLATE, # type: ignore[attr-defined] + reservoir=protocol_cfg.RESERVOIR_LABWARE, # type: ignore[attr-defined] + reservoir_slot=protocol_cfg.SLOT_RESERVOIR, # type: ignore[attr-defined] + slots_tiprack=protocol_cfg.SLOTS_TIPRACK[tip_volume], # type: ignore[attr-defined] + return_tip=return_tip, + mix=mix, + inspect=inspect, + user_volumes=user_volumes, + touch_tip=touch_tip, + refill=refill, + kind=ConfigType.photometric, + extra=args.extra, + ) + + +def _main(args: argparse.Namespace, _ctx: ProtocolContext) -> None: + union_cfg: Union[PhotometricConfig, GravimetricConfig] + if args.photometric: + cfg_pm: PhotometricConfig = build_photometric_cfg( + _ctx, + args.pipette, + args.tip, + args.trials, + args.return_tip, + args.mix, + args.inspect, + args.user_volumes, + args.touch_tip, + args.refill, + args.extra, + ) + if args.trials == 0: + cfg_pm.trials = helpers.get_default_trials(cfg_pm) + union_cfg = cfg_pm + else: + cfg_gm: GravimetricConfig = build_gravimetric_cfg( + _ctx, + args.pipette, + args.channels, + args.tip, + args.trials, + args.increment, + args.return_tip, + False if args.no_blank else True, + args.mix, + args.inspect, + args.user_volumes, + args.gantry_speed, + args.scale_delay, + args.isolate_channels if args.isolate_channels else [], + args.extra, + ) + if args.trials == 0: + cfg_gm.trials = helpers.get_default_trials(cfg_gm) + union_cfg = cfg_gm + run_id, start_time = create_run_id_and_start_time() + ui.print_header("LOAD PIPETTE") + pipette = helpers._load_pipette(_ctx, union_cfg) + ui.print_header("GET PARAMETERS") + test_volumes = helpers._get_volumes(_ctx, union_cfg) + for v in test_volumes: + ui.print_info(f"\t{v} uL") + all_channels_same_time = ( + getattr(union_cfg, "increment", False) or union_cfg.pipette_channels == 96 + ) + run_args = TestResources( + ctx=_ctx, + pipette=pipette, + pipette_tag=helpers._get_tag_from_pipette(pipette, union_cfg), + tipracks=helpers._load_tipracks( + _ctx, union_cfg, use_adapters=args.channels == 96 ), + test_volumes=test_volumes, + run_id=run_id, + start_time=start_time, + operator_name=helpers._get_operator_name(_ctx.is_simulating()), + robot_serial=helpers._get_robot_serial(_ctx.is_simulating()), + tip_batch=helpers._get_tip_batch(_ctx.is_simulating()), + git_description=get_git_description(), + tips=get_tips(_ctx, pipette, args.tip, all_channels=all_channels_same_time), + env_sensor=asair_sensor.BuildAsairSensor(_ctx.is_simulating()), ) + if args.photometric: + execute_photometric.run(cfg_pm, run_args) + else: + execute.run(cfg_gm, run_args) + if __name__ == "__main__": parser = argparse.ArgumentParser("Pipette Testing") parser.add_argument("--simulate", action="store_true") parser.add_argument("--pipette", type=int, choices=[50, 1000], required=True) parser.add_argument("--channels", type=int, choices=[1, 8, 96], default=1) - parser.add_argument("--tip", type=int, choices=[50, 200, 1000], required=True) - parser.add_argument("--trials", type=int, required=True) + parser.add_argument("--tip", type=int, choices=[0, 50, 200, 1000], default=0) + parser.add_argument("--trials", type=int, default=0) parser.add_argument("--increment", action="store_true") parser.add_argument("--return-tip", action="store_true") parser.add_argument("--skip-labware-offsets", action="store_true") - parser.add_argument("--blank", action="store_true") + parser.add_argument("--no-blank", action="store_true") parser.add_argument("--mix", action="store_true") parser.add_argument("--inspect", action="store_true") parser.add_argument("--user-volumes", action="store_true") @@ -198,23 +281,22 @@ def run_photometric( parser.add_argument("--touch-tip", action="store_true") parser.add_argument("--refill", action="store_true") parser.add_argument("--isolate-channels", nargs="+", type=int, default=None) + parser.add_argument("--extra", action="store_true") args = parser.parse_args() if not args.simulate and not args.skip_labware_offsets: # getting labware offsets must be done before creating the protocol context # because it requires the robot-server to be running ui.print_title("SETUP") - print("Starting opentrons-robot-server, so we can http GET labware offsets") + ui.print_info( + "Starting opentrons-robot-server, so we can http GET labware offsets" + ) offsets = workarounds.http_get_all_labware_offsets() - print(f"found {len(offsets)} offsets:") + ui.print_info(f"found {len(offsets)} offsets:") for offset in offsets: - print(f"\t{offset['createdAt']}:") - print(f"\t\t{offset['definitionUri']}") - print(f"\t\t{offset['vector']}") + ui.print_info(f"\t{offset['createdAt']}:") + ui.print_info(f"\t\t{offset['definitionUri']}") + ui.print_info(f"\t\t{offset['vector']}") LABWARE_OFFSETS.append(offset) - if args.increment: - _protocol = GRAVIMETRIC_CFG_INCREMENT[args.pipette][args.channels][args.tip] - else: - _protocol = GRAVIMETRIC_CFG[args.pipette][args.channels][args.tip] # gather the custom labware (for simulation) custom_defs = {} if args.simulate: @@ -235,34 +317,14 @@ def run_photometric( deck_version="2", extra_labware=custom_defs, ) - if args.photometric: - run_photometric( - _ctx, - args.pipette, - args.tip, - args.trials, - args.return_tip, - args.mix, - args.inspect, - args.user_volumes, - args.gantry_speed, - args.touch_tip, - args.refill, - ) + if args.tip == 0: + for tip in get_tip_volumes_for_qc( + args.pipette, args.channels, args.extra, args.photometric + ): + hw = _ctx._core.get_hardware() + if not _ctx.is_simulating(): + ui.alert_user_ready(f"Ready to run with {tip}ul tip?", hw) + args.tip = tip + _main(args, _ctx) else: - run_gravimetric( - _ctx, - args.pipette, - args.channels, - args.tip, - args.trials, - args.increment, - args.return_tip, - args.blank, - args.mix, - args.inspect, - args.user_volumes, - args.gantry_speed, - args.scale_delay, - args.isolate_channels if args.isolate_channels else [], - ) + _main(args, _ctx) diff --git a/hardware-testing/hardware_testing/gravimetric/config.py b/hardware-testing/hardware_testing/gravimetric/config.py index 528ad58e5942..f5cbc3ba8e87 100644 --- a/hardware-testing/hardware_testing/gravimetric/config.py +++ b/hardware-testing/hardware_testing/gravimetric/config.py @@ -1,12 +1,20 @@ """Config.""" from dataclasses import dataclass -from typing import List +from typing import List, Dict from typing_extensions import Final +from enum import Enum + + +class ConfigType(Enum): + """Substitute for Literal which isn't available until 3.8.0.""" + + gravimetric = 1 + photometric = 2 @dataclass -class GravimetricConfig: - """Execute Gravimetric Setup Config.""" +class VolumetricConfig: + """Execute shared volumetric Setup Config.""" name: str pipette_volume: int @@ -15,40 +23,36 @@ class GravimetricConfig: tip_volume: int trials: int labware_offsets: List[dict] - labware_on_scale: str - slot_scale: int slots_tiprack: List[int] increment: bool return_tip: bool - blank: bool mix: bool inspect: bool user_volumes: bool + kind: ConfigType + extra: bool + + +@dataclass +class GravimetricConfig(VolumetricConfig): + """Execute Gravimetric Setup Config.""" + + labware_on_scale: str + slot_scale: int + blank: bool gantry_speed: int scale_delay: int isolate_channels: List[int] @dataclass -class PhotometricConfig: +class PhotometricConfig(VolumetricConfig): """Execute photometric Setup Config.""" - name: str - pipette_volume: int - pipette_mount: str - tip_volume: int - trials: int - labware_offsets: List[dict] photoplate: str photoplate_slot: int reservoir: str reservoir_slot: int - slots_tiprack: List[int] - return_tip: bool - mix: bool - inspect: bool - user_volumes: bool - gantry_speed: int touch_tip: bool refill: bool @@ -72,3 +76,107 @@ class PhotometricConfig: VIAL_SAFE_Z_OFFSET: Final = 25 LABWARE_BOTTOM_CLEARANCE = 1.5 + + +QC_VOLUMES_G: Dict[int, Dict[int, Dict[int, List[float]]]] = { + 1: { + 50: { # P50 + 50: [1.0, 50.0], # T50 + }, + 1000: { # P1000 + 50: [5.0], # T50 + 200: [], # T200 + 1000: [1000.0], # T1000 + }, + }, + 8: { + 50: { # P50 + 50: [1.0, 50.0], # T50 + }, + 1000: { # P1000 + 50: [5.0], # T50 + 200: [], # T200 + 1000: [1000.0], # T1000 + }, + }, + 96: { + 1000: { # P1000 + 50: [], # T50 + 200: [], # T200 + 1000: [1000.0], # T1000 + }, + }, +} + + +QC_VOLUMES_EXTRA_G: Dict[int, Dict[int, Dict[int, List[float]]]] = { + 1: { + 50: { # P50 + 50: [1.0, 10.0, 50.0], # T50 + }, + 1000: { # P1000 + 50: [5.0, 50], # T50 + 200: [200.0], # T200 + 1000: [1000.0], # T1000 + }, + }, + 8: { + 50: { # P50 + 50: [1.0, 10.0, 50.0], # T50 + }, + 1000: { # P1000 + 50: [5.0, 50], # T50 + 200: [200.0], # T200 + 1000: [1000.0], # T1000 + }, + }, + 96: { + 1000: { # P1000 + 50: [], # T50 + 200: [], # T200 + 1000: [1000.0], # T1000 + }, + }, +} + +QC_VOLUMES_P: Dict[int, Dict[int, Dict[int, List[float]]]] = { + 96: { + 1000: { # P1000 + 50: [5.0], # T50 + 200: [200.0], # T200 + 1000: [], # T1000 + }, + }, +} + +QC_DEFAULT_TRIALS: Dict[ConfigType, Dict[int, int]] = { + ConfigType.gravimetric: { + 1: 10, + 8: 8, + 96: 9, + }, + ConfigType.photometric: { + 96: 5, + }, +} + + +def get_tip_volumes_for_qc( + pipette_volume: int, pipette_channels: int, extra: bool, photometric: bool +) -> List[int]: + """Build the default testing volumes for qc.""" + config: Dict[int, Dict[int, Dict[int, List[float]]]] = {} + if photometric: + config = QC_VOLUMES_P + else: + if extra: + config = QC_VOLUMES_EXTRA_G + else: + config = QC_VOLUMES_G + tip_volumes = [ + t + for t in config[pipette_channels][pipette_volume].keys() + if len(config[pipette_channels][pipette_volume][t]) > 0 + ] + assert len(tip_volumes) > 0 + return tip_volumes diff --git a/hardware-testing/hardware_testing/gravimetric/execute.py b/hardware-testing/hardware_testing/gravimetric/execute.py index d752a440cf61..5302a477ac86 100644 --- a/hardware-testing/hardware_testing/gravimetric/execute.py +++ b/hardware-testing/hardware_testing/gravimetric/execute.py @@ -1,45 +1,49 @@ """Gravimetric.""" from time import sleep -from dataclasses import dataclass -from inspect import getsource -from statistics import stdev from typing import Optional, Tuple, List, Dict -from opentrons.hardware_control.instruments.ot3.pipette import Pipette -from opentrons.types import Location -from opentrons.protocol_api import ProtocolContext, InstrumentContext, Well, Labware +from opentrons.protocol_api import ProtocolContext, Well, Labware -from hardware_testing.data import create_run_id_and_start_time, ui, get_git_description +from hardware_testing.data import ui from hardware_testing.data.csv_report import CSVReport -from hardware_testing.opentrons_api.types import OT3Mount, Point -from hardware_testing.opentrons_api.helpers_ot3 import clear_pipette_ul_per_mm +from hardware_testing.opentrons_api.types import Point, OT3Mount from . import report from . import config -from .helpers import get_pipette_unique_name -from .workarounds import get_sync_hw_api, get_latest_offset_for_labware -from .increments import get_volume_increments -from .liquid_class.defaults import get_test_volumes, get_liquid_class +from .helpers import ( + _calculate_stats, + _get_channel_offset, + _calculate_average, + _jog_to_find_liquid_height, + _apply_labware_offsets, + _pick_up_tip, + _drop_tip, +) +from .trial import ( + build_gravimetric_trials, + GravimetricTrial, + TestResources, + _finish_test, +) from .liquid_class.pipetting import ( aspirate_with_liquid_class, dispense_with_liquid_class, PipettingCallbacks, ) -from .liquid_height.height import LiquidTracker, initialize_liquid_from_deck +from .liquid_height.height import LiquidTracker from .measurement import ( MeasurementData, MeasurementType, record_measurement_data, calculate_change_in_volume, create_measurement_tag, - DELAY_FOR_MEASUREMENT, ) from .measurement.environment import get_min_reading, get_max_reading from .measurement.record import ( GravimetricRecorder, GravimetricRecorderConfig, ) -from .tips import get_tips, MULTI_CHANNEL_TEST_ORDER +from .tips import MULTI_CHANNEL_TEST_ORDER _MEASUREMENTS: List[Tuple[str, MeasurementData]] = list() @@ -49,6 +53,13 @@ _tip_counter: Dict[int, int] = {} +def _minimum_z_height(cfg: config.GravimetricConfig) -> int: + if cfg.pipette_channels == 96: + return 133 + else: + return 0 + + def _generate_callbacks_for_trial( recorder: GravimetricRecorder, volume: Optional[float], @@ -99,190 +110,29 @@ def _update_environment_first_last_min_max(test_report: report.CSVReport) -> Non report.store_environment(test_report, report.EnvironmentReportState.MAX, max_data) -def _check_if_software_supports_high_volumes() -> bool: - src_a = getsource(Pipette.set_current_volume) - src_b = getsource(Pipette.ok_to_add_volume) - modified_a = "# assert new_volume <= self.working_volume" in src_a - modified_b = "return True" in src_b - return modified_a and modified_b - - -def _reduce_volumes_to_not_exceed_software_limit( - test_volumes: List[float], cfg: config.GravimetricConfig -) -> List[float]: - for i, v in enumerate(test_volumes): - liq_cls = get_liquid_class( - cfg.pipette_volume, cfg.pipette_channels, cfg.tip_volume, int(v) - ) - max_vol = cfg.tip_volume - liq_cls.aspirate.trailing_air_gap - test_volumes[i] = min(v, max_vol - 0.1) - return test_volumes - - -def _get_volumes(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> List[float]: - if cfg.increment: - test_volumes = get_volume_increments(cfg.pipette_volume, cfg.tip_volume) - elif cfg.user_volumes and not ctx.is_simulating(): - _inp = input('Enter desired volumes, comma separated (eg: "10,100,1000") :') - test_volumes = [ - float(vol_str) for vol_str in _inp.strip().split(",") if vol_str - ] - else: - test_volumes = get_test_volumes( - cfg.pipette_volume, cfg.pipette_channels, cfg.tip_volume - ) - if not test_volumes: - raise ValueError("no volumes to test, check the configuration") - if not _check_if_software_supports_high_volumes(): - if ctx.is_simulating(): - test_volumes = _reduce_volumes_to_not_exceed_software_limit( - test_volumes, cfg - ) - else: - raise RuntimeError("you are not the correct branch") - return sorted(test_volumes, reverse=False) # lowest volumes first - - -def _get_channel_offset(cfg: config.GravimetricConfig, channel: int) -> Point: - assert ( - channel < cfg.pipette_channels - ), f"unexpected channel on {cfg.pipette_channels} channel pipette: {channel}" - if cfg.pipette_channels == 1: - return Point() - if cfg.pipette_channels == 8: - return Point(y=channel * 9.0) - if cfg.pipette_channels == 96: - row = channel % 8 # A-H - col = int(float(channel) / 8.0) # 1-12 - return Point(x=col * 9.0, y=row * 9.0) - raise ValueError(f"unexpected number of channels in config: {cfg.pipette_channels}") - - -def _load_pipette( - ctx: ProtocolContext, cfg: config.GravimetricConfig -) -> InstrumentContext: - load_str_channels = {1: "single", 8: "multi", 96: "96"} - if cfg.pipette_channels not in load_str_channels: - raise ValueError(f"unexpected number of channels: {cfg.pipette_channels}") - chnl_str = load_str_channels[cfg.pipette_channels] - if cfg.pipette_channels == 96: - pip_name = "p1000_96" - else: - pip_name = f"p{cfg.pipette_volume}_{chnl_str}_gen3" - print(f'pipette "{pip_name}" on mount "{cfg.pipette_mount}"') - pipette = ctx.load_instrument(pip_name, cfg.pipette_mount) - assert pipette.channels == cfg.pipette_channels, ( - f"expected {cfg.pipette_channels} channels, " - f"but got pipette with {pipette.channels} channels" - ) - assert pipette.max_volume == cfg.pipette_volume, ( - f"expected {cfg.pipette_volume} uL pipette, " - f"but got a {pipette.max_volume} uL pipette" - ) - pipette.default_speed = cfg.gantry_speed - # NOTE: 8ch QC testing means testing 1 channel at a time, - # so we need to decrease the pick-up current to work with 1 tip. - if pipette.channels == 8 and not cfg.increment: - hwapi = get_sync_hw_api(ctx) - mnt = OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT - hwpipette: Pipette = hwapi.hardware_pipettes[mnt.to_mount()] - hwpipette.pick_up_configurations.current = 0.2 - return pipette - - -def _apply_labware_offsets( - cfg: config.GravimetricConfig, tip_racks: List[Labware], labware_on_scale: Labware -) -> None: - def _apply(labware: Labware) -> None: - o = get_latest_offset_for_labware(cfg.labware_offsets, labware) - print( - f'Apply labware offset to "{labware.name}" (slot={labware.parent}): ' - f"x={round(o.x, 2)}, y={round(o.y, 2)}, z={round(o.z, 2)}" - ) - labware.set_calibration(o) - - _apply(labware_on_scale) - for rack in tip_racks: - _apply(rack) - - -def _load_labware( - ctx: ProtocolContext, cfg: config.GravimetricConfig -) -> Tuple[Labware, List[Labware]]: - print(f'Loading labware on scale: "{cfg.labware_on_scale}"') +def _load_labware(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> Labware: + ui.print_info(f'Loading labware on scale: "{cfg.labware_on_scale}"') if cfg.labware_on_scale == "radwag_pipette_calibration_vial": namespace = "custom_beta" else: namespace = "opentrons" + # If running multiple tests in one run, the labware may already be loaded + loaded_labwares = ctx.loaded_labwares + if ( + cfg.slot_scale in loaded_labwares.keys() + and loaded_labwares[cfg.slot_scale].name == cfg.labware_on_scale + ): + return loaded_labwares[cfg.slot_scale] + labware_on_scale = ctx.load_labware( cfg.labware_on_scale, location=cfg.slot_scale, namespace=namespace ) - if cfg.pipette_channels == 96: - tiprack_namespace = "custom_beta" - tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul_adp" - else: - tiprack_namespace = "opentrons" - tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul" - tiprack_load_settings: List[Tuple[int, str, str]] = [ - ( - slot, - tiprack_loadname, - tiprack_namespace, - ) - for slot in cfg.slots_tiprack - ] - tipracks: List[Labware] = [] - for ls in tiprack_load_settings: - print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]} with namespace "{ls[2]}"') - tipracks.append(ctx.load_labware(ls[1], location=ls[0], namespace=ls[2])) - _apply_labware_offsets(cfg, tipracks, labware_on_scale) - return labware_on_scale, tipracks - - -def _jog_to_find_liquid_height( - ctx: ProtocolContext, pipette: InstrumentContext, well: Well -) -> float: - _well_depth = well.depth - _liquid_height = _well_depth - _jog_size = -1.0 - if ctx.is_simulating(): - return _liquid_height - 1 - while True: - pipette.move_to(well.bottom(_liquid_height)) - inp = input( - f"height={_liquid_height}: ENTER to jog {_jog_size} mm, " - f'or enter new jog size, or "yes" to save: ' - ) - if inp: - if inp[0] == "y": - break - try: - _jog_size = min(max(float(inp), -1.0), 1.0) - except ValueError: - continue - _liquid_height = min(max(_liquid_height + _jog_size, 0), _well_depth) - return _liquid_height - - -def _calculate_average(volume_list: List[float]) -> float: - return sum(volume_list) / len(volume_list) - - -def _calculate_stats( - volume_list: List[float], total_volume: float -) -> Tuple[float, float, float]: - average = _calculate_average(volume_list) - if len(volume_list) <= 1: - print("skipping CV, only 1x trial per volume") - cv = -0.01 # negative number is impossible - else: - cv = stdev(volume_list) / average - d = (average - total_volume) / total_volume - return average, cv, d + _apply_labware_offsets(cfg, [labware_on_scale]) + return labware_on_scale def _print_stats(mode: str, average: float, cv: float, d: float) -> None: - print( + ui.print_info( f"{mode}:\n" f"\tavg: {round(average, 2)} uL\n" f"\tcv: {round(cv * 100.0, 2)}%\n" @@ -294,190 +144,148 @@ def _print_final_results( volumes: List[float], channel_count: int, test_report: CSVReport ) -> None: for vol in volumes: - print(f" * {vol}ul channel all:") + ui.print_info(f" * {vol}ul channel all:") for mode in ["aspirate", "dispense"]: avg, cv, d = report.get_volume_results_all(test_report, mode, vol) - print(f" - {mode}:") - print(f" avg: {avg}ul") - print(f" cv: {cv}%") - print(f" d: {d}%") + ui.print_info(f" - {mode}:") + ui.print_info(f" avg: {avg}ul") + ui.print_info(f" cv: {cv}%") + ui.print_info(f" d: {d}%") for channel in range(channel_count): - print(f" * vol {vol}ul channel {channel + 1}:") + ui.print_info(f" * vol {vol}ul channel {channel + 1}:") for mode in ["aspirate", "dispense"]: avg, cv, d = report.get_volume_results_per_channel( test_report, mode, vol, channel ) - print(f" - {mode}:") - print(f" avg: {avg}ul") - print(f" cv: {cv}%") - print(f" d: {d}%") + ui.print_info(f" - {mode}:") + ui.print_info(f" avg: {avg}ul") + ui.print_info(f" cv: {cv}%") + ui.print_info(f" d: {d}%") -def _run_trial( - ctx: ProtocolContext, - pipette: InstrumentContext, - well: Well, - channel_offset: Point, - tip_volume: int, - volume: float, +def _next_tip_for_channel( + cfg: config.GravimetricConfig, + resources: TestResources, channel: int, - channel_count: int, - trial: int, - recorder: GravimetricRecorder, - test_report: report.CSVReport, - liquid_tracker: LiquidTracker, - blank: bool, - inspect: bool, - mix: bool = False, - stable: bool = True, - scale_delay: int = DELAY_FOR_MEASUREMENT, + max_tips: int, +) -> Well: + _tips_used = sum([tc for tc in _tip_counter.values()]) + if _tips_used >= max_tips: + if cfg.pipette_channels != 96: + raise RuntimeError("ran out of tips") + if not resources.ctx.is_simulating(): + ui.print_title("Reset 96ch Tip Racks") + ui.get_user_ready(f"ADD {max_tips}x new tip-racks") + _tip_counter[channel] = 0 + _tip = resources.tips[channel][_tip_counter[channel]] + _tip_counter[channel] += 1 + return _tip + + +def _run_trial( + trial: GravimetricTrial, ) -> Tuple[float, MeasurementData, float, MeasurementData]: global _PREV_TRIAL_GRAMS pipetting_callbacks = _generate_callbacks_for_trial( - recorder, volume, channel, trial, blank + trial.recorder, trial.volume, trial.channel, trial.trial, trial.blank ) def _tag(m_type: MeasurementType) -> str: - return create_measurement_tag(m_type, None if blank else volume, channel, trial) + return create_measurement_tag( + m_type, None if trial.blank else trial.volume, trial.channel, trial.trial + ) def _record_measurement_and_store(m_type: MeasurementType) -> MeasurementData: m_tag = _tag(m_type) - if recorder.is_simulator and not blank: + if trial.recorder.is_simulator and not trial.blank: if m_type == MeasurementType.ASPIRATE: - recorder.add_simulation_mass(volume * -0.001) + trial.recorder.add_simulation_mass(trial.volume * -0.001) elif m_type == MeasurementType.DISPENSE: - recorder.add_simulation_mass(volume * 0.001) + trial.recorder.add_simulation_mass(trial.volume * 0.001) m_data = record_measurement_data( - ctx, + trial.ctx, m_tag, - recorder, - pipette.mount, - stable, - shorten=inspect, - delay_seconds=scale_delay, + trial.recorder, + trial.pipette.mount, + trial.stable, + trial.env_sensor, + shorten=trial.inspect, + delay_seconds=trial.scale_delay, ) - report.store_measurement(test_report, m_tag, m_data) + report.store_measurement(trial.test_report, m_tag, m_data) _MEASUREMENTS.append( ( m_tag, m_data, ) ) - _update_environment_first_last_min_max(test_report) + _update_environment_first_last_min_max(trial.test_report) return m_data - print("recorded weights:") + ui.print_info("recorded weights:") # RUN INIT - pipette.move_to(well.top(50).move(channel_offset)) # center channel over well - mnt = OT3Mount.RIGHT if pipette.mount == "right" else OT3Mount.LEFT - ctx._core.get_hardware().retract(mnt) # retract to top of gantry + trial.pipette.move_to( + trial.well.top(50).move(trial.channel_offset) + ) # center channel over well + mnt = OT3Mount.RIGHT if trial.pipette.mount == "right" else OT3Mount.LEFT + trial.ctx._core.get_hardware().retract(mnt) # retract to top of gantry m_data_init = _record_measurement_and_store(MeasurementType.INIT) - print(f"\tinitial grams: {m_data_init.grams_average} g") + ui.print_info(f"\tinitial grams: {m_data_init.grams_average} g") if _PREV_TRIAL_GRAMS is not None: _evaporation_loss_ul = abs( calculate_change_in_volume(_PREV_TRIAL_GRAMS, m_data_init) ) - print(f"{_evaporation_loss_ul} ul evaporated since last trial") - liquid_tracker.update_affected_wells( - well, aspirate=_evaporation_loss_ul, channels=1 + ui.print_info(f"{_evaporation_loss_ul} ul evaporated since last trial") + trial.liquid_tracker.update_affected_wells( + trial.well, aspirate=_evaporation_loss_ul, channels=1 ) _PREV_TRIAL_GRAMS = m_data_init # RUN ASPIRATE aspirate_with_liquid_class( - ctx, - pipette, - tip_volume, - volume, - well, - channel_offset, - channel_count, - liquid_tracker, + trial.ctx, + trial.pipette, + trial.tip_volume, + trial.volume, + trial.well, + trial.channel_offset, + trial.channel_count, + trial.liquid_tracker, callbacks=pipetting_callbacks, - blank=blank, - inspect=inspect, - mix=mix, + blank=trial.blank, + inspect=trial.inspect, + mix=trial.mix, ) - ctx._core.get_hardware().retract(mnt) + trial.ctx._core.get_hardware().retract(mnt) # retract to top of gantry m_data_aspirate = _record_measurement_and_store(MeasurementType.ASPIRATE) - print(f"\tgrams after aspirate: {m_data_aspirate.grams_average} g") - print(f"\tcelsius after aspirate: {m_data_aspirate.celsius_pipette} C") + ui.print_info(f"\tgrams after aspirate: {m_data_aspirate.grams_average} g") + ui.print_info(f"\tcelsius after aspirate: {m_data_aspirate.celsius_pipette} C") # RUN DISPENSE dispense_with_liquid_class( - ctx, - pipette, - tip_volume, - volume, - well, - channel_offset, - channel_count, - liquid_tracker, + trial.ctx, + trial.pipette, + trial.tip_volume, + trial.volume, + trial.well, + trial.channel_offset, + trial.channel_count, + trial.liquid_tracker, callbacks=pipetting_callbacks, - blank=blank, - inspect=inspect, - mix=mix, + blank=trial.blank, + inspect=trial.inspect, + mix=trial.mix, ) - ctx._core.get_hardware().retract(mnt) + trial.ctx._core.get_hardware().retract(mnt) # retract to top of gantry m_data_dispense = _record_measurement_and_store(MeasurementType.DISPENSE) - print(f"\tgrams after dispense: {m_data_dispense.grams_average} g") - + ui.print_info(f"\tgrams after dispense: {m_data_dispense.grams_average} g") # calculate volumes volume_aspirate = calculate_change_in_volume(m_data_init, m_data_aspirate) volume_dispense = calculate_change_in_volume(m_data_aspirate, m_data_dispense) return volume_aspirate, m_data_aspirate, volume_dispense, m_data_dispense -def _get_operator_name(is_simulating: bool) -> str: - if not is_simulating: - return input("OPERATOR name:").strip() - else: - return "simulation" - - -def _get_robot_serial(is_simulating: bool) -> str: - if not is_simulating: - return input("ROBOT SERIAL NUMBER:").strip() - else: - return "simulation-serial-number" - - -def _get_tip_batch(is_simulating: bool) -> str: - if not is_simulating: - return input("TIP BATCH:").strip() - else: - return "simulation-tip-batch" - - -def _pick_up_tip( - ctx: ProtocolContext, - pipette: InstrumentContext, - cfg: config.GravimetricConfig, - location: Location, -) -> None: - print( - f"picking tip {location.labware.as_well().well_name} " - f"from slot #{location.labware.parent.parent}" - ) - pipette.pick_up_tip(location) - # NOTE: the accuracy-adjust function gets set on the Pipette - # each time we pick-up a new tip. - if cfg.increment: - print("clearing pipette ul-per-mm table to be linear") - clear_pipette_ul_per_mm( - get_sync_hw_api(ctx)._obj_to_adapt, # type: ignore[arg-type] - OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT, - ) - - -def _drop_tip(pipette: InstrumentContext, return_tip: bool) -> None: - if return_tip: - pipette.return_tip(home_after=False) - else: - pipette.drop_tip(home_after=False) - - def _get_test_channels(cfg: config.GravimetricConfig) -> List[int]: if cfg.pipette_channels == 8 and not cfg.increment: # NOTE: only test channels separately when QC'ing a 8ch @@ -493,120 +301,36 @@ def _get_channel_divider(cfg: config.GravimetricConfig) -> float: return float(cfg.pipette_channels) -def _get_tag_from_pipette( - pipette: InstrumentContext, cfg: config.GravimetricConfig -) -> str: - pipette_tag = get_pipette_unique_name(pipette) - print(f'found pipette "{pipette_tag}"') - if cfg.increment: - pipette_tag += "-increment" - elif cfg.user_volumes: - pipette_tag += "-user-volume" - else: - pipette_tag += "-qc" - return pipette_tag - - -def _change_pipettes( - ctx: ProtocolContext, pipette: InstrumentContext, return_tip: bool -) -> None: - if pipette.has_tip: - if pipette.current_volume > 0: - print("dispensing liquid to trash") - trash = pipette.trash_container.wells()[0] - # FIXME: this should be a blow_out() at max volume, - # but that is not available through PyAPI yet - # so instead just dispensing. - pipette.dispense(pipette.current_volume, trash.top()) - pipette.aspirate(10) # to pull any droplets back up - print("dropping tip") - _drop_tip(pipette, return_tip) - print("moving to attach position") - pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150))) - - -def _next_tip_for_channel( - ctx: ProtocolContext, +def build_gm_report( cfg: config.GravimetricConfig, - tips: Dict[int, List[Well]], - channel: int, - max_tips: int, -) -> Well: - _tips_used = sum([tc for tc in _tip_counter.values()]) - if _tips_used >= max_tips: - if cfg.pipette_channels != 96: - raise RuntimeError("ran out of tips") - if not ctx.is_simulating(): - ui.print_title("Reset 96ch Tip Racks") - ui.get_user_ready(f"ADD {max_tips}x new tip-racks") - _tip_counter[channel] = 0 - _tip = tips[channel][_tip_counter[channel]] - _tip_counter[channel] += 1 - return _tip - - -@dataclass -class _RunParameters: - test_volumes: List[float] - tips: Dict[int, List[Well]] - num_channels_per_transfer: int - channels_to_test: List[int] - trial_total: int - total_tips: int - - -def _calculate_parameters( - ctx: ProtocolContext, cfg: config.GravimetricConfig, pipette: InstrumentContext -) -> _RunParameters: - test_volumes = _get_volumes(ctx, cfg) - for v in test_volumes: - print(f"\t{v} uL") - all_channels_same_time = cfg.increment or cfg.pipette_channels == 96 - tips = get_tips(ctx, pipette, all_channels=all_channels_same_time) - total_tips = len([tip for chnl_tips in tips.values() for tip in chnl_tips]) - channels_to_test = _get_test_channels(cfg) - for channel in channels_to_test: - # initialize the global tip counter, per each channel that will be tested - _tip_counter[channel] = 0 - if len(channels_to_test) > 1: - num_channels_per_transfer = 1 - else: - num_channels_per_transfer = cfg.pipette_channels - trial_total = len(test_volumes) * cfg.trials * len(channels_to_test) - support_tip_resupply = bool(cfg.pipette_channels == 96 and cfg.increment) - if trial_total > total_tips: - if not support_tip_resupply: - raise ValueError(f"more trials ({trial_total}) than tips ({total_tips})") - elif not ctx.is_simulating(): - ui.get_user_ready(f"prepare {trial_total - total_tips} extra tip-racks") - return _RunParameters( - test_volumes=test_volumes, - tips=tips, - num_channels_per_transfer=num_channels_per_transfer, - channels_to_test=channels_to_test, - trial_total=trial_total, - total_tips=total_tips, + resources: TestResources, + recorder: GravimetricRecorder, +) -> report.CSVReport: + """Build a CSVReport formated for gravimetric tests.""" + ui.print_header("CREATE TEST-REPORT") + test_report = report.create_csv_test_report( + resources.test_volumes, cfg, run_id=resources.run_id ) + test_report.set_tag(resources.pipette_tag) + test_report.set_operator(resources.operator_name) + test_report.set_version(resources.git_description) + report.store_serial_numbers( + test_report, + robot=resources.robot_serial, + pipette=resources.pipette_tag, + tips=resources.tip_batch, + scale=recorder.serial_number, + environment="None", + liquid="None", + ) + return test_report -def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: - """Run.""" - run_id, start_time = create_run_id_and_start_time() - - ui.print_header("LOAD LABWARE") - labware_on_scale, tipracks = _load_labware(ctx, cfg) - liquid_tracker = LiquidTracker() - initialize_liquid_from_deck(ctx, liquid_tracker) - - ui.print_header("LOAD PIPETTE") - pipette = _load_pipette(ctx, cfg) - pipette_tag = _get_tag_from_pipette(pipette, cfg) - - ui.print_header("GET PARAMETERS") - parameters = _calculate_parameters(ctx, cfg, pipette) - +def _load_scale( + cfg: config.GravimetricConfig, resources: TestResources +) -> GravimetricRecorder: ui.print_header("LOAD SCALE") - print( + ui.print_info( "Some Radwag settings cannot be controlled remotely.\n" "Listed below are the things the must be done using the touchscreen:\n" " 1) Set profile to USER\n" @@ -615,130 +339,179 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: recorder = GravimetricRecorder( GravimetricRecorderConfig( test_name=cfg.name, - run_id=run_id, - tag=pipette_tag, - start_time=start_time, + run_id=resources.run_id, + tag=resources.pipette_tag, + start_time=resources.start_time, duration=0, - frequency=1000 if ctx.is_simulating() else 5, + frequency=1000 if resources.ctx.is_simulating() else 5, stable=False, ), - simulate=ctx.is_simulating(), + simulate=resources.ctx.is_simulating(), ) - print(f'found scale "{recorder.serial_number}"') - if ctx.is_simulating(): + ui.print_info(f'found scale "{recorder.serial_number}"') + if resources.ctx.is_simulating(): start_sim_mass = {50: 15, 200: 200, 1000: 200} recorder.set_simulation_mass(start_sim_mass[cfg.tip_volume]) recorder.record(in_thread=True) - print(f'scale is recording to "{recorder.file_name}"') + ui.print_info(f'scale is recording to "{recorder.file_name}"') + return recorder - ui.print_header("CREATE TEST-REPORT") - test_report = report.create_csv_test_report( - parameters.test_volumes, cfg, run_id=run_id + +def _calculate_evaporation( + cfg: config.GravimetricConfig, + resources: TestResources, + recorder: GravimetricRecorder, + liquid_tracker: LiquidTracker, + test_report: report.CSVReport, + labware_on_scale: Labware, +) -> Tuple[float, float]: + ui.print_title("MEASURE EVAPORATION") + blank_trials = build_gravimetric_trials( + resources.ctx, + resources.pipette, + cfg, + labware_on_scale["A1"], + [resources.test_volumes[-1]], + [], + recorder, + test_report, + liquid_tracker, + True, + resources.env_sensor, ) - test_report.set_tag(pipette_tag) - test_report.set_operator(_get_operator_name(ctx.is_simulating())) - serial_number = _get_robot_serial(ctx.is_simulating()) - tip_batch = _get_tip_batch(ctx.is_simulating()) - test_report.set_version(get_git_description()) - report.store_serial_numbers( + ui.print_info(f"running {config.NUM_BLANK_TRIALS}x blank measurements") + mnt = OT3Mount.RIGHT if resources.pipette.mount == "right" else OT3Mount.LEFT + resources.ctx._core.get_hardware().retract(mnt) + for i in range(config.SCALE_SECONDS_TO_TRUE_STABILIZE): + ui.print_info( + f"wait for scale to stabilize " + f"({i + 1}/{config.SCALE_SECONDS_TO_TRUE_STABILIZE})" + ) + sleep(1) + actual_asp_list_evap: List[float] = [] + actual_disp_list_evap: List[float] = [] + for b_trial in blank_trials[resources.test_volumes[-1]][0]: + ui.print_header(f"BLANK {b_trial.trial + 1}/{config.NUM_BLANK_TRIALS}") + evap_aspirate, _, evap_dispense, _ = _run_trial(b_trial) + ui.print_info( + f"blank {b_trial.trial + 1}/{config.NUM_BLANK_TRIALS}:\n" + f"\taspirate: {evap_aspirate} uL\n" + f"\tdispense: {evap_dispense} uL" + ) + actual_asp_list_evap.append(evap_aspirate) + actual_disp_list_evap.append(evap_dispense) + ui.print_header("EVAPORATION AVERAGE") + average_aspirate_evaporation_ul = _calculate_average(actual_asp_list_evap) + average_dispense_evaporation_ul = _calculate_average(actual_disp_list_evap) + ui.print_info( + "average:\n" + f"\taspirate: {average_aspirate_evaporation_ul} uL\n" + f"\tdispense: {average_dispense_evaporation_ul} uL" + ) + report.store_average_evaporation( test_report, - robot=serial_number, - pipette=pipette_tag, - tips=tip_batch, - scale=recorder.serial_number, - environment="None", - liquid="None", + average_aspirate_evaporation_ul, + average_dispense_evaporation_ul, ) + return average_aspirate_evaporation_ul, average_dispense_evaporation_ul + + +def run(cfg: config.GravimetricConfig, resources: TestResources) -> None: + """Run.""" + global _PREV_TRIAL_GRAMS + global _MEASUREMENTS + ui.print_header("LOAD LABWARE") + labware_on_scale = _load_labware(resources.ctx, cfg) + liquid_tracker = LiquidTracker(resources.ctx) + + total_tips = len( + [tip for chnl_tips in resources.tips.values() for tip in chnl_tips] + ) + channels_to_test = _get_test_channels(cfg) + for channel in channels_to_test: + # initialize the global tip counter, per each channel that will be tested + _tip_counter[channel] = 0 + trial_total = len(resources.test_volumes) * cfg.trials * len(channels_to_test) + support_tip_resupply = bool(cfg.pipette_channels == 96 and cfg.increment) + if trial_total > total_tips: + if not support_tip_resupply: + raise ValueError(f"more trials ({trial_total}) than tips ({total_tips})") + elif not resources.ctx.is_simulating(): + ui.get_user_ready(f"prepare {trial_total - total_tips} extra tip-racks") + recorder = _load_scale(cfg, resources) + test_report = build_gm_report(cfg, resources, recorder) calibration_tip_in_use = True + + if resources.ctx.is_simulating(): + _PREV_TRIAL_GRAMS = None + _MEASUREMENTS = list() try: ui.print_title("FIND LIQUID HEIGHT") - print("homing...") - ctx.home() - pipette.home_plunger() - first_tip = parameters.tips[0][0] + ui.print_info("homing...") + resources.ctx.home() + resources.pipette.home_plunger() + first_tip = resources.tips[0][0] setup_channel_offset = _get_channel_offset(cfg, channel=0) first_tip_location = first_tip.top().move(setup_channel_offset) - _pick_up_tip(ctx, pipette, cfg, location=first_tip_location) + _pick_up_tip(resources.ctx, resources.pipette, cfg, location=first_tip_location) mnt = OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT - ctx._core.get_hardware().retract(mnt) - if not ctx.is_simulating(): + resources.ctx._core.get_hardware().retract(mnt) + if not resources.ctx.is_simulating(): ui.get_user_ready("REPLACE first tip with NEW TIP") ui.get_user_ready("CLOSE the door, and MOVE AWAY from machine") - print("moving to scale") + ui.print_info("moving to scale") well = labware_on_scale["A1"] - pipette.move_to(well.top(0)) - _liquid_height = _jog_to_find_liquid_height(ctx, pipette, well) + resources.pipette.move_to(well.top(0), minimum_z_height=_minimum_z_height(cfg)) + _liquid_height = _jog_to_find_liquid_height( + resources.ctx, resources.pipette, well + ) + resources.pipette.move_to(well.top().move(Point(0, 0, _minimum_z_height(cfg)))) height_below_top = well.depth - _liquid_height - print(f"liquid is {height_below_top} mm below top of vial") + ui.print_info(f"liquid is {height_below_top} mm below top of vial") liquid_tracker.set_start_volume_from_liquid_height( labware_on_scale["A1"], _liquid_height, name="Water" ) vial_volume = liquid_tracker.get_volume(well) - print(f"software thinks there is {vial_volume} uL of liquid in the vial") - + ui.print_info( + f"software thinks there is {vial_volume} uL of liquid in the vial" + ) if not cfg.blank or cfg.inspect: average_aspirate_evaporation_ul = 0.0 average_dispense_evaporation_ul = 0.0 else: - ui.print_title("MEASURE EVAPORATION") - print(f"running {config.NUM_BLANK_TRIALS}x blank measurements") - mnt = OT3Mount.RIGHT if pipette.mount == "right" else OT3Mount.LEFT - ctx._core.get_hardware().retract(mnt) - for i in range(config.SCALE_SECONDS_TO_TRUE_STABILIZE): - print( - f"wait for scale to stabilize " - f"({i + 1}/{config.SCALE_SECONDS_TO_TRUE_STABILIZE})" - ) - sleep(1) - actual_asp_list_evap: List[float] = [] - actual_disp_list_evap: List[float] = [] - for trial in range(config.NUM_BLANK_TRIALS): - ui.print_header(f"BLANK {trial + 1}/{config.NUM_BLANK_TRIALS}") - evap_aspirate, _, evap_dispense, _ = _run_trial( - ctx=ctx, - pipette=pipette, - well=labware_on_scale["A1"], - channel_offset=Point(), # first channel - tip_volume=cfg.tip_volume, - volume=parameters.test_volumes[-1], - channel=0, # first channel - channel_count=parameters.num_channels_per_transfer, - trial=trial, - recorder=recorder, - test_report=test_report, - liquid_tracker=liquid_tracker, - blank=True, # stay away from the liquid - inspect=cfg.inspect, - mix=cfg.mix, - stable=True, - scale_delay=cfg.scale_delay, - ) - print( - f"blank {trial + 1}/{config.NUM_BLANK_TRIALS}:\n" - f"\taspirate: {evap_aspirate} uL\n" - f"\tdispense: {evap_dispense} uL" - ) - actual_asp_list_evap.append(evap_aspirate) - actual_disp_list_evap.append(evap_dispense) - ui.print_header("EVAPORATION AVERAGE") - average_aspirate_evaporation_ul = _calculate_average(actual_asp_list_evap) - average_dispense_evaporation_ul = _calculate_average(actual_disp_list_evap) - print( - "average:\n" - f"\taspirate: {average_aspirate_evaporation_ul} uL\n" - f"\tdispense: {average_dispense_evaporation_ul} uL" - ) - report.store_average_evaporation( - test_report, + ( average_aspirate_evaporation_ul, average_dispense_evaporation_ul, + ) = _calculate_evaporation( + cfg, + resources, + recorder, + liquid_tracker, + test_report, + labware_on_scale, ) - print("dropping tip") - _drop_tip(pipette, return_tip=False) # always trash calibration tips + + ui.print_info("dropping tip") + _drop_tip( + resources.pipette, return_tip=False, minimum_z_height=_minimum_z_height(cfg) + ) # always trash calibration tips calibration_tip_in_use = False trial_count = 0 - for volume in parameters.test_volumes: + trials = build_gravimetric_trials( + resources.ctx, + resources.pipette, + cfg, + labware_on_scale["A1"], + resources.test_volumes, + channels_to_test, + recorder, + test_report, + liquid_tracker, + False, + resources.env_sensor, + ) + for volume in trials.keys(): actual_asp_list_all = [] actual_disp_list_all = [] ui.print_title(f"{volume} uL") @@ -749,53 +522,37 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: trial_disp_dict: Dict[int, List[float]] = { trial: [] for trial in range(cfg.trials) } - for channel in parameters.channels_to_test: - if cfg.isolate_channels and (channel + 1) not in cfg.isolate_channels: - print(f"skipping channel {channel + 1}") - continue + for channel in trials[volume].keys(): channel_offset = _get_channel_offset(cfg, channel) actual_asp_list_channel = [] actual_disp_list_channel = [] aspirate_data_list = [] dispense_data_list = [] - for trial in range(cfg.trials): + for run_trial in trials[volume][channel]: trial_count += 1 ui.print_header( - f"{volume} uL channel {channel + 1} ({trial + 1}/{cfg.trials})" + f"{volume} uL channel {channel + 1} ({run_trial.trial + 1}/{cfg.trials})" ) - print(f"trial total {trial_count}/{parameters.trial_total}") + ui.print_info(f"trial total {trial_count}/{trial_total}") # NOTE: always pick-up new tip for each trial # b/c it seems tips heatup next_tip: Well = _next_tip_for_channel( - ctx, cfg, parameters.tips, channel, parameters.total_tips + cfg, resources, channel, total_tips ) next_tip_location = next_tip.top().move(channel_offset) - _pick_up_tip(ctx, pipette, cfg, location=next_tip_location) + _pick_up_tip( + resources.ctx, + resources.pipette, + cfg, + location=next_tip_location, + ) ( actual_aspirate, aspirate_data, actual_dispense, dispense_data, - ) = _run_trial( - ctx=ctx, - pipette=pipette, - well=labware_on_scale["A1"], - channel_offset=channel_offset, - tip_volume=cfg.tip_volume, - volume=volume, - channel=channel, - channel_count=parameters.num_channels_per_transfer, - trial=trial, - recorder=recorder, - test_report=test_report, - liquid_tracker=liquid_tracker, - blank=False, - inspect=cfg.inspect, - mix=cfg.mix, - stable=True, - scale_delay=cfg.scale_delay, - ) - print( + ) = _run_trial(run_trial) + ui.print_info( "measured volumes:\n" f"\taspirate: {round(actual_aspirate, 2)} uL\n" f"\tdispense: {round(actual_dispense, 2)} uL" @@ -805,7 +562,7 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: chnl_div = _get_channel_divider(cfg) disp_with_evap /= chnl_div asp_with_evap /= chnl_div - print( + ui.print_info( "per-channel volume, with evaporation:\n" f"\taspirate: {round(asp_with_evap, 2)} uL\n" f"\tdispense: {round(disp_with_evap, 2)} uL" @@ -814,22 +571,22 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: actual_asp_list_channel.append(asp_with_evap) actual_disp_list_channel.append(disp_with_evap) - trial_asp_dict[trial].append(asp_with_evap) - trial_disp_dict[trial].append(disp_with_evap) + trial_asp_dict[run_trial.trial].append(asp_with_evap) + trial_disp_dict[run_trial.trial].append(disp_with_evap) aspirate_data_list.append(aspirate_data) dispense_data_list.append(dispense_data) report.store_trial( test_report, - trial, - volume, - channel, + run_trial.trial, + run_trial.volume, + run_trial.channel, asp_with_evap, disp_with_evap, ) - print("dropping tip") - _drop_tip(pipette, cfg.return_tip) + ui.print_info("dropping tip") + _drop_tip(resources.pipette, cfg.return_tip, _minimum_z_height(cfg)) ui.print_header(f"{volume} uL channel {channel + 1} CALCULATIONS") aspirate_average, aspirate_cv, aspirate_d = _calculate_stats( @@ -940,15 +697,14 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: d=dispense_d, ) finally: - print("ending recording") + ui.print_info("ending recording") recorder.stop() recorder.deactivate() - ui.print_title("CHANGE PIPETTES") _return_tip = False if calibration_tip_in_use else cfg.return_tip - _change_pipettes(ctx, pipette, _return_tip) + _finish_test(cfg, resources, _return_tip) ui.print_title("RESULTS") _print_final_results( - volumes=parameters.test_volumes, - channel_count=len(parameters.channels_to_test), + volumes=resources.test_volumes, + channel_count=len(channels_to_test), test_report=test_report, ) diff --git a/hardware-testing/hardware_testing/gravimetric/execute_photometric.py b/hardware-testing/hardware_testing/gravimetric/execute_photometric.py index 6a1c8374e0d2..8e68e36fea4c 100644 --- a/hardware-testing/hardware_testing/gravimetric/execute_photometric.py +++ b/hardware-testing/hardware_testing/gravimetric/execute_photometric.py @@ -1,15 +1,10 @@ """Gravimetric.""" -from inspect import getsource -from statistics import stdev from typing import Tuple, List, Dict from math import ceil -from opentrons.hardware_control.instruments.ot3.pipette import Pipette -from opentrons.types import Location -from opentrons.protocol_api import ProtocolContext, InstrumentContext, Well, Labware +from opentrons.protocol_api import ProtocolContext, Well, Labware -from hardware_testing.data.csv_report import CSVReport -from hardware_testing.data import create_run_id_and_start_time, ui, get_git_description +from hardware_testing.data import ui from hardware_testing.opentrons_api.types import Point, OT3Mount from .measurement import ( MeasurementType, @@ -19,15 +14,24 @@ from .measurement.environment import read_environment_data from . import report from . import config -from .helpers import get_pipette_unique_name -from .workarounds import get_latest_offset_for_labware -from .liquid_class.defaults import get_test_volumes, get_liquid_class +from .helpers import ( + _jog_to_find_liquid_height, + _apply_labware_offsets, + _pick_up_tip, + _drop_tip, +) +from .trial import ( + PhotometricTrial, + build_photometric_trials, + TestResources, + _finish_test, +) from .liquid_class.pipetting import ( aspirate_with_liquid_class, dispense_with_liquid_class, PipettingCallbacks, ) -from .liquid_height.height import LiquidTracker, initialize_liquid_from_deck +from .liquid_height.height import LiquidTracker from .tips import get_tips @@ -58,157 +62,31 @@ def _get_dye_type(volume: float) -> str: return dye_type -def _check_if_software_supports_high_volumes() -> bool: - src_a = getsource(Pipette.set_current_volume) - src_b = getsource(Pipette.ok_to_add_volume) - modified_a = "# assert new_volume <= self.working_volume" in src_a - modified_b = "return True" in src_b - return modified_a and modified_b - - -def _reduce_volumes_to_not_exceed_software_limit( - test_volumes: List[float], cfg: config.PhotometricConfig -) -> List[float]: - for i, v in enumerate(test_volumes): - liq_cls = get_liquid_class(cfg.pipette_volume, 96, cfg.tip_volume, int(v)) - max_vol = cfg.tip_volume - liq_cls.aspirate.trailing_air_gap - test_volumes[i] = min(v, max_vol - 0.1) - return test_volumes - - -def _get_volumes(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> List[float]: - if cfg.user_volumes and not ctx.is_simulating(): - _inp = input('Enter desired volumes, comma separated (eg: "10,100,1000") :') - test_volumes = [ - float(vol_str) for vol_str in _inp.strip().split(",") if vol_str - ] - else: - test_volumes = get_test_volumes(cfg.pipette_volume, 96, cfg.tip_volume) - if not test_volumes: - raise ValueError("no volumes to test, check the configuration") - if not _check_if_software_supports_high_volumes(): - if ctx.is_simulating(): - test_volumes = _reduce_volumes_to_not_exceed_software_limit( - test_volumes, cfg - ) - else: - raise RuntimeError("you are not the correct branch") - return sorted(test_volumes, reverse=False) # lowest volumes first - - -def _get_channel_offset(cfg: config.PhotometricConfig, channel: int) -> Point: - row = channel % 8 # A-H - col = int(float(channel) / 8.0) # 1-12 - return Point(x=col * 9.0, y=row * 9.0) - - -def _load_pipette( - ctx: ProtocolContext, cfg: config.PhotometricConfig -) -> InstrumentContext: - pip_name = f"p{cfg.pipette_volume}_96" - print(f'pipette "{pip_name}" on mount "{cfg.pipette_mount}"') - pipette = ctx.load_instrument(pip_name, cfg.pipette_mount) - assert pipette.max_volume == cfg.pipette_volume, ( - f"expected {cfg.pipette_volume} uL pipette, " - f"but got a {pipette.max_volume} uL pipette" - ) - # pipette.default_speed = cfg.gantry_speed - return pipette - - -def _apply_labware_offsets( - cfg: config.PhotometricConfig, - tip_racks: List[Labware], - photoplate: Labware, - reservoir: Labware, -) -> None: - def _apply(labware: Labware) -> None: - o = get_latest_offset_for_labware(cfg.labware_offsets, labware) - print( - f'Apply labware offset to "{labware.name}" (slot={labware.parent}): ' - f"x={round(o.x, 2)}, y={round(o.y, 2)}, z={round(o.z, 2)}" - ) - labware.set_calibration(o) - - _apply(photoplate) - for rack in tip_racks: - _apply(rack) - _apply(reservoir) - - def _load_labware( ctx: ProtocolContext, cfg: config.PhotometricConfig -) -> Tuple[Labware, Labware, List[Labware]]: - print(f'Loading photoplate labware: "{cfg.photoplate}"') - photoplate = ctx.load_labware(cfg.photoplate, location=cfg.photoplate_slot) - tiprack_load_settings: List[Tuple[int, str]] = [ - ( - slot, - f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul_adp", - ) - for slot in cfg.slots_tiprack - ] - for ls in tiprack_load_settings: - print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]}') - reservoir = ctx.load_labware(cfg.reservoir, location=cfg.reservoir_slot) - tiprack_namespace = "custom_beta" - tipracks = [ - ctx.load_labware(ls[1], location=ls[0], namespace=tiprack_namespace) - for ls in tiprack_load_settings - ] - _apply_labware_offsets(cfg, tipracks, photoplate, reservoir) - return photoplate, reservoir, tipracks - - -def _jog_to_find_liquid_height( - ctx: ProtocolContext, pipette: InstrumentContext, well: Well -) -> float: - _well_depth = well.depth - _liquid_height = _well_depth - _jog_size = -1.0 - if ctx.is_simulating(): - return _liquid_height - 1 - while True: - pipette.move_to(well.bottom(_liquid_height)) - inp = input( - f"height={_liquid_height}: ENTER to jog {_jog_size} mm, " - f'or enter new jog size, or "yes" to save: ' - ) - if inp: - if inp[0] == "y": - break - try: - _jog_size = min(max(float(inp), -1.0), 1.0) - except ValueError: - continue - _liquid_height = min(max(_liquid_height + _jog_size, 0), _well_depth) - return _liquid_height - - -def _calculate_average(volume_list: List[float]) -> float: - return sum(volume_list) / len(volume_list) - - -def _calculate_stats( - volume_list: List[float], total_volume: float -) -> Tuple[float, float, float]: - average = _calculate_average(volume_list) - if len(volume_list) <= 1: - print("skipping CV, only 1x trial per volume") - cv = -0.01 # negative number is impossible +) -> Tuple[Labware, Labware]: + ui.print_info(f'Loading photoplate labware: "{cfg.photoplate}"') + # If running multiple tests in one run, the labware may already be loaded + loaded_labwares = ctx.loaded_labwares + + if ( + cfg.photoplate_slot in loaded_labwares.keys() + and loaded_labwares[cfg.photoplate_slot].name == cfg.photoplate + ): + photoplate = loaded_labwares[cfg.photoplate_slot] else: - cv = stdev(volume_list) / average - d = (average - total_volume) / total_volume - return average, cv, d - - -def _print_stats(mode: str, average: float, cv: float, d: float) -> None: - print( - f"{mode}:\n" - f"\tavg: {round(average, 2)} uL\n" - f"\tcv: {round(cv * 100.0, 2)}%\n" - f"\td: {round(d * 100.0, 2)}%" - ) + photoplate = ctx.load_labware(cfg.photoplate, location=cfg.photoplate_slot) + _apply_labware_offsets(cfg, [photoplate]) + + if ( + cfg.reservoir_slot in loaded_labwares.keys() + and loaded_labwares[cfg.reservoir_slot].name == cfg.reservoir + ): + reservoir = loaded_labwares[cfg.reservoir_slot] + else: + reservoir = ctx.load_labware(cfg.reservoir, location=cfg.reservoir_slot) + _apply_labware_offsets(cfg, [reservoir]) + return photoplate, reservoir def _dispense_volumes(volume: float) -> Tuple[float, float, int]: @@ -218,22 +96,7 @@ def _dispense_volumes(volume: float) -> Tuple[float, float, int]: return target_volume, volume_to_dispense, num_dispenses -def _run_trial( - ctx: ProtocolContext, - test_report: CSVReport, - pipette: InstrumentContext, - source: Well, - dest: Labware, - channel_offset: Point, - tip_volume: int, - volume: float, - trial: int, - liquid_tracker: LiquidTracker, - blank: bool, - inspect: bool, - cfg: config.PhotometricConfig, - mix: bool = False, -) -> None: +def _run_trial(trial: PhotometricTrial) -> None: """Aspirate dye and dispense into a photometric plate.""" def _no_op() -> None: @@ -241,12 +104,14 @@ def _no_op() -> None: return def _tag(m_type: MeasurementType) -> str: - return create_measurement_tag(m_type, volume, 0, trial) + return create_measurement_tag(m_type, trial.volume, 0, trial.trial) def _record_measurement_and_store(m_type: MeasurementType) -> EnvironmentData: m_tag = _tag(m_type) - m_data = read_environment_data(cfg.pipette_mount, ctx.is_simulating()) - report.store_measurements_pm(test_report, m_tag, m_data) + m_data = read_environment_data( + trial.cfg.pipette_mount, trial.ctx.is_simulating(), trial.env_sensor + ) + report.store_measurements_pm(trial.test_report, m_tag, m_data) _MEASUREMENTS.append( ( m_tag, @@ -267,118 +132,87 @@ def _record_measurement_and_store(m_type: MeasurementType) -> EnvironmentData: channel_count = 96 # RUN INIT - target_volume, volume_to_dispense, num_dispenses = _dispense_volumes(volume) + target_volume, volume_to_dispense, num_dispenses = _dispense_volumes(trial.volume) photoplate_preped_vol = max(target_volume - volume_to_dispense, 0) - if num_dispenses > 1 and not ctx.is_simulating(): + if num_dispenses > 1 and not trial.ctx.is_simulating(): # TODO: Likely will not test 1000 uL in the near-term, # but eventually we'll want to be more helpful here in prompting # what volumes need to be added between trials. ui.get_user_ready("check DYE is enough") _record_measurement_and_store(MeasurementType.INIT) - pipette.move_to(location=source.top().move(channel_offset), minimum_z_height=133) + trial.pipette.move_to(location=trial.source.top(), minimum_z_height=133) # RUN ASPIRATE aspirate_with_liquid_class( - ctx, - pipette, - tip_volume, - volume, - source, - channel_offset, + trial.ctx, + trial.pipette, + trial.tip_volume, + trial.volume, + trial.source, + Point(), channel_count, - liquid_tracker, + trial.liquid_tracker, callbacks=pipetting_callbacks, - blank=blank, - inspect=inspect, - mix=mix, + blank=False, + inspect=trial.inspect, + mix=trial.mix, touch_tip=False, ) _record_measurement_and_store(MeasurementType.ASPIRATE) for i in range(num_dispenses): - for w in dest.wells(): - liquid_tracker.set_start_volume(w, photoplate_preped_vol) - pipette.move_to(dest["A1"].top().move(channel_offset)) + for w in trial.dest.wells(): + trial.liquid_tracker.set_start_volume(w, photoplate_preped_vol) + trial.pipette.move_to(trial.dest["A1"].top()) # RUN DISPENSE dispense_with_liquid_class( - ctx, - pipette, - tip_volume, + trial.ctx, + trial.pipette, + trial.tip_volume, volume_to_dispense, - dest["A1"], - channel_offset, + trial.dest["A1"], + Point(), channel_count, - liquid_tracker, + trial.liquid_tracker, callbacks=pipetting_callbacks, - blank=blank, - inspect=inspect, - mix=mix, + blank=False, + inspect=trial.inspect, + mix=trial.mix, added_blow_out=(i + 1) == num_dispenses, - touch_tip=cfg.touch_tip, + touch_tip=trial.cfg.touch_tip, ) _record_measurement_and_store(MeasurementType.DISPENSE) - pipette.move_to(location=dest["A1"].top().move(Point(0, 0, 133))) + trial.pipette.move_to(location=trial.dest["A1"].top().move(Point(0, 0, 133))) if (i + 1) == num_dispenses: - _drop_tip(ctx, pipette, cfg) + _drop_tip(trial.pipette, trial.cfg.return_tip) else: - pipette.move_to(location=dest["A1"].top().move(Point(0, 107, 133))) - if not ctx.is_simulating(): + trial.pipette.move_to( + location=trial.dest["A1"].top().move(Point(0, 107, 133)) + ) + if not trial.ctx.is_simulating(): ui.get_user_ready("add SEAL to plate and remove from DECK") return -def _get_operator_name(is_simulating: bool) -> str: - if not is_simulating: - return input("OPERATOR name:").strip() - else: - return "simulation" - - -def _get_robot_serial(is_simulating: bool) -> str: - if not is_simulating: - return input("ROBOT SERIAL NUMBER:").strip() - else: - return "simulation-serial-number" - - -def _get_tip_batch(is_simulating: bool) -> str: - if not is_simulating: - return input("TIP BATCH:").strip() - else: - return "simulation-tip-batch" - - -def _pick_up_tip( - ctx: ProtocolContext, - pipette: InstrumentContext, - cfg: config.PhotometricConfig, - location: Location, -) -> None: - print( - f"picking tip {location.labware.as_well().well_name} " - f"from slot #{location.labware.parent.parent}" - ) - pipette.pick_up_tip(location) - - -def _drop_tip( - ctx: ProtocolContext, pipette: InstrumentContext, cfg: config.PhotometricConfig +def _display_dye_information( + cfg: config.PhotometricConfig, resources: TestResources ) -> None: - if cfg.return_tip: - pipette.return_tip(home_after=False) - else: - pipette.drop_tip(home_after=False) + ui.print_header("PREPARE") + dye_types_req: Dict[str, float] = {dye: 0 for dye in _DYE_MAP.keys()} + for vol in resources.test_volumes: + _, volume_to_dispense, num_dispenses = _dispense_volumes(vol) + dye_per_vol = vol * 96 * cfg.trials + dye_types_req[_get_dye_type(volume_to_dispense)] += dye_per_vol + include_hv = not [ + v + for v in resources.test_volumes + if _DYE_MAP["A"]["min"] <= v < _DYE_MAP["A"]["max"] + ] -def _display_dye_information( - ctx: ProtocolContext, - dye_types_req: Dict[str, float], - refill: bool, - include_hv: bool, -) -> None: for dye in dye_types_req.keys(): transfered_ul = dye_types_req[dye] reservoir_ul = max(_MIN_START_VOLUME_UL, transfered_ul + _MIN_END_VOLUME_UL) @@ -388,200 +222,184 @@ def _ul_to_ml(x: float) -> float: return round(x / 1000.0, 1) if dye_types_req[dye] > 0: - if refill: + if cfg.refill: # only add the minimum required volume - print(f' * {_ul_to_ml(leftover_ul)} mL "{dye}" LEFTOVER in reservoir') - if not ctx.is_simulating(): + ui.print_info( + f' * {_ul_to_ml(leftover_ul)} mL "{dye}" LEFTOVER in reservoir' + ) + if not resources.ctx.is_simulating(): ui.get_user_ready( f'[refill] ADD {_ul_to_ml(transfered_ul)} mL more DYE type "{dye}"' ) else: # add minimum required volume PLUS labware's dead-volume - if not ctx.is_simulating(): + if not resources.ctx.is_simulating(): dye_msg = 'A" or "HV' if include_hv and dye == "A" else dye ui.get_user_ready( f'add {_ul_to_ml(reservoir_ul)} mL of DYE type "{dye_msg}"' ) -def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: - """Run.""" - run_id, start_time = create_run_id_and_start_time() - dye_types_req: Dict[str, float] = {dye: 0 for dye in _DYE_MAP.keys()} - test_volumes = _get_volumes(ctx, cfg) - total_photoplates = 0 - for vol in test_volumes: - target_volume, volume_to_dispense, num_dispenses = _dispense_volumes(vol) - total_photoplates += num_dispenses * cfg.trials - dye_per_vol = vol * 96 * cfg.trials - dye_types_req[_get_dye_type(volume_to_dispense)] += dye_per_vol +def build_pm_report( + cfg: config.PhotometricConfig, resources: TestResources +) -> report.CSVReport: + """Build a CSVReport formated for photometric tests.""" + ui.print_header("CREATE TEST-REPORT") + test_report = report.create_csv_test_report_photometric( + resources.test_volumes, cfg, run_id=resources.run_id + ) + test_report.set_tag(resources.pipette_tag) + test_report.set_operator(resources.operator_name) + test_report.set_version(resources.git_description) + report.store_serial_numbers_pm( + test_report, + robot=resources.robot_serial, + pipette=resources.pipette_tag, + tips=resources.tip_batch, + environment="None", + liquid="None", + ) + return test_report - trial_total = len(test_volumes) * cfg.trials - ui.print_header("LOAD LABWARE") - photoplate, reservoir, tipracks = _load_labware(ctx, cfg) - liquid_tracker = LiquidTracker() - initialize_liquid_from_deck(ctx, liquid_tracker) - - ui.print_header("LOAD PIPETTE") - pipette = _load_pipette(ctx, cfg) - pipette_tag = get_pipette_unique_name(pipette) - print(f"found pipette: {pipette_tag}") - if not ctx.is_simulating(): - ui.get_user_ready("create pipette QR code") - if cfg.user_volumes: - pipette_tag += "-user-volume" - else: - pipette_tag += "-qc" - - ui.print_header("GET PARAMETERS") - for v in test_volumes: - print(f"\t{v} uL") - tips = get_tips(ctx, pipette) - total_tips = len([tip for chnl_tips in tips.values() for tip in chnl_tips]) * len( - test_volumes - ) +def execute_trials( + cfg: config.PhotometricConfig, + resources: TestResources, + tips: Dict[int, List[Well]], + trials: Dict[float, List[PhotometricTrial]], +) -> None: + """Execute a batch of pre-constructed trials.""" + ui.print_info("homing...") + resources.ctx.home() + resources.pipette.home_plunger() def _next_tip() -> Well: + # get the first channel's first-used tip + # NOTE: note using list.pop(), b/c tip will be re-filled by operator, + # and so we can use pick-up-tip from there again nonlocal tips if not len(tips[0]): - if not ctx.is_simulating(): + if not resources.ctx.is_simulating(): ui.get_user_ready(f"replace TIPRACKS in slots {cfg.slots_tiprack}") - tips = get_tips(ctx, pipette) + tips = get_tips(resources.ctx, resources.pipette, cfg.tip_volume, True) return tips[0].pop(0) + trial_total = len(resources.test_volumes) * cfg.trials + trial_count = 0 + for volume in trials.keys(): + ui.print_title(f"{volume} uL") + for trial in trials[volume]: + trial_count += 1 + ui.print_header(f"{volume} uL ({trial.trial + 1}/{cfg.trials})") + ui.print_info(f"trial total {trial_count}/{trial_total}") + if not resources.ctx.is_simulating(): + ui.get_user_ready(f"put PLATE #{trial.trial + 1} and remove SEAL") + next_tip: Well = _next_tip() + next_tip_location = next_tip.top() + _pick_up_tip( + resources.ctx, resources.pipette, cfg, location=next_tip_location + ) + _run_trial(trial) + + +def _find_liquid_height( + cfg: config.PhotometricConfig, + resources: TestResources, + liquid_tracker: LiquidTracker, + reservoir: Well, +) -> None: + channel_count = 96 + setup_tip = resources.tips[0][0] + volume_for_setup = max(resources.test_volumes) + _pick_up_tip(resources.ctx, resources.pipette, cfg, location=setup_tip.top()) + mnt = OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT + resources.ctx._core.get_hardware().retract(mnt) + if not resources.ctx.is_simulating(): + ui.get_user_ready("REPLACE first tip with NEW TIP") + required_ul = max( + (volume_for_setup * channel_count * cfg.trials) + _MIN_END_VOLUME_UL, + _MIN_START_VOLUME_UL, + ) + if not resources.ctx.is_simulating(): + _liquid_height = _jog_to_find_liquid_height( + resources.ctx, resources.pipette, reservoir + ) + height_below_top = reservoir.depth - _liquid_height + ui.print_info(f"liquid is {height_below_top} mm below top of reservoir") + liquid_tracker.set_start_volume_from_liquid_height( + reservoir, _liquid_height, name="Dye" + ) + else: + liquid_tracker.set_start_volume(reservoir, required_ul) + reservoir_ul = liquid_tracker.get_volume(reservoir) + ui.print_info( + f"software thinks there is {round(reservoir_ul / 1000, 1)} mL " + f"of liquid in the reservoir (required = {round(required_ul / 1000, 1)} ml)" + ) + if required_ul <= reservoir_ul < _MAX_VOLUME_UL: + ui.print_info("valid liquid height") + elif required_ul > _MAX_VOLUME_UL: + raise NotImplementedError( + f"too many trials ({cfg.trials}) at {volume_for_setup} uL, " + f"refilling reservoir is currently not supported" + ) + elif reservoir_ul < required_ul: + error_msg = ( + f"not enough volume in reservoir to aspirate {volume_for_setup} uL " + f"across {channel_count}x channels for {cfg.trials}x trials" + ) + if resources.ctx.is_simulating(): + raise ValueError(error_msg) + ui.print_error(error_msg) + resources.pipette.move_to(location=reservoir.top(100)) + difference_ul = required_ul - reservoir_ul + ui.get_user_ready( + f"ADD {round(difference_ul / 1000.0, 1)} mL more liquid to RESERVOIR" + ) + resources.pipette.move_to(location=reservoir.top()) + else: + raise RuntimeError( + f"bad volume in reservoir: {round(reservoir_ul / 1000, 1)} ml" + ) + resources.pipette.drop_tip(home_after=False) # always trash setup tips + # NOTE: the first tip-rack should have already been replaced + # with new tips by the operator + + +def run(cfg: config.PhotometricConfig, resources: TestResources) -> None: + """Run.""" + trial_total = len(resources.test_volumes) * cfg.trials + + ui.print_header("LOAD LABWARE") + photoplate, reservoir = _load_labware(resources.ctx, cfg) + liquid_tracker = LiquidTracker(resources.ctx) + + total_tips = len( + [tip for chnl_tips in resources.tips.values() for tip in chnl_tips] + ) * len(resources.test_volumes) + assert ( trial_total <= total_tips ), f"more trials ({trial_total}) than tips ({total_tips})" - ui.print_header("CREATE TEST-REPORT") - test_report = report.create_csv_test_report_photometric( - test_volumes, cfg, run_id=run_id - ) - test_report.set_tag(pipette_tag) - test_report.set_operator(_get_operator_name(ctx.is_simulating())) - serial_number = _get_robot_serial(ctx.is_simulating()) - tip_batch = _get_tip_batch(ctx.is_simulating()) - test_report.set_version(get_git_description()) - report.store_serial_numbers_pm( + test_report = build_pm_report(cfg, resources) + + _display_dye_information(cfg, resources) + _find_liquid_height(cfg, resources, liquid_tracker, reservoir["A1"]) + + trials = build_photometric_trials( + resources.ctx, test_report, - robot=serial_number, - pipette=pipette_tag, - tips=tip_batch, - environment="None", - liquid="None", + resources.pipette, + reservoir["A1"], + photoplate, + resources.test_volumes, + liquid_tracker, + cfg, + resources.env_sensor, ) - ui.print_header("PREPARE") - can_swap_a_for_hv = not [ - v for v in test_volumes if _DYE_MAP["A"]["min"] <= v < _DYE_MAP["A"]["max"] - ] - _display_dye_information(ctx, dye_types_req, cfg.refill, can_swap_a_for_hv) - - print("homing...") - ctx.home() - pipette.home_plunger() - # get the first channel's first-used tip - # NOTE: note using list.pop(), b/c tip will be re-filled by operator, - # and so we can use pick-up-tip from there again try: - trial_count = 0 - source = reservoir["A1"] - channel_count = 96 - channel_offset = Point() - setup_tip = tips[0][0] - volume_for_setup = max(test_volumes) - _pick_up_tip(ctx, pipette, cfg, location=setup_tip.top()) - mnt = OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT - ctx._core.get_hardware().retract(mnt) - if not ctx.is_simulating(): - ui.get_user_ready("REPLACE first tip with NEW TIP") - required_ul = max( - (volume_for_setup * channel_count * cfg.trials) + _MIN_END_VOLUME_UL, - _MIN_START_VOLUME_UL, - ) - if not ctx.is_simulating(): - _liquid_height = _jog_to_find_liquid_height(ctx, pipette, source) - height_below_top = source.depth - _liquid_height - print(f"liquid is {height_below_top} mm below top of reservoir") - liquid_tracker.set_start_volume_from_liquid_height( - source, _liquid_height, name="Dye" - ) - else: - liquid_tracker.set_start_volume(source, required_ul) - reservoir_ul = liquid_tracker.get_volume(source) - print( - f"software thinks there is {round(reservoir_ul / 1000, 1)} mL " - f"of liquid in the reservoir (required = {round(required_ul / 1000, 1)} ml)" - ) - if required_ul <= reservoir_ul < _MAX_VOLUME_UL: - print("valid liquid height") - elif required_ul > _MAX_VOLUME_UL: - raise NotImplementedError( - f"too many trials ({cfg.trials}) at {volume_for_setup} uL, " - f"refilling reservoir is currently not supported" - ) - elif reservoir_ul < required_ul: - error_msg = ( - f"not enough volume in reservoir to aspirate {volume_for_setup} uL " - f"across {channel_count}x channels for {cfg.trials}x trials" - ) - if ctx.is_simulating(): - raise ValueError(error_msg) - ui.print_error(error_msg) - pipette.move_to(location=source.top(100).move(channel_offset)) - difference_ul = required_ul - reservoir_ul - ui.get_user_ready( - f"ADD {round(difference_ul / 1000.0, 1)} mL more liquid to RESERVOIR" - ) - pipette.move_to(location=source.top().move(channel_offset)) - else: - raise RuntimeError( - f"bad volume in reservoir: {round(reservoir_ul / 1000, 1)} ml" - ) - pipette.drop_tip(home_after=False) # always trash setup tips - # NOTE: the first tip-rack should have already been replaced - # with new tips by the operator - for volume in test_volumes: - ui.print_title(f"{volume} uL") - for trial in range(cfg.trials): - trial_count += 1 - ui.print_header(f"{volume} uL ({trial + 1}/{cfg.trials})") - print(f"trial total {trial_count}/{trial_total}") - if not ctx.is_simulating(): - ui.get_user_ready(f"put PLATE #{trial + 1} and remove SEAL") - next_tip: Well = _next_tip() - next_tip_location = next_tip.top() - _pick_up_tip(ctx, pipette, cfg, location=next_tip_location) - _run_trial( - ctx=ctx, - test_report=test_report, - pipette=pipette, - source=source, - dest=photoplate, - channel_offset=channel_offset, - tip_volume=cfg.tip_volume, - volume=volume, - trial=trial, - liquid_tracker=liquid_tracker, - blank=False, - inspect=cfg.inspect, - cfg=cfg, - mix=cfg.mix, - ) - + execute_trials(cfg, resources, resources.tips, trials) finally: - ui.print_title("CHANGE PIPETTES") - if pipette.has_tip: - if pipette.current_volume > 0: - print("dispensing liquid to trash") - trash = pipette.trash_container.wells()[0] - # FIXME: this should be a blow_out() at max volume, - # but that is not available through PyAPI yet - # so instead just dispensing. - pipette.dispense(pipette.current_volume, trash.top()) - pipette.aspirate(10) # to pull any droplets back up - print("dropping tip") - _drop_tip(ctx, pipette, cfg) - print("moving to attach position") - pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150))) + _finish_test(cfg, resources, cfg.return_tip) diff --git a/hardware-testing/hardware_testing/gravimetric/helpers.py b/hardware-testing/hardware_testing/gravimetric/helpers.py index eb563150baa9..03594dde337f 100644 --- a/hardware-testing/hardware_testing/gravimetric/helpers.py +++ b/hardware-testing/hardware_testing/gravimetric/helpers.py @@ -1,23 +1,33 @@ """Opentrons helper methods.""" import asyncio from types import MethodType -from typing import Any, List, Dict, Optional +from typing import Any, List, Dict, Optional, Tuple +from statistics import stdev +from . import config +from .liquid_class.defaults import get_liquid_class +from .increments import get_volume_increments +from inspect import getsource +from hardware_testing.data import ui from opentrons import protocol_api from opentrons.protocols.api_support.deck_type import ( guess_from_global_config as guess_deck_type_from_global_config, ) -from opentrons.protocol_api.labware import Well +from opentrons.protocol_api.labware import Well, Labware from opentrons.protocols.types import APIVersion from opentrons.hardware_control.thread_manager import ThreadManager -from opentrons.hardware_control.types import Axis +from opentrons.hardware_control.types import OT3Mount, Axis from opentrons.hardware_control.ot3api import OT3API +from opentrons.hardware_control.instruments.ot3.pipette import Pipette -from opentrons.types import Point +from opentrons.types import Point, Location from opentrons_shared_data.labware.dev_types import LabwareDefinition from hardware_testing.opentrons_api import helpers_ot3 +from opentrons.protocol_api import ProtocolContext, InstrumentContext +from .workarounds import get_sync_hw_api, get_latest_offset_for_labware +from hardware_testing.opentrons_api.helpers_ot3 import clear_pipette_ul_per_mm def _add_fake_simulate( @@ -34,10 +44,10 @@ def _add_fake_comment_pause( ctx: protocol_api.ProtocolContext, ) -> protocol_api.ProtocolContext: def _comment(_: protocol_api.ProtocolContext, a: Any) -> None: - print(a) + ui.print_info(a) def _pause(_: protocol_api.ProtocolContext, a: Any) -> None: - input(a) + ui.get_user_ready(a) setattr(ctx, "comment", MethodType(_comment, ctx)) setattr(ctx, "pause", MethodType(_pause, ctx)) @@ -104,3 +114,286 @@ def get_pipette_unique_name(pipette: protocol_api.InstrumentContext) -> str: def gantry_position_as_point(position: Dict[Axis, float]) -> Point: """Helper to convert Dict[Axis, float] to a Point().""" return Point(x=position[Axis.X], y=position[Axis.Y], z=position[Axis.Z]) + + +def _jog_to_find_liquid_height( + ctx: ProtocolContext, pipette: InstrumentContext, well: Well +) -> float: + _well_depth = well.depth + _liquid_height = _well_depth + _jog_size = -1.0 + if ctx.is_simulating(): + return _liquid_height - 1 + while True: + pipette.move_to(well.bottom(_liquid_height)) + inp = input( + f"height={_liquid_height}: ENTER to jog {_jog_size} mm, " + f'or enter new jog size, or "yes" to save: ' + ) + if inp: + if inp[0] == "y": + break + try: + _jog_size = min(max(float(inp), -1.0), 1.0) + except ValueError: + continue + _liquid_height = min(max(_liquid_height + _jog_size, 0), _well_depth) + return _liquid_height + + +def _calculate_average(volume_list: List[float]) -> float: + return sum(volume_list) / len(volume_list) + + +def _reduce_volumes_to_not_exceed_software_limit( + test_volumes: List[float], + cfg: config.VolumetricConfig, +) -> List[float]: + for i, v in enumerate(test_volumes): + liq_cls = get_liquid_class( + cfg.pipette_volume, cfg.pipette_channels, cfg.tip_volume, int(v) + ) + max_vol = cfg.tip_volume - liq_cls.aspirate.trailing_air_gap + test_volumes[i] = min(v, max_vol - 0.1) + return test_volumes + + +def _check_if_software_supports_high_volumes() -> bool: + src_a = getsource(Pipette.set_current_volume) + src_b = getsource(Pipette.ok_to_add_volume) + modified_a = "# assert new_volume <= self.working_volume" in src_a + modified_b = "return True" in src_b + return modified_a and modified_b + + +def _get_channel_offset(cfg: config.VolumetricConfig, channel: int) -> Point: + assert ( + channel < cfg.pipette_channels + ), f"unexpected channel on {cfg.pipette_channels} channel pipette: {channel}" + if cfg.pipette_channels == 1: + return Point() + if cfg.pipette_channels == 8: + return Point(y=channel * 9.0) + if cfg.pipette_channels == 96: + row = channel % 8 # A-H + col = int(float(channel) / 8.0) # 1-12 + return Point(x=col * 9.0, y=row * 9.0) + raise ValueError(f"unexpected number of channels in config: {cfg.pipette_channels}") + + +def _get_robot_serial(is_simulating: bool) -> str: + if not is_simulating: + return input("ROBOT SERIAL NUMBER:").strip() + else: + return "simulation-serial-number" + + +def _get_operator_name(is_simulating: bool) -> str: + if not is_simulating: + return input("OPERATOR name:").strip() + else: + return "simulation" + + +def _calculate_stats( + volume_list: List[float], total_volume: float +) -> Tuple[float, float, float]: + average = _calculate_average(volume_list) + if len(volume_list) <= 1: + ui.print_info("skipping CV, only 1x trial per volume") + cv = -0.01 # negative number is impossible + else: + cv = stdev(volume_list) / average + d = (average - total_volume) / total_volume + return average, cv, d + + +def _get_tip_batch(is_simulating: bool) -> str: + if not is_simulating: + return input("TIP BATCH:").strip() + else: + return "simulation-tip-batch" + + +def _apply(labware: Labware, cfg: config.VolumetricConfig) -> None: + o = get_latest_offset_for_labware(cfg.labware_offsets, labware) + ui.print_info( + f'Apply labware offset to "{labware.name}" (slot={labware.parent}): ' + f"x={round(o.x, 2)}, y={round(o.y, 2)}, z={round(o.z, 2)}" + ) + labware.set_calibration(o) + + +def _apply_labware_offsets( + cfg: config.VolumetricConfig, + labwares: List[Labware], +) -> None: + for lw in labwares: + _apply(lw, cfg) + + +def _pick_up_tip( + ctx: ProtocolContext, + pipette: InstrumentContext, + cfg: config.VolumetricConfig, + location: Location, +) -> None: + ui.print_info( + f"picking tip {location.labware.as_well().well_name} " + f"from slot #{location.labware.parent.parent}" + ) + pipette.pick_up_tip(location) + # NOTE: the accuracy-adjust function gets set on the Pipette + # each time we pick-up a new tip. + if cfg.increment: + ui.print_info("clearing pipette ul-per-mm table to be linear") + clear_pipette_ul_per_mm( + get_sync_hw_api(ctx)._obj_to_adapt, # type: ignore[arg-type] + OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT, + ) + + +def _drop_tip( + pipette: InstrumentContext, return_tip: bool, minimum_z_height: int = 0 +) -> None: + if return_tip: + pipette.return_tip(home_after=False) + else: + pipette.drop_tip(home_after=False) + if minimum_z_height > 0: + cur_location = pipette._get_last_location_by_api_version() + if cur_location is not None: + pipette.move_to(cur_location.move(Point(0, 0, minimum_z_height))) + + +def _get_volumes(ctx: ProtocolContext, cfg: config.VolumetricConfig) -> List[float]: + if cfg.increment: + test_volumes = get_volume_increments(cfg.pipette_volume, cfg.tip_volume) + elif cfg.user_volumes and not ctx.is_simulating(): + _inp = input('Enter desired volumes, comma separated (eg: "10,100,1000") :') + test_volumes = [ + float(vol_str) for vol_str in _inp.strip().split(",") if vol_str + ] + else: + test_volumes = get_test_volumes(cfg) + if not test_volumes: + raise ValueError("no volumes to test, check the configuration") + if not _check_if_software_supports_high_volumes(): + if ctx.is_simulating(): + test_volumes = _reduce_volumes_to_not_exceed_software_limit( + test_volumes, cfg + ) + else: + raise RuntimeError("you are not the correct branch") + return sorted(test_volumes, reverse=False) # lowest volumes first + + +def _load_pipette( + ctx: ProtocolContext, cfg: config.VolumetricConfig +) -> InstrumentContext: + load_str_channels = {1: "single_gen3", 8: "multi_gen3", 96: "96"} + pip_channels = cfg.pipette_channels + if pip_channels not in load_str_channels: + raise ValueError(f"unexpected number of channels: {pip_channels}") + chnl_str = load_str_channels[pip_channels] + pip_name = f"p{cfg.pipette_volume}_{chnl_str}" + ui.print_info(f'pipette "{pip_name}" on mount "{cfg.pipette_mount}"') + + # if we're doing multiple tests in one run, the pipette may already be loaded + loaded_pipettes = ctx.loaded_instruments + if cfg.pipette_mount in loaded_pipettes.keys(): + return loaded_pipettes[cfg.pipette_mount] + + pipette = ctx.load_instrument(pip_name, cfg.pipette_mount) + assert pipette.max_volume == cfg.pipette_volume, ( + f"expected {cfg.pipette_volume} uL pipette, " + f"but got a {pipette.max_volume} uL pipette" + ) + if hasattr(cfg, "gantry_speed"): + pipette.default_speed = getattr(cfg, "gantry_speed") + + # NOTE: 8ch QC testing means testing 1 channel at a time, + # so we need to decrease the pick-up current to work with 1 tip. + if pipette.channels == 8 and not cfg.increment: + hwapi = get_sync_hw_api(ctx) + mnt = OT3Mount.LEFT if cfg.pipette_mount == "left" else OT3Mount.RIGHT + hwpipette: Pipette = hwapi.hardware_pipettes[mnt.to_mount()] + hwpipette.pick_up_configurations.current = 0.2 + return pipette + + +def _get_tag_from_pipette( + pipette: InstrumentContext, + cfg: config.VolumetricConfig, +) -> str: + pipette_tag = get_pipette_unique_name(pipette) + ui.print_info(f'found pipette "{pipette_tag}"') + if cfg.increment: + pipette_tag += "-increment" + elif cfg.user_volumes: + pipette_tag += "-user-volume" + else: + pipette_tag += "-qc" + return pipette_tag + + +def _load_tipracks( + ctx: ProtocolContext, + cfg: config.VolumetricConfig, + use_adapters: bool = False, +) -> List[Labware]: + adp_str = "_adp" if use_adapters else "" + tiprack_load_settings: List[Tuple[int, str]] = [ + ( + slot, + f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul{adp_str}", + ) + for slot in cfg.slots_tiprack + ] + for ls in tiprack_load_settings: + ui.print_info(f'Loading tiprack "{ls[1]}" in slot #{ls[0]}') + if use_adapters: + tiprack_namespace = "custom_beta" + else: + tiprack_namespace = "opentrons" + + # If running multiple tests in one run, the labware may already be loaded + loaded_labwares = ctx.loaded_labwares + pre_loaded_tips: List[Labware] = [] + for ls in tiprack_load_settings: + if ls[0] in loaded_labwares.keys() and loaded_labwares[ls[0]].name == ls[1]: + pre_loaded_tips.append(loaded_labwares[ls[0]]) + if len(pre_loaded_tips) == len(tiprack_load_settings): + return pre_loaded_tips + + tipracks = [ + ctx.load_labware(ls[1], location=ls[0], namespace=tiprack_namespace) + for ls in tiprack_load_settings + ] + _apply_labware_offsets(cfg, tipracks) + return tipracks + + +def get_test_volumes(cfg: config.VolumetricConfig) -> List[float]: + """Get test volumes.""" + if cfg.kind is config.ConfigType.photometric: + return config.QC_VOLUMES_P[cfg.pipette_channels][cfg.pipette_volume][ + cfg.tip_volume + ] + else: + if cfg.extra: + return config.QC_VOLUMES_EXTRA_G[cfg.pipette_channels][cfg.pipette_volume][ + cfg.tip_volume + ] + else: + return config.QC_VOLUMES_G[cfg.pipette_channels][cfg.pipette_volume][ + cfg.tip_volume + ] + + +def get_default_trials(cfg: config.VolumetricConfig) -> int: + """Return the default number of trials for QC tests.""" + if cfg.increment: + return 3 + else: + return config.QC_DEFAULT_TRIALS[cfg.kind][cfg.pipette_channels] diff --git a/hardware-testing/hardware_testing/gravimetric/increments.py b/hardware-testing/hardware_testing/gravimetric/increments.py index 385d2753d597..2b625e86ae8c 100644 --- a/hardware-testing/hardware_testing/gravimetric/increments.py +++ b/hardware-testing/hardware_testing/gravimetric/increments.py @@ -2,135 +2,86 @@ from typing import List P50_T50 = [ - 1.20, - 1.37, - 1.56, - 1.79, - 2.04, - 2.33, - 2.66, - 3.04, - 3.47, - 3.96, - 4.52, - 5.16, - 5.89, - 6.73, - 7.68, - 8.77, - 10.02, - 11.44, - 13.06, - 14.91, - 17.02, - 19.44, - 22.19, - 25.34, - 28.94, - 33.04, - 37.72, - 43.07, - 49.18, - 56.16, + 1.100, + 1.200, + 1.370, + 1.700, + 2.040, + 2.660, + 3.470, + 3.960, + 4.350, + 4.800, + 5.160, + 5.890, + 6.730, + 8.200, + 10.020, + 11.100, + 14.910, + 28.940, + 53.500, + 56.160, ] P1000_T50 = [ - 2.00, - 2.25, - 2.53, - 2.84, - 3.20, - 3.60, - 4.04, - 4.55, - 5.11, - 5.75, - 6.46, - 7.27, - 8.17, - 9.19, - 10.33, - 11.62, - 13.06, - 14.69, - 16.51, - 18.57, - 20.88, - 23.48, - 26.40, - 29.69, - 33.38, - 37.53, - 42.20, - 47.45, - 53.36, - 60.00, + 2.530, + 2.700, + 3.000, + 3.600, + 4.040, + 4.550, + 5.110, + 5.500, + 5.750, + 6.000, + 6.460, + 7.270, + 8.170, + 11.000, + 12.900, + 16.510, + 26.400, + 33.380, + 53.360, + 60.000, ] P1000_T200 = [ - 2.00, - 2.35, - 2.77, - 3.25, - 3.82, - 4.50, - 5.29, - 6.22, - 7.31, - 8.60, - 10.11, - 11.89, - 13.99, - 16.45, - 19.34, - 22.75, - 26.75, - 31.46, - 36.99, - 43.50, - 51.15, - 60.16, - 70.74, - 83.19, - 97.83, - 115.04, - 135.28, - 159.09, - 187.08, - 220.00, + 3.250, + 3.600, + 4.400, + 6.220, + 7.310, + 8.600, + 11.890, + 13.990, + 22.750, + 36.990, + 56.000, + 97.830, + 159.090, + 187.080, + 220.000, ] P1000_T1000 = [ - 5.00, - 6.03, - 7.27, - 8.77, - 10.57, - 12.74, - 15.37, - 18.53, - 22.34, - 26.94, - 32.48, - 39.17, - 47.23, - 56.95, - 68.67, - 82.80, - 99.84, - 120.38, - 145.16, - 175.03, - 211.05, - 254.48, - 306.85, - 369.99, - 446.13, - 537.94, - 648.65, - 782.13, - 943.08, - 1137.16, + 3.000, + 4.000, + 5.000, + 7.270, + 12.800, + 15.370, + 18.530, + 56.950, + 99.840, + 120.380, + 254.480, + 369.990, + 446.130, + 648.650, + 1030.000, + 1137.160, ] diff --git a/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py b/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py index bf31b7b4d14b..d8091d253b95 100644 --- a/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py +++ b/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py @@ -1,5 +1,5 @@ """Defaults.""" -from typing import List +from typing import Dict from .definition import ( LiquidClassSettings, @@ -20,7 +20,7 @@ _default_accel_96ch_ul_sec_sec = 16000 # dispense settings are constant across volumes -_dispense_defaults = { +_dispense_defaults: Dict[int, Dict[int, Dict[int, Dict[int, DispenseSettings]]]] = { 1: { 50: { # P50 50: { # T50 @@ -358,7 +358,7 @@ }, } -_aspirate_defaults = { +_aspirate_defaults: Dict[int, Dict[int, Dict[int, Dict[int, AspirateSettings]]]] = { 1: { 50: { # P50 50: { # T50 @@ -729,21 +729,3 @@ def _get_interp_liq_class(lower_ul: int, upper_ul: int) -> LiquidClassSettings: return _get_interp_liq_class(defined_volumes[1], defined_volumes[2]) else: return _build_liquid_class(defined_volumes[2]) - - -def get_test_volumes(pipette: int, channels: int, tip: int) -> List[float]: - """Get test volumes.""" - if channels == 96: - if tip == 50: - return [5.0] - elif tip == 200: - return [200.0] - elif tip == 1000: - return [1000.0] - else: - raise ValueError(f"no volumes to test for tip size: {tip} uL") - else: - # FIXME: also reduce total number of volumes we test for 1ch & 8ch pipettes - aspirate_cls_per_volume = _aspirate_defaults[channels][pipette][tip] - defined_volumes = list(aspirate_cls_per_volume.keys()) - return [float(v) for v in defined_volumes] diff --git a/hardware-testing/hardware_testing/gravimetric/liquid_class/pipetting.py b/hardware-testing/hardware_testing/gravimetric/liquid_class/pipetting.py index c98f8130cf69..2457042ad5af 100644 --- a/hardware-testing/hardware_testing/gravimetric/liquid_class/pipetting.py +++ b/hardware-testing/hardware_testing/gravimetric/liquid_class/pipetting.py @@ -144,7 +144,7 @@ def _retract( ] = z_discontinuity # NOTE: re-setting the gantry-load will reset the move-manager's per-axis constraints hw_api.set_gantry_load(hw_api.gantry_load) - # retract out of the liquid (not out of the well + # retract out of the liquid (not out of the well) pipette.move_to(well.top(mm_above_well_bottom).move(channel_offset), speed=speed) # reset discontinuity back to default if pipette.channels == 96: @@ -228,6 +228,8 @@ def _pipette_with_liquid_settings( def _dispense_with_added_blow_out() -> None: # dispense all liquid, plus some air by calling `pipette.blow_out(location, volume)` # FIXME: this is a hack, until there's an equivalent `pipette.blow_out(location, volume)` + hw_api = ctx._core.get_hardware() + hw_mount = OT3Mount.LEFT if pipette.mount == "left" else OT3Mount.RIGHT hw_api.blow_out(hw_mount, liquid_class.dispense.leading_air_gap) # ASPIRATE/DISPENSE SEQUENCE HAS THREE PHASES: diff --git a/hardware-testing/hardware_testing/gravimetric/liquid_height/height.py b/hardware-testing/hardware_testing/gravimetric/liquid_height/height.py index f33025883109..9d2cba5baf1d 100644 --- a/hardware-testing/hardware_testing/gravimetric/liquid_height/height.py +++ b/hardware-testing/hardware_testing/gravimetric/liquid_height/height.py @@ -7,10 +7,7 @@ from opentrons.protocol_api.labware import Well from opentrons.protocol_api import ProtocolContext -from hardware_testing.gravimetric.helpers import ( - well_is_reservoir, - get_list_of_wells_affected, -) +from hardware_testing.gravimetric import helpers class CalcType(ABC): @@ -200,7 +197,7 @@ def _get_actual_volume_change_in_well( """Get the actual volume change in well, depending on if the pipette is single or multi.""" if volume is None: return None - if well_is_reservoir(well): + if helpers.well_is_reservoir(well): volume *= float(channels) return volume @@ -208,9 +205,11 @@ def _get_actual_volume_change_in_well( class LiquidTracker: """Liquid Tracker.""" - def __init__(self) -> None: + def __init__(self, ctx: Optional[ProtocolContext] = None) -> None: """Liquid Tracker.""" self._items: dict = dict({}) + if ctx is not None: + initialize_liquid_from_deck(ctx, self) def reset(self) -> None: """Reset.""" @@ -346,7 +345,7 @@ def update_affected_wells( volume=dispense, channels=channels, ) - for w in get_list_of_wells_affected(well, channels): + for w in helpers.get_list_of_wells_affected(well, channels): self.update_well_volume( w, after_aspirate=actual_aspirate_amount, diff --git a/hardware-testing/hardware_testing/gravimetric/measurement/__init__.py b/hardware-testing/hardware_testing/gravimetric/measurement/__init__.py index f42922b40314..b3d09da44227 100644 --- a/hardware-testing/hardware_testing/gravimetric/measurement/__init__.py +++ b/hardware-testing/hardware_testing/gravimetric/measurement/__init__.py @@ -9,7 +9,7 @@ from .record import GravimetricRecorder, GravimetricRecording from .environment import read_environment_data, EnvironmentData, get_average_reading - +from hardware_testing.drivers import asair_sensor RELATIVE_DENSITY_WATER: Final = 1.0 @@ -139,11 +139,12 @@ def record_measurement_data( recorder: GravimetricRecorder, mount: str, stable: bool, + env_sensor: asair_sensor.AsairSensorBase, shorten: bool = False, delay_seconds: int = DELAY_FOR_MEASUREMENT, ) -> MeasurementData: """Record measurement data.""" - env_data = read_environment_data(mount, ctx.is_simulating()) + env_data = read_environment_data(mount, ctx.is_simulating(), env_sensor) # NOTE: we need to delay some amount, to give the scale time to accumulate samples with recorder.samples_of_tag(tag): if ctx.is_simulating(): diff --git a/hardware-testing/hardware_testing/gravimetric/measurement/environment.py b/hardware-testing/hardware_testing/gravimetric/measurement/environment.py index d780b14e9038..6eebd8187beb 100644 --- a/hardware-testing/hardware_testing/gravimetric/measurement/environment.py +++ b/hardware-testing/hardware_testing/gravimetric/measurement/environment.py @@ -7,6 +7,7 @@ SensorResponseBad, ) from hardware_testing.opentrons_api.types import OT3Mount +from hardware_testing.drivers import asair_sensor @dataclass @@ -21,7 +22,9 @@ class EnvironmentData: celsius_liquid: float -def read_environment_data(mount: str, is_simulating: bool) -> EnvironmentData: +def read_environment_data( + mount: str, is_simulating: bool, env_sensor: asair_sensor.AsairSensorBase +) -> EnvironmentData: """Read blank environment data.""" mnt = OT3Mount.LEFT if mount == "left" else OT3Mount.RIGHT try: @@ -31,12 +34,12 @@ def read_environment_data(mount: str, is_simulating: bool) -> EnvironmentData: print(e) celsius_pipette = 25.0 humidity_pipette = 50.0 - # TODO: implement USB environmental sensors + env_data = env_sensor.get_reading() d = EnvironmentData( celsius_pipette=celsius_pipette, humidity_pipette=humidity_pipette, - celsius_air=25.0, - humidity_air=50.0, + celsius_air=env_data.temperature, + humidity_air=env_data.relative_humidity, pascals_air=1000, celsius_liquid=25.0, ) diff --git a/hardware-testing/hardware_testing/gravimetric/tips.py b/hardware-testing/hardware_testing/gravimetric/tips.py index 363fdf281c93..34ca135774ad 100644 --- a/hardware-testing/hardware_testing/gravimetric/tips.py +++ b/hardware-testing/hardware_testing/gravimetric/tips.py @@ -64,10 +64,15 @@ def _get_racks(ctx: ProtocolContext) -> Dict[int, Labware]: } -def get_tips_for_single(ctx: ProtocolContext) -> List[Well]: +def get_tips_for_single(ctx: ProtocolContext, tip_volume: int) -> List[Well]: """Get tips for single channel.""" racks = _get_racks(ctx) - return [tip for rack in racks.values() for tip in rack.wells()] + return [ + tip + for rack in racks.values() + for tip in rack.wells() + if tip.max_volume == tip_volume + ] def get_tips_for_individual_channel_on_multi( @@ -103,11 +108,12 @@ def get_tips_for_96_channel(ctx: ProtocolContext) -> List[Well]: def get_tips( ctx: ProtocolContext, pipette: InstrumentContext, + tip_volume: int, all_channels: bool = True, ) -> Dict[int, List[Well]]: """Get tips.""" if pipette.channels == 1: - return {0: get_tips_for_single(ctx)} + return {0: get_tips_for_single(ctx, tip_volume)} elif pipette.channels == 8: if all_channels: return {0: get_tips_for_all_channels_on_multi(ctx)} diff --git a/hardware-testing/hardware_testing/gravimetric/trial.py b/hardware-testing/hardware_testing/gravimetric/trial.py new file mode 100644 index 000000000000..936b3c25b322 --- /dev/null +++ b/hardware-testing/hardware_testing/gravimetric/trial.py @@ -0,0 +1,224 @@ +"""Dataclass that describes the arguments for trials.""" +from dataclasses import dataclass +from typing import List, Optional, Union, Dict +from . import config +from opentrons.protocol_api import ProtocolContext, InstrumentContext, Well, Labware +from .measurement.record import GravimetricRecorder +from .measurement import DELAY_FOR_MEASUREMENT +from .liquid_height.height import LiquidTracker +from hardware_testing.data.csv_report import CSVReport +from hardware_testing.opentrons_api.types import Point +from . import helpers +from . import report +from hardware_testing.data import ui +from hardware_testing.drivers import asair_sensor + + +@dataclass +class VolumetricTrial: + """Common arguments for volumetric scripts.""" + + ctx: ProtocolContext + pipette: InstrumentContext + test_report: CSVReport + liquid_tracker: LiquidTracker + inspect: bool + trial: int + tip_volume: int + volume: float + mix: bool + acceptable_cv: Optional[float] + env_sensor: asair_sensor.AsairSensorBase + + +@dataclass +class GravimetricTrial(VolumetricTrial): + """All the arguments for a single gravimetric trial.""" + + well: Well + channel_offset: Point + channel: int + channel_count: int + recorder: GravimetricRecorder + blank: bool + stable: bool + cfg: config.GravimetricConfig + scale_delay: int = DELAY_FOR_MEASUREMENT + + +@dataclass +class PhotometricTrial(VolumetricTrial): + """All the arguments for a single photometric trial.""" + + source: Well + dest: Labware + cfg: config.PhotometricConfig + + +@dataclass +class TrialOrder: + """A list of all of the trials that a particular QC run needs to run.""" + + trials: List[Union[GravimetricTrial, PhotometricTrial]] + + +@dataclass +class TestResources: + """Common arguments to the run method of volumetric tests.""" + + ctx: ProtocolContext + pipette: InstrumentContext + pipette_tag: str + tipracks: List[Labware] + test_volumes: List[float] + run_id: str + start_time: float + operator_name: str + robot_serial: str + tip_batch: str + git_description: str + tips: Dict[int, List[Well]] + env_sensor: asair_sensor.AsairSensorBase + + +def build_gravimetric_trials( + ctx: ProtocolContext, + instr: InstrumentContext, + cfg: config.GravimetricConfig, + well: Well, + test_volumes: List[float], + channels_to_test: List[int], + recorder: GravimetricRecorder, + test_report: report.CSVReport, + liquid_tracker: LiquidTracker, + blank: bool, + env_sensor: asair_sensor.AsairSensorBase, +) -> Dict[float, Dict[int, List[GravimetricTrial]]]: + """Build a list of all the trials that will be run.""" + trial_list: Dict[float, Dict[int, List[GravimetricTrial]]] = {} + if len(channels_to_test) > 1: + num_channels_per_transfer = 1 + else: + num_channels_per_transfer = cfg.pipette_channels + if blank: + trial_list[test_volumes[-1]] = {0: []} + for trial in range(config.NUM_BLANK_TRIALS): + trial_list[test_volumes[-1]][0].append( + GravimetricTrial( + ctx=ctx, + pipette=instr, + well=well, + channel_offset=Point(), + tip_volume=cfg.tip_volume, + volume=test_volumes[-1], + channel=0, + channel_count=num_channels_per_transfer, + trial=trial, + recorder=recorder, + test_report=test_report, + liquid_tracker=liquid_tracker, + blank=blank, + inspect=cfg.inspect, + mix=cfg.mix, + stable=True, + scale_delay=cfg.scale_delay, + acceptable_cv=None, + cfg=cfg, + env_sensor=env_sensor, + ) + ) + else: + for volume in test_volumes: + trial_list[volume] = {} + for channel in channels_to_test: + if cfg.isolate_channels and (channel + 1) not in cfg.isolate_channels: + ui.print_info(f"skipping channel {channel + 1}") + continue + trial_list[volume][channel] = [] + channel_offset = helpers._get_channel_offset(cfg, channel) + for trial in range(cfg.trials): + trial_list[volume][channel].append( + GravimetricTrial( + ctx=ctx, + pipette=instr, + well=well, + channel_offset=channel_offset, + tip_volume=cfg.tip_volume, + volume=volume, + channel=channel, + channel_count=num_channels_per_transfer, + trial=trial, + recorder=recorder, + test_report=test_report, + liquid_tracker=liquid_tracker, + blank=blank, + inspect=cfg.inspect, + mix=cfg.mix, + stable=True, + scale_delay=cfg.scale_delay, + acceptable_cv=None, + cfg=cfg, + env_sensor=env_sensor, + ) + ) + return trial_list + + +def build_photometric_trials( + ctx: ProtocolContext, + test_report: CSVReport, + pipette: InstrumentContext, + source: Well, + dest: Labware, + test_volumes: List[float], + liquid_tracker: LiquidTracker, + cfg: config.PhotometricConfig, + env_sensor: asair_sensor.AsairSensorBase, +) -> Dict[float, List[PhotometricTrial]]: + """Build a list of all the trials that will be run.""" + trial_list: Dict[float, List[PhotometricTrial]] = {} + for volume in test_volumes: + trial_list[volume] = [] + for trial in range(cfg.trials): + trial_list[volume].append( + PhotometricTrial( + ctx=ctx, + test_report=test_report, + pipette=pipette, + source=source, + dest=dest, + tip_volume=cfg.tip_volume, + volume=volume, + trial=trial, + liquid_tracker=liquid_tracker, + inspect=cfg.inspect, + cfg=cfg, + mix=cfg.mix, + acceptable_cv=None, + env_sensor=env_sensor, + ) + ) + return trial_list + + +def _finish_test( + cfg: config.VolumetricConfig, + resources: TestResources, + return_tip: bool, +) -> None: + ui.print_title("CHANGE PIPETTES") + if resources.pipette.has_tip: + if resources.pipette.current_volume > 0: + ui.print_info("dispensing liquid to trash") + trash = resources.pipette.trash_container.wells()[0] + # FIXME: this should be a blow_out() at max volume, + # but that is not available through PyAPI yet + # so instead just dispensing. + resources.pipette.dispense(resources.pipette.current_volume, trash.top()) + resources.pipette.aspirate(10) # to pull any droplets back up + ui.print_info("dropping tip") + helpers._drop_tip(resources.pipette, return_tip) + ui.print_info("moving to attach position") + resources.pipette.move_to( + resources.ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150)) + ) diff --git a/hardware-testing/tests/hardware_testing/drivers/test_asair_sensor.py b/hardware-testing/tests/hardware_testing/drivers/test_asair_sensor.py index c1319d343283..7480b6665a6a 100644 --- a/hardware-testing/tests/hardware_testing/drivers/test_asair_sensor.py +++ b/hardware-testing/tests/hardware_testing/drivers/test_asair_sensor.py @@ -2,8 +2,7 @@ from unittest.mock import MagicMock import pytest -from hardware_testing.drivers import AsairSensor -from hardware_testing.drivers.asair_sensor import Reading +from hardware_testing.drivers.asair_sensor import Reading, AsairSensor @pytest.fixture