Skip to content

Commit

Permalink
revert to version of current/speed without no-acceleration
Browse files Browse the repository at this point in the history
  • Loading branch information
andySigler committed Jul 20, 2023
1 parent fdb194b commit c2ae357
Showing 1 changed file with 26 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
"""Pipette Current/Speed Test."""
import argparse
import asyncio
from typing import Optional

from opentrons.hardware_control.ot3api import OT3API
from opentrons.config.defaults_ot3 import (
DEFAULT_RUN_CURRENT,
DEFAULT_MAX_SPEEDS,
DEFAULT_ACCELERATIONS,
DEFAULT_MAX_SPEED_DISCONTINUITY,
)

from opentrons_shared_data.errors.exceptions import StallOrCollisionDetectedError
Expand All @@ -23,13 +21,7 @@
from hardware_testing.opentrons_api import helpers_ot3
from hardware_testing.data import ui

TEST_TAG_NO_ACCELERATION = "NO-ACCELERATION"
TEST_TAG_WITH_ACCELERATION = "WITH-ACCELERATION"

NO_ACCELERATION_TEST_SPEED = 20
NO_ACCELERATION_TEST_TRIALS = 1
NO_ACCELERATION_TEST_CURRENTS = [0.8, 0.6, 0.4, 0.3, 0.2]
NO_ACCELERATION_TEST_PASS_CURRENT = 0.5
TEST_TAG = "CURRENTS-SPEEDS"

STALL_THRESHOLD_MM = 0.1
TEST_SPEEDS = [80]
Expand All @@ -38,13 +30,13 @@
0.3: TEST_SPEEDS,
0.4: TEST_SPEEDS,
0.6: TEST_SPEEDS,
1.0: TEST_SPEEDS
}
TEST_ACCELERATION = 1500 # used during gravimetric tests

DEFAULT_ACCELERATION = DEFAULT_ACCELERATIONS.low_throughput[types.OT3AxisKind.P]
DEFAULT_CURRENT = DEFAULT_RUN_CURRENT.low_throughput[types.OT3AxisKind.P]
DEFAULT_SPEED = DEFAULT_MAX_SPEEDS.low_throughput[types.OT3AxisKind.P]
DEFAULT_DISCONTINUITY = DEFAULT_MAX_SPEED_DISCONTINUITY.low_throughput[types.OT3AxisKind.P]
MAX_CURRENT = max(max(list(PLUNGER_CURRENTS_SPEED.keys())), 1.0)
MAX_SPEED = max(TEST_SPEEDS)

Expand All @@ -55,45 +47,19 @@ def _get_operator(is_simulating: bool) -> str:
return input("enter OPERATOR name: ")


def _get_test_tag(
current: float, speed: float, direction: str, pos: str, has_acceleration: bool
) -> str:
if has_acceleration:
accel_substring = "with-acceleration"
else:
accel_substring = "no-acceleration"
return f"current-{current}-speed-{speed}-{direction}-{pos}-{accel_substring}"
def _get_test_tag(current: float, speed: float, direction: str, pos: str) -> str:
return f"current-{current}-speed-{speed}-{direction}-{pos}"


