Skip to content

Commit

Permalink
refactor(api): add explicit Flex hardware protocol (#14091)
Browse files Browse the repository at this point in the history
* pass around mypy protocol instead of OT3API
* Explicit parameterization of config types
* Fix some tests with the protocol engine related to protocol changes
* Added a `FlexInstrumentConfigurer` for flex-specific configuration extensions
* Update test mocks to differentiate ot2/ot3 hardware
* Modifed `get_robot_type` to be parameterized on some singleton class types for robot models
* remove unnecessary TYPE_CHECKING import
* Add tip presence protocol functions to satisfy edits to tip manager
  • Loading branch information
fsinapi authored Dec 12, 2023
1 parent a32571e commit 85dfebe
Show file tree
Hide file tree
Showing 23 changed files with 427 additions and 140 deletions.
15 changes: 11 additions & 4 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,29 @@
from .api import API
from .pause_manager import PauseManager
from .backends import Controller, Simulator
from .types import CriticalPoint, ExecutionState
from .types import CriticalPoint, ExecutionState, OT3Mount
from .constants import DROP_TIP_RELEASE_DISTANCE
from .thread_manager import ThreadManager
from .execution_manager import ExecutionManager
from .threaded_async_lock import ThreadedAsyncLock, ThreadedAsyncForbidden
from .protocols import HardwareControlInterface
from .protocols import HardwareControlInterface, FlexHardwareControlInterface
from .instruments import AbstractInstrument, Gripper
from typing import Union
from .ot3_calibration import OT3Transforms
from .robot_calibration import RobotCalibration
from opentrons.config.types import RobotConfig, OT3Config

from opentrons.types import Mount

# TODO (lc 12-05-2022) We should 1. figure out if we need
# to globally export a class that is strictly used in the hardware controller
# and 2. how to properly export an ot2 and ot3 pipette.
from .instruments.ot2.pipette import Pipette

OT2HardwareControlAPI = HardwareControlInterface[RobotCalibration]
OT3HardwareControlAPI = HardwareControlInterface[OT3Transforms]
OT2HardwareControlAPI = HardwareControlInterface[RobotCalibration, Mount, RobotConfig]
OT3HardwareControlAPI = FlexHardwareControlInterface[
OT3Transforms, Union[Mount, OT3Mount], OT3Config
]
HardwareControlAPI = Union[OT2HardwareControlAPI, OT3HardwareControlAPI]

ThreadManagedHardware = ThreadManager[HardwareControlAPI]
Expand All @@ -55,4 +60,6 @@
"ThreadedAsyncForbidden",
"ThreadManagedHardware",
"SyncHardwareAPI",
"OT2HardwareControlAPI",
"OT3HardwareControlAPI",
]
5 changes: 4 additions & 1 deletion api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class API(
# of methods that are present in the protocol will call the (empty,
# do-nothing) methods in the protocol. This will happily make all the
# tests fail.
HardwareControlInterface[RobotCalibration],
HardwareControlInterface[RobotCalibration, top_types.Mount, RobotConfig],
):
"""This API is the primary interface to the hardware controller.
Expand Down Expand Up @@ -426,6 +426,9 @@ async def update_firmware(
firmware_file, checked_loop, explicit_modeset
)

def has_gripper(self) -> bool:
return False

async def cache_instruments(
self, require: Optional[Dict[top_types.Mount, PipetteName]] = None
) -> None:
Expand Down
40 changes: 20 additions & 20 deletions api/src/opentrons/hardware_control/ot3_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from .util import DeckTransformState

if TYPE_CHECKING:
from .ot3api import OT3API
from opentrons.hardware_control import OT3HardwareControlAPI

LOG = getLogger(__name__)

Expand Down Expand Up @@ -123,7 +123,7 @@ def _verify_height(


async def _verify_edge_pos(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
search_axis: Union[Literal[Axis.X, Axis.Y]],
found_edge: Point,
Expand Down Expand Up @@ -177,7 +177,7 @@ def critical_edge_offset(


async def find_edge_binary(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot_edge_nominal: Point,
search_axis: Union[Literal[Axis.X, Axis.Y]],
Expand Down Expand Up @@ -272,7 +272,7 @@ async def find_edge_binary(


async def find_slot_center_binary(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
estimated_center: Point,
raise_verify_error: bool = True,
Expand Down Expand Up @@ -337,7 +337,7 @@ async def find_slot_center_binary(


async def find_calibration_structure_height(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
probe: InstrumentProbeType = InstrumentProbeType.PRIMARY,
Expand Down Expand Up @@ -365,7 +365,7 @@ async def find_calibration_structure_height(


async def _probe_deck_at(
api: OT3API,
api: OT3HardwareControlAPI,
mount: OT3Mount,
target: Point,
settings: CapacitivePassSettings,
Expand All @@ -390,7 +390,7 @@ async def _probe_deck_at(


async def find_axis_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
minus_edge_nominal: Point,
plus_edge_nominal: Point,
Expand Down Expand Up @@ -530,7 +530,7 @@ def _edges_from_data(


async def find_slot_center_noncontact(
hcapi: OT3API, mount: OT3Mount, estimated_center: Point
hcapi: OT3HardwareControlAPI, mount: OT3Mount, estimated_center: Point
) -> Point:
NONCONTACT_INTERVAL_MM: float = 0.1
travel_center = estimated_center + Point(0, 0, NONCONTACT_INTERVAL_MM)
Expand All @@ -552,7 +552,7 @@ async def find_slot_center_noncontact(


async def find_calibration_structure_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand All @@ -574,7 +574,7 @@ async def find_calibration_structure_center(


async def _calibrate_mount(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: int = SLOT_CENTER,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -641,7 +641,7 @@ async def _calibrate_mount(


async def find_calibration_structure_position(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -673,7 +673,7 @@ async def find_calibration_structure_position(


async def find_slot_center_binary_from_nominal_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: int,
) -> Tuple[Point, Point]:
Expand All @@ -698,7 +698,7 @@ async def find_slot_center_binary_from_nominal_center(


async def _determine_transform_matrix(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
) -> Tuple[types.AttitudeMatrix, Dict[str, Any]]:
"""
Expand Down Expand Up @@ -750,7 +750,7 @@ def gripper_pin_offsets_mean(front: Point, rear: Point) -> Point:


