Skip to content

Commit

Permalink
debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
andySigler committed Jul 20, 2023
1 parent 2378654 commit fdb194b
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ async def build_async_ot3_hardware_api(
api = await builder(loop=loop, **kwargs) # type: ignore[arg-type]
if not is_simulating:
await asyncio.sleep(0.5)
await api.cache_instruments()
if not is_simulating:
await update_firmware(api)
print(f"Firmware: v{api.fw_version}")
await api.cache_instruments()
await api.refresh_positions()
return api


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
LEAK_HOVER_ABOVE_LIQUID_MM: Final = 50
ASPIRATE_SUBMERGE_MM: Final = 3

LIQUID_PROBE_ERROR_THRESHOLD_PRECISION_MM = 0.2
LIQUID_PROBE_ERROR_THRESHOLD_ACCURACY_MM = 0.2
# FIXME: reduce this spec after dial indicator is implemented
LIQUID_PROBE_ERROR_THRESHOLD_PRECISION_MM = 0.4
LIQUID_PROBE_ERROR_THRESHOLD_ACCURACY_MM = 0.4

SAFE_HEIGHT_TRAVEL = 10
SAFE_HEIGHT_CALIBRATE = 0
Expand Down Expand Up @@ -1120,15 +1121,16 @@ async def _test_liquid_probe(
pip = api.hardware_pipettes[mount.to_mount()]
assert pip
pip_vol = int(pip.working_volume)
if not CALIBRATED_LABWARE_LOCATIONS.reservoir:
await _pick_up_tip_for_tip_volume(api, mount, tip_volume)
await _move_to_liquid(api, mount)
await _drop_tip_in_trash(api, mount)
# force the operator to re-calibrate the liquid every time
CALIBRATED_LABWARE_LOCATIONS.reservoir = None
await _pick_up_tip_for_tip_volume(api, mount, tip_volume)
await _move_to_liquid(api, mount)
await _drop_tip_in_trash(api, mount)
trial_results: List[float] = []
hover_mm = 3
max_submerge_mm = -3
max_z_distance_machine_coords = hover_mm - max_submerge_mm
assert CALIBRATED_LABWARE_LOCATIONS.reservoir
assert CALIBRATED_LABWARE_LOCATIONS.reservoir is not None
target_z = CALIBRATED_LABWARE_LOCATIONS.reservoir.z
for trial in range(trials):
await _pick_up_tip_for_tip_volume(api, mount, tip_volume)
Expand Down Expand Up @@ -1377,7 +1379,9 @@ async def _main(test_config: TestConfig) -> None:
print(prec_tag, precision, _bool_to_pass_fail(precision_passed))
print(acc_tag, accuracy, _bool_to_pass_fail(accuracy_passed))
print(tip_tag, _bool_to_pass_fail(tip_passed))
csv_cb.write([prec_tag, precision, _bool_to_pass_fail(precision_passed)])
csv_cb.write(
[prec_tag, precision, _bool_to_pass_fail(precision_passed)]
)
csv_cb.write([acc_tag, accuracy, _bool_to_pass_fail(accuracy_passed)])
csv_cb.write([tip_tag, _bool_to_pass_fail(tip_passed)])
if not tip_passed:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Pipette Current/Speed Test."""
import argparse
import asyncio
from typing import Optional

from opentrons.hardware_control.ot3api import OT3API
from opentrons.config.defaults_ot3 import (
DEFAULT_RUN_CURRENT,
DEFAULT_MAX_SPEEDS,
DEFAULT_ACCELERATIONS,
DEFAULT_MAX_SPEED_DISCONTINUITY,
)

from opentrons_shared_data.errors.exceptions import StallOrCollisionDetectedError
Expand All @@ -21,10 +23,16 @@
from hardware_testing.opentrons_api import helpers_ot3
from hardware_testing.data import ui

TEST_TAG = "CURRENTS-SPEEDS"
TEST_TAG_NO_ACCELERATION = "NO-ACCELERATION"
TEST_TAG_WITH_ACCELERATION = "WITH-ACCELERATION"

STALL_THRESHOLD_MM = 0.25
TEST_SPEEDS = [40, 60, 70, 80]
NO_ACCELERATION_TEST_SPEED = 20
NO_ACCELERATION_TEST_TRIALS = 1
NO_ACCELERATION_TEST_CURRENTS = [0.8, 0.6, 0.4, 0.3, 0.2]
NO_ACCELERATION_TEST_PASS_CURRENT = 0.5

STALL_THRESHOLD_MM = 0.1
TEST_SPEEDS = [80]
PLUNGER_CURRENTS_SPEED = {
0.2: TEST_SPEEDS,
0.3: TEST_SPEEDS,
Expand All @@ -36,6 +44,7 @@
DEFAULT_ACCELERATION = DEFAULT_ACCELERATIONS.low_throughput[types.OT3AxisKind.P]
DEFAULT_CURRENT = DEFAULT_RUN_CURRENT.low_throughput[types.OT3AxisKind.P]
DEFAULT_SPEED = DEFAULT_MAX_SPEEDS.low_throughput[types.OT3AxisKind.P]
DEFAULT_DISCONTINUITY = DEFAULT_MAX_SPEED_DISCONTINUITY.low_throughput[types.OT3AxisKind.P]
MAX_CURRENT = max(max(list(PLUNGER_CURRENTS_SPEED.keys())), 1.0)
MAX_SPEED = max(TEST_SPEEDS)

Expand All @@ -46,19 +55,45 @@ def _get_operator(is_simulating: bool) -> str:
return input("enter OPERATOR name: ")


def _get_test_tag(current: float, speed: float, direction: str, pos: str) -> str:
return f"current-{current}-speed-{speed}-{direction}-{pos}"
def _get_test_tag(
current: float, speed: float, direction: str, pos: str, has_acceleration: bool
) -> str:
if has_acceleration:
accel_substring = "with-acceleration"
else:
accel_substring = "no-acceleration"
return f"current-{current}-speed-{speed}-{direction}-{pos}-{accel_substring}"


def _build_csv_report(operator: str, pipette_sn: str) -> CSVReport:
_report = CSVReport(
test_name="pipette-current-speed-qc-ot3",
sections=[
CSVSection(
title=TEST_TAG,
title=TEST_TAG_NO_ACCELERATION,
lines=[
CSVLine(
_get_test_tag(
current,
NO_ACCELERATION_TEST_SPEED,
direction,
pos,
has_acceleration=False,
),
[float, float, float, float, CSVResult],
)
for current in NO_ACCELERATION_TEST_CURRENTS
for direction in ["down", "up"]
for pos in ["start", "end"]
],
),
CSVSection(
title=TEST_TAG_WITH_ACCELERATION,
lines=[
CSVLine(
_get_test_tag(current, speed, direction, pos),
_get_test_tag(
current, speed, direction, pos, has_acceleration=True
),
[float, float, float, float, CSVResult],
)
for current, speeds in PLUNGER_CURRENTS_SPEED.items()
Expand All @@ -76,39 +111,51 @@ def _build_csv_report(operator: str, pipette_sn: str) -> CSVReport:


async def _home_plunger(api: OT3API, mount: types.OT3Mount) -> None:
print("homing")
# restore default current/speed before homing
pipette_ax = types.Axis.of_main_tool_actuator(mount)
await helpers_ot3.set_gantry_load_per_axis_current_settings_ot3(
api, pipette_ax, run_current=DEFAULT_CURRENT
)
await helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3(
api, pipette_ax, default_max_speed=DEFAULT_SPEED
api, pipette_ax, default_max_speed=DEFAULT_SPEED, max_speed_discontinuity=DEFAULT_DISCONTINUITY
)
await api.home([pipette_ax])
# using backend to home, so we avoid unrecoverable error after a stall
await api._backend.home([pipette_ax], api.gantry_load)
await api.refresh_positions()


async def _move_plunger(
api: OT3API,
mount: types.OT3Mount,
p: float,
s: float,
c: float,
a: float,
position: float,
speed: float,
current: float,
acceleration: Optional[float],
) -> None:
# set max currents/speeds, to make sure we're not accidentally limiting ourselves
pipette_ax = types.Axis.of_main_tool_actuator(mount)
await helpers_ot3.set_gantry_load_per_axis_current_settings_ot3(
api, pipette_ax, run_current=MAX_CURRENT
)
if acceleration:
_discontinuity = DEFAULT_DISCONTINUITY
_acceleration = acceleration
else:
_discontinuity = speed
_acceleration = DEFAULT_ACCELERATION
print(f"speed={speed}, discontinuity={_discontinuity}, "
f"acceleration={_acceleration}, current={current}")
await helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3(
api,
pipette_ax,
default_max_speed=MAX_SPEED,
acceleration=a,
default_max_speed=speed,
max_speed_discontinuity=speed,
acceleration=DEFAULT_ACCELERATION,
)
# move
await helpers_ot3.move_plunger_absolute_ot3(
api, mount, p, speed=s, motor_current=c, expect_stalls=True
api, mount, position, speed=speed, motor_current=current, expect_stalls=True
)


Expand All @@ -120,6 +167,7 @@ async def _record_plunger_alignment(
speed: float,
direction: str,
position: str,
has_acceleration: bool,
) -> bool:
pipette_ax = types.Axis.of_main_tool_actuator(mount)
_current_pos = await api.current_position_ot3(mount)
Expand All @@ -132,9 +180,13 @@ async def _record_plunger_alignment(
_stalled_mm = est - enc
print(f"{position}: motor={est}, encoder={enc}")
_did_pass = abs(_stalled_mm) < STALL_THRESHOLD_MM
_tag = _get_test_tag(current, speed, direction, position)
_tag = _get_test_tag(current, speed, direction, position, has_acceleration)
if has_acceleration:
section_title = TEST_TAG_WITH_ACCELERATION
else:
section_title = TEST_TAG_NO_ACCELERATION
report(
TEST_TAG,
section_title,
_tag,
[current, speed, est, enc, CSVResult.from_bool(_did_pass)],
)
Expand All @@ -147,14 +199,22 @@ async def _test_direction(
report: CSVReport,
current: float,
speed: float,
acceleration: float,
acceleration: Optional[float],
direction: str,
) -> bool:
has_acceleration = True if acceleration else False
plunger_poses = helpers_ot3.get_plunger_positions_ot3(api, mount)
top, bottom, blowout, drop_tip = plunger_poses
# check that encoder/motor align
aligned = await _record_plunger_alignment(
api, mount, report, current, speed, direction, "start"
api,
mount,
report,
current,
speed,
direction,
"start",
has_acceleration,
)
if not aligned:
return False
Expand All @@ -164,7 +224,14 @@ async def _test_direction(
await _move_plunger(api, mount, _plunger_target, speed, current, acceleration)
# check that encoder/motor still align
aligned = await _record_plunger_alignment(
api, mount, report, current, speed, direction, "end"
api,
mount,
report,
current,
speed,
direction,
"end",
has_acceleration,
)
except StallOrCollisionDetectedError as e:
print(e)
Expand All @@ -174,26 +241,76 @@ async def _test_direction(


async def _unstick_plunger(api: OT3API, mount: types.OT3Mount) -> None:
ui.print_header("UNSTICK PLUNGER")
await api.refresh_positions()
plunger_poses = helpers_ot3.get_plunger_positions_ot3(api, mount)
top, bottom, blowout, drop_tip = plunger_poses
print("moving to plunger bottom")
pos = await api.current_position_ot3(mount)
enc = await api.encoder_current_position_ot3(mount)
print("pos")
print(pos)
print("end")
print(enc)
await _move_plunger(api, mount, bottom, 10, 1.0, DEFAULT_ACCELERATION)
pos = await api.current_position_ot3(mount)
enc = await api.encoder_current_position_ot3(mount)
print("pos")
print(pos)
print("enc")
print(enc)
await _home_plunger(api, mount)


async def _test_plunger(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
ui.print_header("UNSTICK PLUNGER")
await _unstick_plunger(api, mount)
async def _test_plunger_no_acceleration(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> bool:
currents = sorted(NO_ACCELERATION_TEST_CURRENTS, reverse=True)
speed = NO_ACCELERATION_TEST_SPEED
for current in currents:
for trial in range(NO_ACCELERATION_TEST_TRIALS):
ui.print_header(
f"NO ACCELERATION; "
f"CURRENT = {current}; SPEED = {speed}; "
f"TRIAL #{trial + 1}"
)
await _home_plunger(api, mount)
for direction in ["down", "up"]:
_pass = await _test_direction(
api,
mount,
report,
current,
speed,
None, # no acceleration
direction,
)
if not _pass:
ui.print_error(
f"failed moving {direction} at {current} amps and {speed} mm/sec"
)
if current >= NO_ACCELERATION_TEST_PASS_CURRENT:
return False
else:
return True


async def _test_plunger_with_acceleration(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
# start at HIGHEST (easiest) current
currents = sorted(list(PLUNGER_CURRENTS_SPEED.keys()), reverse=True)
for current in currents:
# start at LOWEST (easiest) speed
speeds = sorted(PLUNGER_CURRENTS_SPEED[current], reverse=False)
for speed in speeds:
ui.print_header(f"CURRENT = {current}; SPEED = {speed}")
ui.print_header(f"WITH ACCELERATION; CURRENT = {current}; SPEED = {speed}")
await _home_plunger(api, mount)
for direction in ["down", "up"]:
_pass = await _test_direction(
api, mount, report, current, speed, TEST_ACCELERATION, direction
api,
mount,
report,
current,
speed,
TEST_ACCELERATION,
direction,
)
if not _pass:
ui.print_error(
Expand All @@ -202,10 +319,22 @@ async def _test_plunger(api: OT3API, mount: types.OT3Mount, report: CSVReport) -
return


async def _test_plunger(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
await _unstick_plunger(api, mount)
# result = await _test_plunger_no_acceleration(api, mount, report)
# if not result:
# ui.print_error("no-acceleration test failed, exiting test early")
# return
await _test_plunger_with_acceleration(api, mount, report)


async def _get_next_pipette_mount(api: OT3API) -> types.OT3Mount:
if not api.is_simulator:
ui.get_user_ready("attach a pipette")
await api.cache_instruments()
# update pipette, b/c assemblers will be attaching new pipettes
# while this script continues to run
await helpers_ot3.update_firmware(api)
await api.cache_instruments()
found = [
types.OT3Mount.from_mount(m) for m, p in api.hardware_pipettes.items() if p
]
Expand All @@ -215,7 +344,8 @@ async def _get_next_pipette_mount(api: OT3API) -> types.OT3Mount:


async def _reset_gantry(api: OT3API) -> None:
await api.home()
print("resetting the gantry")
await api.home([types.Axis.X, types.Axis.Y, types.Axis.Z_L, types.Axis.Z_R])
home_pos = await api.gantry_position(
types.OT3Mount.RIGHT, types.CriticalPoint.MOUNT
)
Expand Down

0 comments on commit fdb194b

Please sign in to comment.