def _build_csv_report(operator: str, pipette_sn: str) -> CSVReport:
_report = CSVReport(
test_name="pipette-current-speed-qc-ot3",
sections=[
CSVSection(
title=TEST_TAG_NO_ACCELERATION,
lines=[
CSVLine(
_get_test_tag(
current,
NO_ACCELERATION_TEST_SPEED,
direction,
pos,
has_acceleration=False,
),
[float, float, float, float, CSVResult],
)
for current in NO_ACCELERATION_TEST_CURRENTS
for direction in ["down", "up"]
for pos in ["start", "end"]
],
),
CSVSection(
title=TEST_TAG_WITH_ACCELERATION,
title=TEST_TAG,
lines=[
CSVLine(
_get_test_tag(
current, speed, direction, pos, has_acceleration=True
),
_get_test_tag(current, speed, direction, pos),
[float, float, float, float, CSVResult],
)
for current, speeds in PLUNGER_CURRENTS_SPEED.items()
Expand All @@ -111,51 +77,39 @@ def _build_csv_report(operator: str, pipette_sn: str) -> CSVReport:


async def _home_plunger(api: OT3API, mount: types.OT3Mount) -> None:
print("homing")
# restore default current/speed before homing
pipette_ax = types.Axis.of_main_tool_actuator(mount)
await helpers_ot3.set_gantry_load_per_axis_current_settings_ot3(
api, pipette_ax, run_current=DEFAULT_CURRENT
)
await helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3(
api, pipette_ax, default_max_speed=DEFAULT_SPEED, max_speed_discontinuity=DEFAULT_DISCONTINUITY
api, pipette_ax, default_max_speed=DEFAULT_SPEED
)
# using backend to home, so we avoid unrecoverable error after a stall
await api._backend.home([pipette_ax], api.gantry_load)
await api.refresh_positions()
await api.home([pipette_ax])


async def _move_plunger(
api: OT3API,
mount: types.OT3Mount,
position: float,
speed: float,
current: float,
acceleration: Optional[float],
p: float,
s: float,
c: float,
a: float,
) -> None:
# set max currents/speeds, to make sure we're not accidentally limiting ourselves
pipette_ax = types.Axis.of_main_tool_actuator(mount)
await helpers_ot3.set_gantry_load_per_axis_current_settings_ot3(
api, pipette_ax, run_current=MAX_CURRENT
)
if acceleration:
_discontinuity = DEFAULT_DISCONTINUITY
_acceleration = acceleration
else:
_discontinuity = speed
_acceleration = DEFAULT_ACCELERATION
print(f"speed={speed}, discontinuity={_discontinuity}, "
f"acceleration={_acceleration}, current={current}")
await helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3(
api,
pipette_ax,
default_max_speed=speed,
max_speed_discontinuity=speed,
acceleration=DEFAULT_ACCELERATION,
default_max_speed=MAX_SPEED,
acceleration=a,
)
# move
await helpers_ot3.move_plunger_absolute_ot3(
api, mount, position, speed=speed, motor_current=current, expect_stalls=True
api, mount, p, speed=s, motor_current=c, expect_stalls=True
)


Expand All @@ -167,7 +121,6 @@ async def _record_plunger_alignment(
speed: float,
direction: str,
position: str,
has_acceleration: bool,
) -> bool:
pipette_ax = types.Axis.of_main_tool_actuator(mount)
_current_pos = await api.current_position_ot3(mount)
Expand All @@ -180,13 +133,9 @@ async def _record_plunger_alignment(
_stalled_mm = est - enc
print(f"{position}: motor={est}, encoder={enc}")
_did_pass = abs(_stalled_mm) < STALL_THRESHOLD_MM
_tag = _get_test_tag(current, speed, direction, position, has_acceleration)
if has_acceleration:
section_title = TEST_TAG_WITH_ACCELERATION
else:
section_title = TEST_TAG_NO_ACCELERATION
_tag = _get_test_tag(current, speed, direction, position)
report(
section_title,
TEST_TAG,
_tag,
[current, speed, est, enc, CSVResult.from_bool(_did_pass)],
)
Expand All @@ -199,22 +148,14 @@ async def _test_direction(
report: CSVReport,
current: float,
speed: float,
acceleration: Optional[float],
acceleration: float,
direction: str,
) -> bool:
has_acceleration = True if acceleration else False
plunger_poses = helpers_ot3.get_plunger_positions_ot3(api, mount)
top, bottom, blowout, drop_tip = plunger_poses
# check that encoder/motor align
aligned = await _record_plunger_alignment(
api,
mount,
report,
current,
speed,
direction,
"start",
has_acceleration,
api, mount, report, current, speed, direction, "start"
)
if not aligned:
return False
Expand All @@ -224,14 +165,7 @@ async def _test_direction(
await _move_plunger(api, mount, _plunger_target, speed, current, acceleration)
# check that encoder/motor still align
aligned = await _record_plunger_alignment(
api,
mount,
report,
current,
speed,
direction,
"end",
has_acceleration,
api, mount, report, current, speed, direction, "end"
)
except StallOrCollisionDetectedError as e:
print(e)
Expand All @@ -241,76 +175,26 @@ async def _test_direction(


async def _unstick_plunger(api: OT3API, mount: types.OT3Mount) -> None:
ui.print_header("UNSTICK PLUNGER")
await api.refresh_positions()
plunger_poses = helpers_ot3.get_plunger_positions_ot3(api, mount)
top, bottom, blowout, drop_tip = plunger_poses
print("moving to plunger bottom")
pos = await api.current_position_ot3(mount)
enc = await api.encoder_current_position_ot3(mount)
print("pos")
print(pos)
print("end")
print(enc)
await _move_plunger(api, mount, bottom, 10, 1.0, DEFAULT_ACCELERATION)
pos = await api.current_position_ot3(mount)
enc = await api.encoder_current_position_ot3(mount)
print("pos")
print(pos)
print("enc")
print(enc)
await _home_plunger(api, mount)


async def _test_plunger_no_acceleration(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> bool:
currents = sorted(NO_ACCELERATION_TEST_CURRENTS, reverse=True)
speed = NO_ACCELERATION_TEST_SPEED
for current in currents:
for trial in range(NO_ACCELERATION_TEST_TRIALS):
ui.print_header(
f"NO ACCELERATION; "
f"CURRENT = {current}; SPEED = {speed}; "
f"TRIAL #{trial + 1}"
)
await _home_plunger(api, mount)
for direction in ["down", "up"]:
_pass = await _test_direction(
api,
mount,
report,
current,
speed,
None, # no acceleration
direction,
)
if not _pass:
ui.print_error(
f"failed moving {direction} at {current} amps and {speed} mm/sec"
)
if current >= NO_ACCELERATION_TEST_PASS_CURRENT:
return False
else:
return True


async def _test_plunger_with_acceleration(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
async def _test_plunger(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
ui.print_header("UNSTICK PLUNGER")
await _unstick_plunger(api, mount)
# start at HIGHEST (easiest) current
currents = sorted(list(PLUNGER_CURRENTS_SPEED.keys()), reverse=True)
for current in currents:
# start at LOWEST (easiest) speed
speeds = sorted(PLUNGER_CURRENTS_SPEED[current], reverse=False)
for speed in speeds:
ui.print_header(f"WITH ACCELERATION; CURRENT = {current}; SPEED = {speed}")
ui.print_header(f"CURRENT = {current}; SPEED = {speed}")
await _home_plunger(api, mount)
for direction in ["down", "up"]:
_pass = await _test_direction(
api,
mount,
report,
current,
speed,
TEST_ACCELERATION,
direction,
api, mount, report, current, speed, TEST_ACCELERATION, direction
)
if not _pass:
ui.print_error(
Expand All @@ -319,20 +203,9 @@ async def _test_plunger_with_acceleration(api: OT3API, mount: types.OT3Mount, re
return


async def _test_plunger(api: OT3API, mount: types.OT3Mount, report: CSVReport) -> None:
await _unstick_plunger(api, mount)
# result = await _test_plunger_no_acceleration(api, mount, report)
# if not result:
# ui.print_error("no-acceleration test failed, exiting test early")
# return
await _test_plunger_with_acceleration(api, mount, report)


async def _get_next_pipette_mount(api: OT3API) -> types.OT3Mount:
if not api.is_simulator:
ui.get_user_ready("attach a pipette")
# update pipette, b/c assemblers will be attaching new pipettes
# while this script continues to run
await helpers_ot3.update_firmware(api)
await api.cache_instruments()
found = [
Expand All @@ -344,8 +217,7 @@ async def _get_next_pipette_mount(api: OT3API) -> types.OT3Mount:


async def _reset_gantry(api: OT3API) -> None:
print("resetting the gantry")
await api.home([types.Axis.X, types.Axis.Y, types.Axis.Z_L, types.Axis.Z_R])
await api.home()
home_pos = await api.gantry_position(
types.OT3Mount.RIGHT, types.CriticalPoint.MOUNT
)
Expand Down

0 comments on commit c2ae357

Please sign in to comment.