async def calibrate_gripper_jaw(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
probe: GripperProbe,
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -788,7 +788,7 @@ async def calibrate_gripper_jaw(


async def calibrate_gripper(
hcapi: OT3API, offset_front: Point, offset_rear: Point
hcapi: OT3HardwareControlAPI, offset_front: Point, offset_rear: Point
) -> Point:
"""Calibrate gripper."""
offset = gripper_pin_offsets_mean(front=offset_front, rear=offset_rear)
Expand All @@ -798,7 +798,7 @@ async def calibrate_gripper(


async def find_pipette_offset(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: Literal[OT3Mount.LEFT, OT3Mount.RIGHT],
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -829,7 +829,7 @@ async def find_pipette_offset(


async def calibrate_pipette(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: Literal[OT3Mount.LEFT, OT3Mount.RIGHT],
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand All @@ -852,7 +852,7 @@ async def calibrate_pipette(


async def calibrate_module(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: str,
module_id: str,
Expand Down Expand Up @@ -907,7 +907,7 @@ async def calibrate_module(


async def calibrate_belts(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
pipette_id: str,
) -> Tuple[types.AttitudeMatrix, Dict[str, Any]]:
Expand Down Expand Up @@ -1005,7 +1005,7 @@ def validate_attitude_deck_calibration(
return DeckTransformState.OK


def delete_belt_calibration_data(hcapi: OT3API) -> None:
def delete_belt_calibration_data(hcapi: OT3HardwareControlAPI) -> None:
delete_robot_belt_attitude()
hcapi.reset_deck_calibration()

Expand Down
44 changes: 12 additions & 32 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
from . import modules
from .ot3_calibration import OT3Transforms, OT3RobotCalibrationProvider

from .protocols import HardwareControlInterface
from .protocols import FlexHardwareControlInterface

# TODO (lc 09/15/2022) We should update our pipette handler to reflect OT-3 properties
# in a follow-up PR.
Expand Down Expand Up @@ -197,7 +197,9 @@ class OT3API(
# of methods that are present in the protocol will call the (empty,
# do-nothing) methods in the protocol. This will happily make all the
# tests fail.
HardwareControlInterface[OT3Transforms],
FlexHardwareControlInterface[
OT3Transforms, Union[top_types.Mount, OT3Mount], OT3Config
],
):
"""This API is the primary interface to the hardware controller.
Expand Down Expand Up @@ -1312,6 +1314,9 @@ async def idle_gripper(self) -> None:
except GripperNotPresentError:
pass

def gripper_jaw_can_home(self) -> bool:
return self._gripper_handler.is_ready_for_jaw_home()

def _build_moves(
self,
origin: Dict[Axis, float],
Expand Down Expand Up @@ -1997,10 +2002,6 @@ async def get_tip_presence_status(
self,
mount: Union[top_types.Mount, OT3Mount],
) -> TipStateType:
"""
Check tip presence status. If a high throughput pipette is present,
move the tip motors down before checking the sensor status.
"""
real_mount = OT3Mount.from_mount(mount)
async with contextlib.AsyncExitStack() as stack:
if (
Expand Down Expand Up @@ -2256,7 +2257,7 @@ def reset_instrument(
self._pipette_handler.reset_instrument(checked_mount)

def get_instrument_offset(
self, mount: OT3Mount
self, mount: Union[top_types.Mount, OT3Mount]
) -> Union[GripperCalibrationOffset, PipetteOffsetSummary, None]:
"""Get instrument calibration data."""
# TODO (spp, 2023-04-19): We haven't introduced a 'calibration_offset' key in
Expand All @@ -2265,11 +2266,13 @@ def get_instrument_offset(
# to be a part of the dict, this getter can be updated to fetch pipette offset
# from the dict, or just remove this getter entirely.

if mount == OT3Mount.GRIPPER:
ot3_mount = OT3Mount.from_mount(mount)

if ot3_mount == OT3Mount.GRIPPER:
gripper_dict = self._gripper_handler.get_gripper_dict()
return gripper_dict["calibration_offset"] if gripper_dict else None
else:
return self._pipette_handler.get_instrument_offset(mount=mount)
return self._pipette_handler.get_instrument_offset(mount=ot3_mount)

async def reset_instrument_offset(
self, mount: Union[top_types.Mount, OT3Mount], to_default: bool = True
Expand Down Expand Up @@ -2534,29 +2537,6 @@ async def capacitive_probe(
retract_after: bool = True,
probe: Optional[InstrumentProbeType] = None,
) -> Tuple[float, bool]:
"""Determine the position of something using the capacitive sensor.
This function orchestrates detecting the position of a collision between the
capacitive probe on the tool on the specified mount, and some fixed element
of the robot.
When calling this function, the mount's probe critical point should already
be aligned in the probe axis with the item to be probed.
It will move the mount's probe critical point to a small distance behind
the expected position of the element (which is target_pos, in deck coordinates,
in the axis to be probed) while running the tool's capacitive sensor. When the
sensor senses contact, the mount stops.
This function moves away and returns the sensed position.
This sensed position can be used in several ways, including
- To get an absolute position in deck coordinates of whatever was
targeted, if something was guaranteed to be physically present.
- To detect whether a collision occured at all. If this function
returns a value far enough past the anticipated position, then it indicates
there was no material there.
"""
if moving_axis not in [
Axis.X,
Axis.Y,
Expand Down
Loading

0 comments on commit 85dfebe

Please sign in to comment.