Skip to content

Commit

Permalink
fix(api): use critical point instead of primary nozzle when doing dec…
Browse files Browse the repository at this point in the history
…k conflict check (#16268)

Closes RQA-3175

# Overview

There was a bug in the deck conflict checker that it wasn't considering
the change in critical point of pipette in use when addressing
reservoirs. This was leading to incorrect conflict checks when moving to
any labware that had the `centerMultichannelOnWells` quirk.

This PR fixes that by correctly finding the pipette's boundaries when
its *critical point* is moved to the destination in question rather than
its primary nozzle (the default critical point).

## Test Plan and Hands on Testing

- Added integration tests that uses reservoirs with row and channel
configurations
- Existing unit and integration tests should remain unaffected
- [x] Add tests for conflict checks with 8-channel pipette partial
column configuration
- [x] Test on a robot that existing pipette movements with different
pipettes in different configurations is not affected

## Review requests

- Make sure the logic checks out & test on robot

## Risk assessment

Low. We have quite good test coverage for a lot of cases so any
unexpected changes should get caught in the tests easily.
  • Loading branch information
sanni-t authored Sep 17, 2024
1 parent 5d86811 commit f77a962
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 35 deletions.
22 changes: 21 additions & 1 deletion api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons_shared_data.module import FLEX_TC_LID_COLLISION_ZONE

from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning import adjacent_slots_getters
Expand Down Expand Up @@ -228,9 +229,13 @@ def check_safe_for_pipette_movement(
)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)

destination_cp = _get_critical_point_to_use(engine_state, labware_id)

