From 2cff9d24c542185c8d7de5efbfa1c137b64ac210 Mon Sep 17 00:00:00 2001 From: Ryan Howard Date: Tue, 9 Apr 2024 12:48:20 -0400 Subject: [PATCH] feat(hardware-testing): liquid sense testing script (#14807) # Overview This PR adds a new testing script that allows us to test all kinds of variations of the liquid-sense routine it adds some additional features in the hardware control layer to change up output options to during the probe so we can gate using the buffer-on-pipette feature to a firmware version flag, since that feature has to be compiled in separately # Test Plan # Changelog # Review requests # Risk assessment --------- Co-authored-by: caila-marashaj --- hardware-testing/Makefile | 8 + .../gravimetric/measurement/record.py | 13 +- .../labware/dial_indicator/1.json | 57 ++++ .../hardware_testing/liquid_sense/__init__.py | 1 + .../hardware_testing/liquid_sense/__main__.py | 317 ++++++++++++++++++ .../hardware_testing/liquid_sense/execute.py | 307 +++++++++++++++++ .../liquid_sense/post_process.py | 170 ++++++++++ .../hardware_testing/liquid_sense/report.py | 263 +++++++++++++++ .../opentrons_api/helpers_ot3.py | 4 +- .../protocols/liquid_sense_lpc/__init__.py | 1 + .../liquid_sense_ot3_p1000_96.py | 33 ++ .../liquid_sense_ot3_p1000_multi.py | 26 ++ .../liquid_sense_ot3_p1000_single.py | 33 ++ .../liquid_sense_ot3_p50_multi.py | 28 ++ .../liquid_sense_ot3_p50_single.py | 31 ++ .../firmware_bindings/messages/messages.py | 1 + .../hardware_control/tool_sensors.py | 3 +- 17 files changed, 1287 insertions(+), 9 deletions(-) create mode 100644 hardware-testing/hardware_testing/labware/dial_indicator/1.json create mode 100644 hardware-testing/hardware_testing/liquid_sense/__init__.py create mode 100644 hardware-testing/hardware_testing/liquid_sense/__main__.py create mode 100644 hardware-testing/hardware_testing/liquid_sense/execute.py create mode 100644 hardware-testing/hardware_testing/liquid_sense/post_process.py create mode 100644 hardware-testing/hardware_testing/liquid_sense/report.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/__init__.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_96.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_multi.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_single.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_multi.py create mode 100644 hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_single.py diff --git a/hardware-testing/Makefile b/hardware-testing/Makefile index 6c12dc305a0..a48b794977f 100755 --- a/hardware-testing/Makefile +++ b/hardware-testing/Makefile @@ -155,6 +155,14 @@ test-examples: test-scripts: $(python) -m hardware_testing.scripts.bowtie_ot3 --simulate +.PHONY: test-liquid-sense +test-liquid-sense: + $(python) -m hardware_testing.liquid_sense --simulate --pipette 1000 --channels 1 + $(python) -m hardware_testing.liquid_sense --simulate --pipette 50 --channels 1 + $(python) -m hardware_testing.liquid_sense --simulate --pipette 1000 --channels 8 + $(python) -m hardware_testing.liquid_sense --simulate --pipette 50 --channels 8 + $(python) -m hardware_testing.liquid_sense --simulate --pipette 1000 --channels 96 + .PHONY: test-integration test-integration: test-production-qc test-examples test-scripts test-gravimetric diff --git a/hardware-testing/hardware_testing/gravimetric/measurement/record.py b/hardware-testing/hardware_testing/gravimetric/measurement/record.py index d1e4ab7e4d4..86ef8b84903 100644 --- a/hardware-testing/hardware_testing/gravimetric/measurement/record.py +++ b/hardware-testing/hardware_testing/gravimetric/measurement/record.py @@ -280,7 +280,11 @@ class GravimetricRecorder: """Gravimetric Recorder.""" def __init__( - self, cfg: GravimetricRecorderConfig, scale: Scale, simulate: bool = False + self, + cfg: GravimetricRecorderConfig, + scale: Scale, + simulate: bool = False, + start_graph: bool = True, ) -> None: """Gravimetric Recorder.""" self._cfg = cfg @@ -294,7 +298,7 @@ def __init__( self._scale_serial: str = "" self._scale_max_capacity: float = 0.0 super().__init__() - self.activate() + self.activate(start_graph) def _start_graph_server_process(self) -> None: if self.is_simulator: @@ -350,9 +354,10 @@ def add_simulation_mass(self, mass: float) -> None: """Add simulation mass.""" self._scale.add_simulation_mass(mass) - def activate(self) -> None: + def activate(self, graph: bool = True) -> None: """Activate.""" - self._start_graph_server_process() + if graph: + self._start_graph_server_process() # Some Radwag settings cannot be controlled remotely. # Listed below are the things the must be done using the touchscreen: # 1) Set profile to USER diff --git a/hardware-testing/hardware_testing/labware/dial_indicator/1.json b/hardware-testing/hardware_testing/labware/dial_indicator/1.json new file mode 100644 index 00000000000..6c3ac9c3f24 --- /dev/null +++ b/hardware-testing/hardware_testing/labware/dial_indicator/1.json @@ -0,0 +1,57 @@ +{ + "schemaVersion": 2, + "version": 1, + "namespace": "custom_beta", + "ordering": [["A1"]], + "metadata": { + "displayName": "Mitutoyo Digimatic Indicator", + "displayCategory": "tubeRack", + "displayVolumeUnits": "µL", + "tags": [] + }, + "dimensions": { + "xDimension": 128, + "yDimension": 86, + "zDimension": 136 + }, + "parameters": { + "format": "irregular", + "quirks": [], + "isTiprack": false, + "isMagneticModuleCompatible": false, + "loadName": "dial_indicator" + }, + "wells": { + "A1": { + "depth": 14, + "totalLiquidVolume": 10, + "shape": "circular", + "diameter": 4, + "x": 60.8, + "y": 41.5, + "z": 135 + } + }, + "brand": { + "brand": "Mitutoyo", + "brandId": ["ID-S"] + }, + "groups": [ + { + "brand": { + "brand": "Mitutoyo", + "brandId": ["ID-S"] + }, + "metadata": { + "wellBottomShape": "flat", + "displayCategory": "tubeRack" + }, + "wells": ["A1"] + } + ], + "cornerOffsetFromSlot": { + "x": 0, + "y": 0, + "z": 0 + } +} diff --git a/hardware-testing/hardware_testing/liquid_sense/__init__.py b/hardware-testing/hardware_testing/liquid_sense/__init__.py new file mode 100644 index 00000000000..e6b26332d7b --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_sense/__init__.py @@ -0,0 +1 @@ +"""Liquid Sense.""" diff --git a/hardware-testing/hardware_testing/liquid_sense/__main__.py b/hardware-testing/hardware_testing/liquid_sense/__main__.py new file mode 100644 index 00000000000..10db70e67c8 --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_sense/__main__.py @@ -0,0 +1,317 @@ +"""Liquid sense testing.""" +import argparse +from dataclasses import dataclass +from json import load as json_load +from pathlib import Path +import subprocess +from time import sleep +import os +from typing import List, Any, Optional +import traceback + +from hardware_testing.opentrons_api import helpers_ot3 +from hardware_testing.gravimetric import helpers, workarounds +from hardware_testing.data.csv_report import CSVReport +from hardware_testing.gravimetric.measurement.record import GravimetricRecorder +from hardware_testing.gravimetric.measurement.scale import Scale +from hardware_testing.drivers import ( + asair_sensor, + mitutoyo_digimatic_indicator, + list_ports_and_select, +) +from hardware_testing.data import ( + ui, + create_run_id_and_start_time, + get_git_description, + get_testing_data_directory, +) + +from opentrons.protocol_api import InstrumentContext, ProtocolContext +from opentrons.protocol_engine.types import LabwareOffset + +from hardware_testing.liquid_sense import execute +from .report import build_ls_report, store_config, store_serial_numbers +from .post_process import process_csv_directory + +from hardware_testing.protocols.liquid_sense_lpc import ( + liquid_sense_ot3_p50_single, + liquid_sense_ot3_p50_multi, + liquid_sense_ot3_p1000_single, + liquid_sense_ot3_p1000_multi, + liquid_sense_ot3_p1000_96, +) + +API_LEVEL = "2.18" + +LABWARE_OFFSETS: List[LabwareOffset] = [] + + +LIQUID_SENSE_CFG = { + 50: { + 1: liquid_sense_ot3_p50_single, + 8: liquid_sense_ot3_p50_multi, + }, + 1000: { + 1: liquid_sense_ot3_p1000_single, + 8: liquid_sense_ot3_p1000_multi, + 96: liquid_sense_ot3_p1000_96, + }, +} + +PIPETTE_MODEL_NAME = { + 50: { + 1: "p50_single_flex", + 8: "p50_multi_flex", + }, + 1000: { + 1: "p1000_single_flex", + 8: "p1000_multi_flex", + 96: "p1000_96_flex", + }, +} + + +@dataclass +class RunArgs: + """Common resources across multiple runs.""" + + tip_volumes: List[int] + run_id: str + pipette: InstrumentContext + pipette_tag: str + git_description: str + robot_serial: str + recorder: GravimetricRecorder + pipette_volume: int + pipette_channels: int + name: str + environment_sensor: asair_sensor.AsairSensorBase + trials: int + z_speed: float + return_tip: bool + ctx: ProtocolContext + protocol_cfg: Any + test_report: CSVReport + start_height_offset: float + aspirate: bool + dial_indicator: Optional[mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator] + plunger_speed: bool + trials_before_jog: int + + @classmethod + def _get_protocol_context(cls, args: argparse.Namespace) -> ProtocolContext: + if not args.simulate and not args.skip_labware_offsets: + # getting labware offsets must be done before creating the protocol context + # because it requires the robot-server to be running + ui.print_title("SETUP") + ui.print_info( + "Starting opentrons-robot-server, so we can http GET labware offsets" + ) + LABWARE_OFFSETS.extend(workarounds.http_get_all_labware_offsets()) + ui.print_info(f"found {len(LABWARE_OFFSETS)} offsets:") + for offset in LABWARE_OFFSETS: + ui.print_info(f"\t{offset.createdAt}:") + ui.print_info(f"\t\t{offset.definitionUri}") + ui.print_info(f"\t\t{offset.vector}") + # gather the custom labware (for simulation) + custom_defs = {} + if args.simulate: + labware_dir = Path(__file__).parent.parent / "labware" + custom_def_uris = [ + "radwag_pipette_calibration_vial", + "dial_indicator", + ] + for def_uri in custom_def_uris: + with open(labware_dir / def_uri / "1.json", "r") as f: + custom_def = json_load(f) + custom_defs[def_uri] = custom_def + _ctx = helpers.get_api_context( + API_LEVEL, # type: ignore[attr-defined] + is_simulating=args.simulate, + pipette_left=PIPETTE_MODEL_NAME[args.pipette][args.channels], + extra_labware=custom_defs, + ) + for offset in LABWARE_OFFSETS: + engine = _ctx._core._engine_client._transport._engine # type: ignore[attr-defined] + engine.state_view._labware_store._add_labware_offset(offset) + return _ctx + + @classmethod + def build_run_args(cls, args: argparse.Namespace) -> "RunArgs": + """Build.""" + _ctx = RunArgs._get_protocol_context(args) + robot_serial = helpers._get_robot_serial(_ctx.is_simulating()) + run_id, start_time = create_run_id_and_start_time() + environment_sensor = asair_sensor.BuildAsairSensor( + _ctx.is_simulating() or args.ignore_env + ) + git_description = get_git_description() + protocol_cfg = LIQUID_SENSE_CFG[args.pipette][args.channels] + name = protocol_cfg.metadata["protocolName"] # type: ignore[attr-defined] + ui.print_header("LOAD PIPETTE") + pipette = _ctx.load_instrument( + f"flex_{args.channels}channel_{args.pipette}", "left" + ) + loaded_labwares = _ctx.loaded_labwares + if 12 in loaded_labwares.keys(): + trash = loaded_labwares[12] + else: + trash = _ctx.load_labware("opentrons_1_trash_3200ml_fixed", "A3") + pipette.trash_container = trash + pipette_tag = helpers._get_tag_from_pipette(pipette, False, False) + + if args.trials == 0: + trials = 10 + else: + trials = args.trials + + if args.tip == 0: + if args.pipette == 1000: + tip_volumes: List[int] = [50, 200, 1000] + else: + tip_volumes = [50] + else: + tip_volumes = [args.tip] + + scale = Scale.build(simulate=_ctx.is_simulating() or args.ignore_scale) + recorder: GravimetricRecorder = execute._load_scale( + name, + scale, + run_id, + pipette_tag, + start_time, + _ctx.is_simulating() or args.ignore_scale, + ) + dial: Optional[mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator] = None + if not _ctx.is_simulating() and not args.ignore_dial: + dial_port = list_ports_and_select("Dial Indicator") + dial = mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator( + port=dial_port + ) + dial.connect() + ui.print_info(f"pipette_tag {pipette_tag}") + report = build_ls_report(name, run_id, trials, tip_volumes) + report.set_tag(name) + # go ahead and store the meta data now + store_serial_numbers( + report, + robot_serial, + pipette_tag, + scale.read_serial_number(), + environment_sensor.get_serial(), + git_description, + ) + + store_config( + report, + name, + args.pipette, + tip_volumes, + trials, + args.plunger_direction, + args.liquid, + protocol_cfg.LABWARE_ON_SCALE, # type: ignore[attr-defined] + args.z_speed, + args.start_height_offset, + ) + return RunArgs( + tip_volumes=tip_volumes, + run_id=run_id, + pipette=pipette, + pipette_tag=pipette_tag, + git_description=git_description, + robot_serial=robot_serial, + recorder=recorder, + pipette_volume=args.pipette, + pipette_channels=args.channels, + name=name, + environment_sensor=environment_sensor, + trials=trials, + z_speed=args.z_speed, + return_tip=args.return_tip, + ctx=_ctx, + protocol_cfg=protocol_cfg, + test_report=report, + start_height_offset=args.start_height_offset, + aspirate=args.plunger_direction == "aspirate", + dial_indicator=dial, + plunger_speed=args.plunger_speed, + trials_before_jog=args.trials_before_jog, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("Pipette Testing") + parser.add_argument("--simulate", action="store_true") + parser.add_argument("--pipette", type=int, choices=[50, 1000], required=True) + parser.add_argument("--channels", type=int, choices=[1, 8, 96], default=1) + parser.add_argument("--tip", type=int, choices=[0, 50, 200, 1000], default=0) + parser.add_argument("--trials", type=int, default=0) + parser.add_argument("--return-tip", action="store_true") + parser.add_argument("--skip-labware-offsets", action="store_true") + parser.add_argument( + "--liquid", type=str, choices=["water", "glycerol", "alchohol"], default="water" + ) + parser.add_argument("--z-speed", type=float, default=5) + parser.add_argument( + "--plunger-direction", + type=str, + choices=["aspirate", "dispense"], + default="aspirate", + ) + parser.add_argument("--labware-type", type=str, default="nest_1_reservoir_195ml") + parser.add_argument("--plunger-speed", type=float, default=-1.0) + parser.add_argument("--isolate-plungers", action="store_true") + parser.add_argument("--start-height-offset", type=float, default=0) + parser.add_argument("--ignore-scale", action="store_true") + parser.add_argument("--ignore-env", action="store_true") + parser.add_argument("--ignore-dial", action="store_true") + parser.add_argument("--trials-before-jog", type=int, default=10) + + args = parser.parse_args() + run_args = RunArgs.build_run_args(args) + try: + if not run_args.ctx.is_simulating(): + data_dir = get_testing_data_directory() + data_file = f"/{data_dir}/{run_args.name}/{run_args.run_id}/serial.log" + ui.print_info(f"logging can data to {data_file}") + serial_logger = subprocess.Popen( + [f"python3 -m opentrons_hardware.scripts.can_mon > {data_file}"], + shell=True, + ) + sleep(1) + hw = run_args.ctx._core.get_hardware() + if not run_args.ctx.is_simulating(): + ui.get_user_ready("CLOSE the door, and MOVE AWAY from machine") + ui.print_info("homing...") + run_args.ctx.home() + for tip in run_args.tip_volumes: + if args.channels == 96 and not run_args.ctx.is_simulating(): + ui.alert_user_ready(f"prepare the {tip}ul tipracks", hw) + execute.run(tip, run_args) + except Exception as e: + ui.print_info(f"got error {e}") + ui.print_info(traceback.format_exc()) + finally: + if run_args.recorder is not None: + ui.print_info("ending recording") + run_args.recorder.stop() + run_args.recorder.deactivate() + if not run_args.ctx.is_simulating(): + ui.print_info("killing serial log") + serial_logger.terminate() + if run_args.dial_indicator is not None: + run_args.dial_indicator.disconnect() + run_args.test_report.save_to_disk() + run_args.test_report.print_results() + ui.print_info("done\n\n") + if not run_args.ctx.is_simulating(): + process_csv_directory( + f"{data_dir}/{run_args.name}/{run_args.run_id}", + run_args.tip_volumes, + run_args.trials, + ) + run_args.ctx.cleanup() + if not args.simulate: + helpers_ot3.restart_server_ot3() + os._exit(os.EX_OK) diff --git a/hardware-testing/hardware_testing/liquid_sense/execute.py b/hardware-testing/hardware_testing/liquid_sense/execute.py new file mode 100644 index 00000000000..1fc95d62d44 --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_sense/execute.py @@ -0,0 +1,307 @@ +"""Logic for running a single liquid probe test.""" +from typing import Dict, Any, List, Tuple, Optional +from .report import store_tip_results, store_trial, store_baseline_trial +from opentrons.config.types import LiquidProbeSettings, OutputOptions +from .__main__ import RunArgs +from hardware_testing.gravimetric.workarounds import get_sync_hw_api +from hardware_testing.gravimetric.helpers import ( + _jog_to_find_liquid_height, +) +from hardware_testing.gravimetric.config import LIQUID_PROBE_SETTINGS +from hardware_testing.gravimetric.tips import get_unused_tips +from hardware_testing.data import ui, get_testing_data_directory +from opentrons.hardware_control.types import ( + InstrumentProbeType, + OT3Mount, + Axis, + top_types, +) + +from hardware_testing.gravimetric.measurement.scale import Scale +from hardware_testing.gravimetric.measurement.record import ( + GravimetricRecorder, + GravimetricRecorderConfig, +) +from opentrons.protocol_api._types import OffDeckType + +from opentrons.protocol_api import ProtocolContext, Well, Labware + + +def _load_tipracks( + ctx: ProtocolContext, pipette_channels: int, protocol_cfg: Any, tip: int +) -> List[Labware]: + # TODO add logic here for partial tip using 96 + use_adapters: bool = pipette_channels == 96 + tiprack_load_settings: List[Tuple[int, str]] = [ + ( + slot, + f"opentrons_flex_96_tiprack_{tip}ul", + ) + for slot in protocol_cfg.SLOTS_TIPRACK[tip] # type: ignore[attr-defined] + ] + for ls in tiprack_load_settings: + ui.print_info(f'Loading tiprack "{ls[1]}" in slot #{ls[0]}') + + adapter: Optional[str] = ( + "opentrons_flex_96_tiprack_adapter" if use_adapters else None + ) + # If running multiple tests in one run, the labware may already be loaded + loaded_labwares = ctx.loaded_labwares + ui.print_info(f"Loaded labwares {loaded_labwares}") + pre_loaded_tips: List[Labware] = [] + for ls in tiprack_load_settings: + if ls[0] in loaded_labwares.keys(): + if loaded_labwares[ls[0]].name == ls[1]: + pre_loaded_tips.append(loaded_labwares[ls[0]]) + else: + # If something is in the slot that's not what we want, remove it + # we use this only for the 96 channel + ui.print_info( + f"Removing {loaded_labwares[ls[0]].name} from slot {ls[0]}" + ) + ctx._core.move_labware( + loaded_labwares[ls[0]]._core, + new_location=OffDeckType.OFF_DECK, + use_gripper=False, + pause_for_manual_move=False, + pick_up_offset=None, + drop_offset=None, + ) + if len(pre_loaded_tips) == len(tiprack_load_settings): + return pre_loaded_tips + + tipracks: List[Labware] = [] + for ls in tiprack_load_settings: + if ctx.deck[ls[0]] is not None: + tipracks.append( + ctx.deck[ls[0]].load_labware(ls[1]) # type: ignore[union-attr] + ) + else: + tipracks.append(ctx.load_labware(ls[1], location=ls[0], adapter=adapter)) + return tipracks + + +def _load_dial_indicator(run_args: RunArgs) -> Labware: + slot_dial = run_args.protocol_cfg.SLOT_DIAL # type: ignore[union-attr] + dial_labware_name = "dial_indicator" + loaded_labwares = run_args.ctx.loaded_labwares + if ( + slot_dial in loaded_labwares.keys() + and loaded_labwares[slot_dial].name == dial_labware_name + ): + return loaded_labwares[slot_dial] + + dial_labware = run_args.ctx.load_labware( + dial_labware_name, location=slot_dial, namespace="custom_beta" + ) + return dial_labware + + +def _load_test_well(run_args: RunArgs) -> Labware: + slot_scale = run_args.protocol_cfg.SLOT_SCALE # type: ignore[union-attr] + labware_on_scale = run_args.protocol_cfg.LABWARE_ON_SCALE # type: ignore[union-attr] + ui.print_info(f'Loading labware on scale: "{labware_on_scale}"') + if labware_on_scale == "radwag_pipette_calibration_vial": + namespace = "custom_beta" + else: + namespace = "opentrons" + # If running multiple tests in one run, the labware may already be loaded + loaded_labwares = run_args.ctx.loaded_labwares + if ( + slot_scale in loaded_labwares.keys() + and loaded_labwares[slot_scale].name == labware_on_scale + ): + return loaded_labwares[slot_scale] + + labware_on_scale = run_args.ctx.load_labware( + labware_on_scale, location=slot_scale, namespace=namespace + ) + return labware_on_scale + + +def _load_scale( + name: str, + scale: Scale, + run_id: str, + pipette_tag: str, + start_time: float, + simulating: bool, +) -> GravimetricRecorder: + ui.print_header("LOAD SCALE") + ui.print_info( + "Some Radwag settings cannot be controlled remotely.\n" + "Listed below are the things the must be done using the touchscreen:\n" + " 1) Set profile to USER\n" + " 2) Set screensaver to NONE\n" + ) + recorder = GravimetricRecorder( + GravimetricRecorderConfig( + test_name=name, + run_id=run_id, + tag=pipette_tag, + start_time=start_time, + duration=0, + frequency=1000 if simulating else 60, + stable=False, + ), + scale, + simulate=simulating, + start_graph=False, + ) + ui.print_info(f'found scale "{recorder.serial_number}"') + if simulating: + recorder.set_simulation_mass(0) + recorder.record(in_thread=True) + ui.print_info(f'scale is recording to "{recorder.file_name}"') + return recorder + + +def run(tip: int, run_args: RunArgs) -> None: + """Run a liquid probe test.""" + test_labware: Labware = _load_test_well(run_args) + dial_indicator: Labware = _load_dial_indicator(run_args) + dial_well: Well = dial_indicator["A1"] + hw_api = get_sync_hw_api(run_args.ctx) + test_well: Well = test_labware["A1"] + _load_tipracks(run_args.ctx, run_args.pipette_channels, run_args.protocol_cfg, tip) + tips: List[Well] = get_unused_tips( + ctx=run_args.ctx, tip_volume=tip, pipette_mount="" + ) + assert len(tips) >= run_args.trials + results: List[float] = [] + adjusted_results: List[float] = [] + lpc_offset = 0.0 + if run_args.dial_indicator is not None: + run_args.pipette.move_to(dial_well.top()) + lpc_offset = run_args.dial_indicator.read_stable() + run_args.pipette._retract() + + def _get_baseline() -> float: + run_args.pipette.pick_up_tip(tips.pop(0)) + liquid_height = _jog_to_find_liquid_height( + run_args.ctx, run_args.pipette, test_well + ) + target_height = test_well.bottom(liquid_height).point.z + + run_args.pipette._retract() + # tip_offset = 0.0 + if run_args.dial_indicator is not None: + run_args.pipette.move_to(dial_well.top()) + tip_offset = run_args.dial_indicator.read_stable() + run_args.pipette._retract() + if run_args.return_tip: + run_args.pipette.return_tip() + else: + run_args.pipette.drop_tip() + + env_data = run_args.environment_sensor.get_reading() + + store_baseline_trial( + run_args.test_report, + tip, + target_height, + env_data.relative_humidity, + env_data.temperature, + test_well.top().point.z - target_height, + tip_offset - lpc_offset, + ) + return target_height + + trials_before_jog = run_args.trials_before_jog + tip_offset = 0.0 + for trial in range(run_args.trials): + if trial % trials_before_jog == 0: + tip_offset = _get_baseline() + + ui.print_info(f"Picking up {tip}ul tip") + run_args.pipette.pick_up_tip(tips.pop(0)) + run_args.pipette.move_to(test_well.top()) + + start_pos = hw_api.current_position_ot3(OT3Mount.LEFT) + height = _run_trial(run_args, tip, test_well, trial) + end_pos = hw_api.current_position_ot3(OT3Mount.LEFT) + run_args.pipette.blow_out() + tip_length_offset = 0.0 + if run_args.dial_indicator is not None: + + run_args.pipette._retract() + run_args.pipette.move_to(dial_well.top()) + tip_length_offset = tip_offset - run_args.dial_indicator.read_stable() + run_args.pipette._retract() + ui.print_info(f"Tip Offset {tip_length_offset}") + + ui.print_info("Droping tip") + if run_args.return_tip: + run_args.pipette.return_tip() + else: + run_args.pipette.drop_tip() + results.append(height) + adjusted_results.append(height + tip_length_offset) + env_data = run_args.environment_sensor.get_reading() + hw_pipette = hw_api.hardware_pipettes[top_types.Mount.LEFT] + plunger_start = ( + hw_pipette.plunger_positions.bottom + if run_args.aspirate + else hw_pipette.plunger_positions.top + ) + store_trial( + run_args.test_report, + trial, + tip, + height, + end_pos[Axis.P_L], + env_data.relative_humidity, + env_data.temperature, + start_pos[Axis.Z_L] - end_pos[Axis.Z_L], + plunger_start - end_pos[Axis.P_L], + tip_length_offset, + ) + ui.print_info( + f"\n\n Z axis start pos {start_pos[Axis.Z_L]} end pos {end_pos[Axis.Z_L]}" + ) + ui.print_info( + f"plunger start pos {plunger_start} end pos {end_pos[Axis.P_L]}\n\n" + ) + + ui.print_info(f"RESULTS: \n{results}") + ui.print_info(f"Adjusted RESULTS: \n{adjusted_results}") + store_tip_results(run_args.test_report, tip, results, adjusted_results) + + +def _run_trial(run_args: RunArgs, tip: int, well: Well, trial: int) -> float: + hw_api = get_sync_hw_api(run_args.ctx) + lqid_cfg: Dict[str, int] = LIQUID_PROBE_SETTINGS[run_args.pipette_volume][ + run_args.pipette_channels + ][tip] + data_dir = get_testing_data_directory() + data_filename = f"pressure_sensor_data-trial{trial}-tip{tip}.csv" + data_file = f"{data_dir}/{run_args.name}/{run_args.run_id}/{data_filename}" + ui.print_info(f"logging pressure data to {data_file}") + + plunger_speed = ( + lqid_cfg["plunger_speed"] + if run_args.plunger_speed == -1 + else run_args.plunger_speed + ) + lps = LiquidProbeSettings( + starting_mount_height=well.top().point.z + run_args.start_height_offset, + max_z_distance=min(well.depth, lqid_cfg["max_z_distance"]), + min_z_distance=lqid_cfg["min_z_distance"], + mount_speed=run_args.z_speed, + plunger_speed=plunger_speed, + sensor_threshold_pascals=lqid_cfg["sensor_threshold_pascals"], + expected_liquid_height=110, + output_option=OutputOptions.sync_buffer_to_csv, + aspirate_while_sensing=run_args.aspirate, + auto_zero_sensor=True, + num_baseline_reads=10, + data_file=data_file, + ) + + hw_mount = OT3Mount.LEFT if run_args.pipette.mount == "left" else OT3Mount.RIGHT + run_args.recorder.set_sample_tag(f"trial-{trial}-{tip}ul") + # TODO add in stuff for secondary probe + height = hw_api.liquid_probe(hw_mount, lps, InstrumentProbeType.PRIMARY) + ui.print_info(f"Trial {trial} complete") + run_args.recorder.clear_sample_tag() + return height diff --git a/hardware-testing/hardware_testing/liquid_sense/post_process.py b/hardware-testing/hardware_testing/liquid_sense/post_process.py new file mode 100644 index 00000000000..20e46ed746a --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_sense/post_process.py @@ -0,0 +1,170 @@ +"""Post process script csvs.""" +import csv +import os +from typing import List, Dict, Tuple +from math import isclose + +COL_TRIAL_CONVERSION = { + 1: "E", + 2: "H", + 3: "K", + 4: "N", + 5: "Q", + 6: "T", + 7: "W", + 8: "Z", + 9: "AC", + 10: "AF", + 11: "AI", + 12: "AL", + 13: "AO", +} + + +def process_csv_directory( # noqa: C901 + data_directory: str, tips: List[int], trials: int, make_graph: bool = False +) -> None: + """Post process script csvs.""" + csv_files: List[str] = os.listdir(data_directory) + summary: str = [f for f in csv_files if "CSVReport" in f][0] + final_report_file: str = f"{data_directory}/final_report.csv" + # initialize our data structs + pressure_csvs = [f for f in csv_files if "pressure_sensor_data" in f] + pressure_results_files: Dict[int, List[str]] = {} + pressure_results: Dict[int, Dict[int, List[float]]] = {} + results_settings: Dict[int, Dict[int, Tuple[float, float, float]]] = {} + tip_offsets: Dict[int, List[float]] = {} + p_offsets: Dict[int, List[float]] = {} + meniscus_travel: float = 0 + for tip in tips: + pressure_results_files[tip] = [f for f in pressure_csvs if f"tip{tip}" in f] + pressure_results[tip] = {} + results_settings[tip] = {} + tip_offsets[tip] = [] + p_offsets[tip] = [i * 0 for i in range(trials)] + for trial in range(trials): + pressure_results[tip][trial] = [] + results_settings[tip][trial] = (0.0, 0.0, 0.0) + max_results_len = 0 + + # read in all of the pressure csvs into one big struct so we can process them + for tip in tips: + for trial in range(trials): + with open( + f"{data_directory}/{pressure_results_files[tip][trial]}", newline="" + ) as trial_csv: + trial_reader = csv.reader(trial_csv) + i = 0 + for row in trial_reader: + if i == 1: + results_settings[tip][trial] = ( + float(row[2]), + float(row[3]), + float(row[4]), + ) + if i > 1: + pressure_results[tip][trial].append(float(row[1])) + i += 1 + max_results_len = max([i - 2, max_results_len]) + # start writing the final report csv + with open(f"{data_directory}/{summary}", newline="") as summary_csv: + summary_reader = csv.reader(summary_csv) + with open(final_report_file, "w", newline="") as final_report: + # copy over the results summary + final_report_writer = csv.writer(final_report) + s = 0 + for row in summary_reader: + final_report_writer.writerow(row) + s += 1 + if s == 45: + meniscus_travel = float(row[6]) + if s >= 46 and s < 46 + (trials * len(tips)): + # while processing this grab the tip offsets from the summary + tip_offsets[tips[int((s - 46) / trials)]].append(float(row[8])) + # summary_reader.line_num is the last line in the summary that has text + pressures_start_line = summary_reader.line_num + 3 + # calculate where the start and end of each block of data we want to graph + final_report_writer.writerow( + [ + "50ul", + f"A{pressures_start_line-1}", + f"{COL_TRIAL_CONVERSION[trials]}{pressures_start_line + max_results_len -1}", + "200ul", + f"A{pressures_start_line+max_results_len-1}", + f"{COL_TRIAL_CONVERSION[trials]}{pressures_start_line +(2*max_results_len)-1}", + "10000ul", + f"A{pressures_start_line+(2*max_results_len-1)}", + f"{COL_TRIAL_CONVERSION[trials]}{pressures_start_line + (3*max_results_len)-1}", + ] + ) + + # build a header row + pressure_header_row = ["time", ""] + for i in range(trials): + pressure_header_row.extend( + [f"pressure T{i+1}", f"z_travel T{i+1}", f"p_travel T{i+1}"] + ) + + # we want to line up the z height's of each trial at time==0 + # to do this we drop the results at the beginning of each of the trials + # except for one with the longest tip (lower tip offset are longer tips) + min_tip_offset = 0.0 + if make_graph: + for tip in tips: + min_tip_offset = min(tip_offsets[tip]) + for trial in range(trials): + for i in range(max_results_len): + if tip_offsets[tip][trial] > min_tip_offset: + # drop this pressure result + pressure_results[tip][trial].pop(0) + # we don't want to change the length of this array so just + # stretch out the last value + pressure_results[tip][trial].append( + pressure_results[tip][trial][-1] + ) + # decrement the offset while this is true + # so we can account for it later + tip_offsets[tip][trial] -= ( + 0.001 * results_settings[tip][0][0] + ) + # keep track of how this effects the plunger start position + p_offsets[tip][trial] = ( + (i + 1) * 0.001 * results_settings[tip][0][1] * -1 + ) + else: + # we've lined up this trial so move to the next + break + # write the processed test data + for tip in tips: + time = 0.0 + final_report_writer.writerow(pressure_header_row) + meniscus_time = (meniscus_travel + min_tip_offset) / results_settings[ + tip + ][0][0] + for i in range(max_results_len): + pressure_row: List[str] = [f"{time}"] + if isclose( + time, + meniscus_time, + rel_tol=0.001, + ): + pressure_row.append("Meniscus") + else: + pressure_row.append("") + for trial in range(trials): + if i < len(pressure_results[tip][trial]): + pressure_row.append(f"{pressure_results[tip][trial][i]}") + else: + pressure_row.append("") + pressure_row.append( + f"{results_settings[tip][trial][0] * time - tip_offsets[tip][trial]}" + ) + pressure_row.append( + f"{abs(results_settings[tip][trial][1]) * time + p_offsets[tip][trial]}" + ) + final_report_writer.writerow(pressure_row) + time += 0.001 + + +if __name__ == "__main__": + process_csv_directory("/home/ryan/testdata", [50], 10) diff --git a/hardware-testing/hardware_testing/liquid_sense/report.py b/hardware-testing/hardware_testing/liquid_sense/report.py new file mode 100644 index 00000000000..bca898e79c7 --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_sense/report.py @@ -0,0 +1,263 @@ +"""Format the csv report for a liquid-sense run.""" + +import statistics +from hardware_testing.data.csv_report import ( + CSVReport, + CSVSection, + CSVLine, + CSVLineRepeating, +) +from typing import List, Union + +""" +CSV Test Report: + - Serial numbers: + - Robot + - Pipette + - Scale + - Environment sensor + - Config: + - protocol name + - pipette_volume + - pipette_mount + - tip_volume + - trials + - plunger direction + - liquid + - labware type + - speed + - start height offset + - Trials + trial-x-{tipsize}ul + - Results + {tipsize}ul-average + {tipsize}ul-cv + {tipsize}ul-d +""" + + +def build_serial_number_section() -> CSVSection: + """Build section.""" + return CSVSection( + title="SERIAL-NUMBERS", + lines=[ + CSVLine("robot", [str]), + CSVLine("git_description", [str]), + CSVLine("pipette", [str]), + CSVLine("scale", [str]), + CSVLine("environment", [str]), + ], + ) + + +def build_config_section() -> CSVSection: + """Build section.""" + return CSVSection( + title="CONFIG", + lines=[ + CSVLine("protocol_name", [str]), + CSVLine("pipette_volume", [str]), + CSVLine("tip_volume", [bool, bool, bool]), + CSVLine("trials", [str]), + CSVLine("plunger_direction", [str]), + CSVLine("liquid", [str]), + CSVLine("labware_type", [str]), + CSVLine("speed", [str]), + CSVLine("start_height_offset", [str]), + ], + ) + + +def build_trials_section(trials: int, tips: List[int]) -> CSVSection: + """Build section.""" + lines: List[Union[CSVLine, CSVLineRepeating]] = [ + CSVLine("trial_number", [str, str, str, str, str, str, str, str]) + ] + lines.extend( + [ + CSVLine( + f"trial-baseline-{tip}ul", + [float, float, float, float, float, float, float, float], + ) + for tip in tips + ] + ) + lines.extend( + [ + CSVLine( + f"trial-{t + 1}-{tip}ul", + [float, float, float, float, float, float, float, float], + ) + for tip in tips + for t in range(trials) + ] + ) + + return CSVSection( + title="TRIALS", + lines=lines, + ) + + +def build_results_section(tips: List[int]) -> CSVSection: + """Build section.""" + lines: List[CSVLine] = [] + for tip in tips: + lines.append(CSVLine(f"{tip}ul-average", [float])) + lines.append(CSVLine(f"{tip}ul-minumum", [float])) + lines.append(CSVLine(f"{tip}ul-maximum", [float])) + lines.append(CSVLine(f"{tip}ul-stdev", [float])) + lines.append(CSVLine(f"{tip}ul-adjusted-average", [float])) + lines.append(CSVLine(f"{tip}ul-adjusted-minumum", [float])) + lines.append(CSVLine(f"{tip}ul-adjusted-maximum", [float])) + lines.append(CSVLine(f"{tip}ul-adjusted-stdev", [float])) + return CSVSection(title="RESULTS", lines=lines) # type: ignore[arg-type] + + +def store_serial_numbers( + report: CSVReport, + robot: str, + pipette: str, + scale: str, + environment: str, + git_description: str, +) -> None: + """Report serial numbers.""" + report("SERIAL-NUMBERS", "robot", [robot]) + report("SERIAL-NUMBERS", "git_description", [git_description]) + report("SERIAL-NUMBERS", "pipette", [pipette]) + report("SERIAL-NUMBERS", "scale", [scale]) + report("SERIAL-NUMBERS", "environment", [environment]) + + +def store_config( + report: CSVReport, + protocol_name: str, + pipette_volume: str, + tip_volumes: List[int], + trials: int, + plunger_direction: str, + liquid: str, + labware_type: str, + speed: str, + start_height_offset: str, +) -> None: + """Report config.""" + report("CONFIG", "protocol_name", [protocol_name]) + report("CONFIG", "pipette_volume", [pipette_volume]) + report( + "CONFIG", + "tip_volume", + [50 in tip_volumes, 200 in tip_volumes, 1000 in tip_volumes], + ) + report("CONFIG", "trials", [trials]) + report("CONFIG", "plunger_direction", [plunger_direction]) + report("CONFIG", "liquid", [liquid]) + report("CONFIG", "labware_type", [labware_type]) + report("CONFIG", "speed", [speed]) + report("CONFIG", "start_height_offset", [start_height_offset]) + + +def store_baseline_trial( + report: CSVReport, + tip: float, + height: float, + humidity: float, + temp: float, + z_travel: float, + measured_error: float, +) -> None: + """Report Trial.""" + report( + "TRIALS", + f"trial-baseline-{tip}ul", + [ + height, + 0, + humidity, + temp, + z_travel, + 0, + 0, + measured_error, + ], + ) + + +def store_trial( + report: CSVReport, + trial: int, + tip: float, + height: float, + plunger_pos: float, + humidity: float, + temp: float, + z_travel: float, + plunger_travel: float, + tip_length_offset: float, +) -> None: + """Report Trial.""" + report( + "TRIALS", + f"trial-{trial + 1}-{tip}ul", + [ + height, + plunger_pos, + humidity, + temp, + z_travel, + plunger_travel, + tip_length_offset, + height + tip_length_offset, + ], + ) + + +def store_tip_results( + report: CSVReport, tip: float, results: List[float], adjusted_results: List[float] +) -> None: + """Store final results.""" + report("RESULTS", f"{tip}ul-average", [sum(results) / len(results)]) + report("RESULTS", f"{tip}ul-minumum", [min(results)]) + report("RESULTS", f"{tip}ul-maximum", [max(results)]) + report("RESULTS", f"{tip}ul-stdev", [statistics.stdev(results)]) + report( + "RESULTS", + f"{tip}ul-adjusted-average", + [sum(adjusted_results) / len(adjusted_results)], + ) + report("RESULTS", f"{tip}ul-adjusted-minumum", [min(adjusted_results)]) + report("RESULTS", f"{tip}ul-adjusted-maximum", [max(adjusted_results)]) + report("RESULTS", f"{tip}ul-adjusted-stdev", [statistics.stdev(adjusted_results)]) + + +def build_ls_report( + test_name: str, run_id: str, trials: int, tips: List[int] +) -> CSVReport: + """Generate a CSV Report.""" + report = CSVReport( + test_name=test_name, + sections=[ + build_serial_number_section(), + build_config_section(), + build_trials_section(trials, tips), + build_results_section(tips), + ], + run_id=run_id, + start_time=0.0, + ) + report( + "TRIALS", + "trial_number", + [ + "height", + "plunger_pos", + "humidity", + "temp", + "z_travel", + "plunger_travel", + "tip_length_offset", + "adjusted_height", + ], + ) + return report diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index f277ff93f76..d1ff8f91d53 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -84,9 +84,7 @@ def stop_server_ot3() -> None: def restart_server_ot3() -> None: """Start opentrons-robot-server on the OT3.""" print('Starting "opentrons-robot-server"...') - Popen( - ["systemctl", "restart", "opentrons-robot-server", "&"], - ) + Popen(["systemctl restart opentrons-robot-server &"], shell=True) def start_server_ot3() -> None: diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/__init__.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/__init__.py new file mode 100644 index 00000000000..6ec34e45de0 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/__init__.py @@ -0,0 +1 @@ +"""Liquid Sense LPC.""" diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_96.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_96.py new file mode 100644 index 00000000000..02644b314a4 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_96.py @@ -0,0 +1,33 @@ +"""Liquid sense OT3 P1000.""" +from opentrons.protocol_api import ProtocolContext + +metadata = {"protocolName": "liquid-sense-ot3-p1000-96"} +requirements = {"robotType": "Flex", "apiLevel": "2.15"} + +SLOT_SCALE = 4 +SLOT_DIAL = 5 +SLOTS_TIPRACK = { + # TODO: add slot 12 when tipracks are disposable + 50: [2, 3, 6, 7, 8, 9, 10, 11], + 200: [2, 3, 6, 7, 8, 9, 10, 11], # NOTE: ignored during calibration + 1000: [2, 3, 6, 7, 8, 9, 10, 11], # NOTE: ignored during calibration +} + +LABWARE_ON_SCALE = "nest_1_reservoir_195ml" + + +def run(ctx: ProtocolContext) -> None: + """Run.""" + tipracks = [ + ctx.load_labware(f"opentrons_flex_96_tiprack_{size}uL_adp", slot) + for size, slots in SLOTS_TIPRACK.items() + for slot in slots + if size == 50 # only calibrate 50ul tip-racks + ] + scale_labware = ctx.load_labware(LABWARE_ON_SCALE, SLOT_SCALE) + pipette = ctx.load_instrument("p1000_96", "left") + for rack in tipracks: + pipette.pick_up_tip(rack["A1"]) + pipette.aspirate(10, scale_labware["A1"].top()) + pipette.dispense(10, scale_labware["A1"].top()) + pipette.drop_tip(home_after=False) diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_multi.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_multi.py new file mode 100644 index 00000000000..d2b806d1229 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_multi.py @@ -0,0 +1,26 @@ +"""LiquidSense OT3 P1000.""" +from opentrons.protocol_api import ProtocolContext + +metadata = {"protocolName": "liquid-sense-ot3-p1000-multi"} +requirements = {"robotType": "Flex", "apiLevel": "2.15"} + +SLOT_SCALE = 4 +SLOT_DIAL = 5 +SLOTS_TIPRACK = {50: [2], 200: [3], 1000: [6]} +LABWARE_ON_SCALE = "nest_1_reservoir_195ml" + + +def run(ctx: ProtocolContext) -> None: + """Run.""" + tipracks = [ + ctx.load_labware(f"opentrons_flex_96_tiprack_{size}uL", slot) + for size, slots in SLOTS_TIPRACK.items() + for slot in slots + ] + vial = ctx.load_labware(LABWARE_ON_SCALE, SLOT_SCALE) + pipette = ctx.load_instrument("flex_8channel_1000", "left") + for rack in tipracks: + pipette.pick_up_tip(rack["A1"]) + pipette.aspirate(10, vial["A1"].top()) + pipette.dispense(10, vial["A1"].top()) + pipette.drop_tip(home_after=False) diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_single.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_single.py new file mode 100644 index 00000000000..4e8fcc177f4 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p1000_single.py @@ -0,0 +1,33 @@ +"""Liquid Sense OT3 P1000.""" +from opentrons.protocol_api import ProtocolContext + +metadata = {"protocolName": "liquid-sense-ot3-p1000-single"} +requirements = {"robotType": "Flex", "apiLevel": "2.15"} + +SLOT_SCALE = 4 +SLOT_DIAL = 5 +SLOTS_TIPRACK = { + 50: [3], + 200: [6], + 1000: [9], +} +LABWARE_ON_SCALE = "nest_1_reservoir_195ml" + + +def run(ctx: ProtocolContext) -> None: + """Run.""" + tipracks = [ + ctx.load_labware(f"opentrons_flex_96_tiprack_{size}uL", slot) + for size, slots in SLOTS_TIPRACK.items() + for slot in slots + ] + vial = ctx.load_labware(LABWARE_ON_SCALE, SLOT_SCALE) + dial = ctx.load_labware("dial_indicator", SLOT_DIAL) + pipette = ctx.load_instrument("flex_1channel_1000", "left") + for rack in tipracks: + pipette.pick_up_tip(rack["A1"]) + pipette.aspirate(10, vial["A1"].top()) + pipette.dispense(10, vial["A1"].top()) + pipette.aspirate(1, dial["A1"].top()) + pipette.dispense(1, dial["A1"].top()) + pipette.drop_tip(home_after=False) diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_multi.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_multi.py new file mode 100644 index 00000000000..34f83cd4cf7 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_multi.py @@ -0,0 +1,28 @@ +"""Liquid Sense OT3.""" +from opentrons.protocol_api import ProtocolContext + +metadata = {"protocolName": "liquid_sense-ot3-p50-multi-50ul-tip"} +requirements = {"robotType": "Flex", "apiLevel": "2.15"} + +SLOT_SCALE = 4 +SLOT_DIAL = 5 +SLOTS_TIPRACK = { + 50: [3], +} +LABWARE_ON_SCALE = "nest_1_reservoir_195ml" + + +def run(ctx: ProtocolContext) -> None: + """Run.""" + tipracks = [ + ctx.load_labware(f"opentrons_flex_96_tiprack_{size}uL", slot) + for size, slots in SLOTS_TIPRACK.items() + for slot in slots + ] + vial = ctx.load_labware(LABWARE_ON_SCALE, SLOT_SCALE) + pipette = ctx.load_instrument("flex_8channel_50", "left") + for rack in tipracks: + pipette.pick_up_tip(rack["A1"]) + pipette.aspirate(pipette.min_volume, vial["A1"].top()) + pipette.dispense(pipette.min_volume, vial["A1"].top()) + pipette.drop_tip(home_after=False) diff --git a/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_single.py b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_single.py new file mode 100644 index 00000000000..8e9d65a72e2 --- /dev/null +++ b/hardware-testing/hardware_testing/protocols/liquid_sense_lpc/liquid_sense_ot3_p50_single.py @@ -0,0 +1,31 @@ +"""Liquid Sense OT3.""" +from opentrons.protocol_api import ProtocolContext + +metadata = {"protocolName": "liquid-sense-ot3-p50-single"} +requirements = {"robotType": "Flex", "apiLevel": "2.15"} + +SLOT_SCALE = 4 +SLOT_DIAL = 5 +SLOTS_TIPRACK = { + 50: [3], +} +LABWARE_ON_SCALE = "radwag_pipette_calibration_vial" + + +def run(ctx: ProtocolContext) -> None: + """Run.""" + tipracks = [ + ctx.load_labware(f"opentrons_flex_96_tiprack_{size}uL", slot) + for size, slots in SLOTS_TIPRACK.items() + for slot in slots + ] + vial = ctx.load_labware(LABWARE_ON_SCALE, SLOT_SCALE) + dial = ctx.load_labware("dial_indicator", SLOT_DIAL) + pipette = ctx.load_instrument("flex_1channel_50", "left") + for rack in tipracks: + pipette.pick_up_tip(rack["A1"]) + pipette.aspirate(10, vial["A1"].top()) + pipette.dispense(10, vial["A1"].top()) + pipette.aspirate(1, dial["A1"].top()) + pipette.dispense(1, dial["A1"].top()) + pipette.drop_tip(home_after=False) diff --git a/hardware/opentrons_hardware/firmware_bindings/messages/messages.py b/hardware/opentrons_hardware/firmware_bindings/messages/messages.py index 6611edecfe4..9906aa8dc07 100644 --- a/hardware/opentrons_hardware/firmware_bindings/messages/messages.py +++ b/hardware/opentrons_hardware/firmware_bindings/messages/messages.py @@ -111,6 +111,7 @@ defs.GetHepaUVStateResponse, defs.SendAccumulatedPressureDataRequest, defs.AddSensorLinearMoveRequest, + defs.SendAccumulatedPressureDataRequest, ] diff --git a/hardware/opentrons_hardware/hardware_control/tool_sensors.py b/hardware/opentrons_hardware/hardware_control/tool_sensors.py index 94301464f22..67e85a1554b 100644 --- a/hardware/opentrons_hardware/hardware_control/tool_sensors.py +++ b/hardware/opentrons_hardware/hardware_control/tool_sensors.py @@ -201,7 +201,6 @@ async def liquid_probe( csv_output: bool = False, sync_buffer_output: bool = False, can_bus_only_output: bool = False, - # output_option: OutputOptions, data_file: Optional[str] = None, auto_zero_sensor: bool = True, num_baseline_reads: int = 10, @@ -232,7 +231,7 @@ async def liquid_probe( ) sensor_runner = MoveGroupRunner(move_groups=[[sensor_group]]) - log_file: str = "/var/pressure_sensor_data.csv" if not data_file else data_file + log_file: str = "/data/pressure_sensor_data.csv" if not data_file else data_file if csv_output: return await run_stream_output_to_csv( messenger,