pipette_bounds_at_well_location = (
engine_state.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id=pipette_id, destination_position=well_location_point
pipette_id=pipette_id,
destination_position=well_location_point,
critical_point=destination_cp,
)
)
if not _is_within_pipette_extents(
Expand Down Expand Up @@ -284,6 +289,21 @@ def check_safe_for_pipette_movement(
)


def _get_critical_point_to_use(
engine_state: StateView, labware_id: str
) -> Optional[CriticalPoint]:
"""Return the critical point to use when accessing the given labware."""
# TODO (spp, 2024-09-17): looks like Y_CENTER of column is the same as its XY_CENTER.
# I'm using this if-else ladder to be consistent with what we do in
# `MotionPlanning.get_movement_waypoints_to_well()`.
# We should probably use only XY_CENTER in both places.
if engine_state.labware.get_should_center_column_on_target_well(labware_id):
return CriticalPoint.Y_CENTER
elif engine_state.labware.get_should_center_pipette_on_target_well(labware_id):
return CriticalPoint.XY_CENTER
return None


def _slot_has_potential_colliding_object(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
Expand Down
65 changes: 45 additions & 20 deletions api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from opentrons_shared_data.pipette import pipette_definition
from opentrons.config.defaults_ot2 import Z_RETRACT_DISTANCE
from opentrons.hardware_control.dev_types import PipetteDict
from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.nozzle_manager import (
NozzleConfigurationType,
NozzleMap,
Expand Down Expand Up @@ -795,17 +796,27 @@ def get_primary_nozzle(self, pipette_id: str) -> Optional[str]:
nozzle_map = self._state.nozzle_configuration_by_id.get(pipette_id)
return nozzle_map.starting_nozzle if nozzle_map else None

def get_primary_nozzle_offset(self, pipette_id: str) -> Point:
"""Get the pipette's current primary nozzle's offset."""
def _get_critical_point_offset_without_tip(
self, pipette_id: str, critical_point: Optional[CriticalPoint]
) -> Point:
"""Get the offset of the specified critical point from pipette's mount position."""
nozzle_map = self._state.nozzle_configuration_by_id.get(pipette_id)
if nozzle_map:
primary_nozzle_offset = nozzle_map.starting_nozzle_offset
else:
# When not in partial configuration, back-left nozzle is the primary
primary_nozzle_offset = self.get_config(
pipette_id
).bounding_nozzle_offsets.back_left_offset
return primary_nozzle_offset
# Nozzle map is unavailable only when there's no pipette loaded
# so this is merely for satisfying the type checker
assert (
nozzle_map is not None
), "Error getting critical point offset. Nozzle map not found."
match critical_point:
case CriticalPoint.INSTRUMENT_XY_CENTER:
return nozzle_map.instrument_xy_center_offset
case CriticalPoint.XY_CENTER:
return nozzle_map.xy_center_offset
case CriticalPoint.Y_CENTER:
return nozzle_map.y_center_offset
case CriticalPoint.FRONT_NOZZLE:
return nozzle_map.front_nozzle_offset
case _:
return nozzle_map.starting_nozzle_offset

def get_pipette_bounding_nozzle_offsets(
self, pipette_id: str
Expand All @@ -817,32 +828,46 @@ def get_pipette_bounding_box(self, pipette_id: str) -> PipetteBoundingBoxOffsets
"""Get the bounding box of the pipette."""
return self.get_config(pipette_id).pipette_bounding_box_offsets

# TODO (spp, 2024-09-17): in order to find the position of pipette at destination,
# this method repeats the same steps that waypoints builder does while finding
# waypoints to move to. We should consolidate these steps into a shared entity
# so that the deck conflict checker and movement plan builder always remain in sync.
def get_pipette_bounds_at_specified_move_to_position(
self,
pipette_id: str,
destination_position: Point,
critical_point: Optional[CriticalPoint],
) -> Tuple[Point, Point, Point, Point]:
"""Get the pipette's bounding offsets when primary nozzle is at the given position."""
primary_nozzle_offset = self.get_primary_nozzle_offset(pipette_id)
"""Get the pipette's bounding box position when critical point is at the destination position.
Returns a tuple of the pipette's bounding box position in deck coordinates as-
(back_left_bound, front_right_bound, back_right_bound, front_left_bound)
Bounding box of the pipette includes the pipette's outer casing as well as nozzles.
"""
tip = self.get_attached_tip(pipette_id)
# TODO update this for pipette robot stackup
# Primary nozzle position at destination, in deck coordinates
primary_nozzle_position = destination_position + Point(

# *Offset* of pipette's critical point w.r.t pipette mount
critical_point_offset = self._get_critical_point_offset_without_tip(
pipette_id, critical_point
)

# Position of the above critical point at destination, in deck coordinates
critical_point_position = destination_position + Point(
x=0, y=0, z=tip.length if tip else 0
)

# Get the pipette bounding box based on total nozzles
# Get the pipette bounding box coordinates
pipette_bounds_offsets = self.get_config(
pipette_id
).pipette_bounding_box_offsets
pip_back_left_bound = (
primary_nozzle_position
- primary_nozzle_offset
critical_point_position
- critical_point_offset
+ pipette_bounds_offsets.back_left_corner
)
pip_front_right_bound = (
primary_nozzle_position
- primary_nozzle_offset
critical_point_position
- critical_point_offset
+ pipette_bounds_offsets.front_right_corner
)
pip_back_right_bound = Point(
Expand Down
25 changes: 23 additions & 2 deletions api/tests/opentrons/protocol_api/core/engine/test_deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from opentrons_shared_data.labware.types import LabwareUri
from opentrons_shared_data.robot.types import RobotType

from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning import adjacent_slots_getters
Expand Down Expand Up @@ -545,9 +546,21 @@ def test_deck_conflict_raises_for_bad_pipette_move(
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)
decoy.when(
mock_state_view.labware.get_should_center_column_on_target_well(
"destination-labware-id"
)
).then_return(False)
decoy.when(
mock_state_view.labware.get_should_center_pipette_on_target_well(
"destination-labware-id"
)
).then_return(False)
decoy.when(
mock_state_view.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
pipette_id="pipette-id",
destination_position=destination_well_point,
critical_point=None,
)
).then_return(pipette_bounds)

Expand Down Expand Up @@ -653,9 +666,17 @@ def test_deck_conflict_raises_for_collision_with_tc_lid(
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)

decoy.when(
mock_state_view.labware.get_should_center_column_on_target_well(
"destination-labware-id"
)
).then_return(True)
decoy.when(
mock_state_view.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
pipette_id="pipette-id",
destination_position=destination_well_point,
critical_point=CriticalPoint.Y_CENTER,
)
).then_return(pipette_bounds_at_destination)
decoy.when(mock_state_view.pipettes.get_mount("pipette-id")).then_return(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from opentrons import simulate
from opentrons.protocol_api import COLUMN, ALL, SINGLE
from opentrons.protocol_api import COLUMN, ALL, SINGLE, ROW
from opentrons.protocol_api.core.engine.deck_conflict import (
PartialTipMovementNotAllowedError,
)
Expand Down Expand Up @@ -226,3 +226,104 @@ def test_deck_conflicts_for_96_ch_a1_column_configuration() -> None:

# No error NOW because of full config
instrument.dispense(50, badly_placed_plate.wells_by_name()["A1"].bottom())


@pytest.mark.ot3_only
def test_deck_conflicts_for_96_ch_and_reservoirs() -> None:
"""It should raise errors for expected deck conflicts when moving to reservoirs.
This test checks that the critical point of the pipette is taken into account,
specifically when it differs from the primary nozzle.
"""
protocol = simulate.get_protocol_api(version="2.20", robot_type="Flex")
instrument = protocol.load_instrument("flex_96channel_1000", mount="left")
# trash_labware = protocol.load_labware("opentrons_1_trash_3200ml_fixed", "A3")
# instrument.trash_container = trash_labware

protocol.load_trash_bin("A3")
right_tiprack = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "C3")
front_tiprack = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "D2")
# Tall deck item in B3
protocol.load_labware(
"opentrons_flex_96_tiprack_50ul",
"B3",
adapter="opentrons_flex_96_tiprack_adapter",
)
# Tall deck item in B1
protocol.load_labware(
"opentrons_flex_96_tiprack_50ul",
"B1",
adapter="opentrons_flex_96_tiprack_adapter",
)

# ############ RESERVOIRS ################
# These labware should be to the east of tall labware to avoid any partial tip deck conflicts
reservoir_1_well = protocol.load_labware("nest_1_reservoir_195ml", "C2")
reservoir_12_well = protocol.load_labware("nest_12_reservoir_15ml", "B2")

# ########### Use COLUMN A1 Config #############
instrument.configure_nozzle_layout(style=COLUMN, start="A1")

instrument.pick_up_tip(front_tiprack.wells_by_name()["A12"])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.aspirate(10, reservoir_1_well.wells()[0])

instrument.aspirate(25, reservoir_12_well.wells()[0])
instrument.dispense(10, reservoir_12_well.wells()[1])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.dispense(15, reservoir_12_well.wells()[3])

instrument.drop_tip()
front_tiprack.reset()

# ########### Use COLUMN A12 Config #############
instrument.configure_nozzle_layout(style=COLUMN, start="A12")

instrument.pick_up_tip(front_tiprack.wells_by_name()["A1"])
instrument.aspirate(50, reservoir_1_well.wells()[0])
with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.dispense(10, reservoir_12_well.wells()[8])

instrument.dispense(15, reservoir_12_well.wells()[11])
instrument.dispense(10, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()

# ######## CHANGE CONFIG TO ROW H1 #########
instrument.configure_nozzle_layout(style=ROW, start="H1", tip_racks=[front_tiprack])
with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.pick_up_tip(right_tiprack.wells_by_name()["A1"])

instrument.pick_up_tip()
instrument.aspirate(25, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()

# ######## CHANGE CONFIG TO ROW A1 #########
instrument.configure_nozzle_layout(style=ROW, start="A1", tip_racks=[front_tiprack])

with pytest.raises(
PartialTipMovementNotAllowedError, match="outside of robot bounds"
):
instrument.pick_up_tip()
instrument.pick_up_tip(right_tiprack.wells_by_name()["H1"])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.aspirate(25, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()
Loading

0 comments on commit f77a962

Please sign in to comment.