Skip to content

Commit

Permalink
fix(api): Ensure stack of labware on Staging Area Slot properly resol…
Browse files Browse the repository at this point in the history
…ves ancestor slot (#16681)

Covers PLAT-538

Ensures labware stacked in staging area slots can resolve their lowest ancestor slot.
  • Loading branch information
CaseyBatten authored Nov 5, 2024
1 parent 01c06d5 commit 7669fc2
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 32 deletions.
8 changes: 6 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
LabwareOffsetCreate,
LabwareOffsetVector,
)
from opentrons.types import DeckSlotName, Point
from opentrons.types import DeckSlotName, Point, StagingSlotName
from opentrons.hardware_control.nozzle_manager import NozzleMap


Expand Down Expand Up @@ -190,9 +190,13 @@ def get_well_core(self, well_name: str) -> WellCore:
def get_deck_slot(self) -> Optional[DeckSlotName]:
"""Get the deck slot the labware is in, if on deck."""
try:
return self._engine_client.state.geometry.get_ancestor_slot_name(
ancestor = self._engine_client.state.geometry.get_ancestor_slot_name(
self.labware_id
)
if isinstance(ancestor, StagingSlotName):
# The only use case for get_deck_slot is with a legacy OT-2 function which resolves to a numerical deck slot, so we can ignore staging area slots for now
return None
return ancestor
except (
LabwareNotOnDeckError,
ModuleNotOnDeckError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons.protocol_engine.errors import LocationIsStagingSlotError
from opentrons_shared_data.module import FLEX_TC_LID_COLLISION_ZONE

from opentrons.hardware_control import CriticalPoint
Expand Down Expand Up @@ -63,7 +64,7 @@ def __init__(self, message: str) -> None:
)


def check_safe_for_pipette_movement(
def check_safe_for_pipette_movement( # noqa: C901
engine_state: StateView,
pipette_id: str,
labware_id: str,
Expand Down Expand Up @@ -121,8 +122,12 @@ def check_safe_for_pipette_movement(
f"Requested motion with the {primary_nozzle} nozzle partial configuration"
f" is outside of robot bounds for the pipette."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
ancestor = engine_state.geometry.get_ancestor_slot_name(labware_id)
if isinstance(ancestor, StagingSlotName):
raise LocationIsStagingSlotError(
"Cannot perform pipette actions on labware in Staging Area Slot."
)
labware_slot = ancestor

surrounding_slots = adjacent_slots_getters.get_surrounding_slots(
slot=labware_slot.as_int(), robot_type=engine_state.config.robot_type
Expand Down
13 changes: 9 additions & 4 deletions api/src/opentrons/protocol_engine/execution/movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging
from typing import Optional, List, Union

from opentrons.types import Point, MountType
from opentrons.types import Point, MountType, StagingSlotName
from opentrons.hardware_control import HardwareControlAPI
from opentrons_shared_data.errors.exceptions import PositionUnknownError
from opentrons.protocol_engine.errors import LocationIsStagingSlotError

from ..types import (
WellLocation,
Expand Down Expand Up @@ -93,9 +94,13 @@ async def move_to_well(
self._state_store.modules.get_heater_shaker_movement_restrictors()
)

dest_slot_int = self._state_store.geometry.get_ancestor_slot_name(
labware_id
).as_int()
ancestor = self._state_store.geometry.get_ancestor_slot_name(labware_id)
if isinstance(ancestor, StagingSlotName):
raise LocationIsStagingSlotError(
"Cannot move to well on labware in Staging Area Slot."
)

dest_slot_int = ancestor.as_int()

self._hs_movement_flagger.raise_if_movement_restricted(
hs_movement_restrictors=hs_movement_restrictors,
Expand Down
43 changes: 29 additions & 14 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,12 @@ def _get_lid_dock_slot_name(self, labware_id: str) -> str:
assert isinstance(labware_location, AddressableAreaLocation)
return labware_location.addressableAreaName

def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
def get_ancestor_slot_name(
self, labware_id: str
) -> Union[DeckSlotName, StagingSlotName]:
"""Get the slot name of the labware or the module that the labware is on."""
labware = self._labware.get(labware_id)
slot_name: DeckSlotName
slot_name: Union[DeckSlotName, StagingSlotName]

if isinstance(labware.location, DeckSlotLocation):
slot_name = labware.location.slotName
Expand All @@ -724,18 +726,14 @@ def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
slot_name = self.get_ancestor_slot_name(below_labware_id)
elif isinstance(labware.location, AddressableAreaLocation):
area_name = labware.location.addressableAreaName
# TODO we might want to eventually return some sort of staging slot name when we're ready to work through
# the linting nightmare it will create
if self._labware.is_absorbance_reader_lid(labware_id):
raise errors.LocationIsLidDockSlotError(
"Cannot get ancestor slot name for labware on lid dock slot."
)
if fixture_validation.is_staging_slot(area_name):
raise errors.LocationIsStagingSlotError(
"Cannot get ancestor slot name for labware on staging slot."
)
raise errors.LocationIs
slot_name = DeckSlotName.from_primitive(area_name)
elif fixture_validation.is_staging_slot(area_name):
slot_name = StagingSlotName.from_primitive(area_name)
else:
slot_name = DeckSlotName.from_primitive(area_name)
elif labware.location == OFF_DECK_LOCATION:
raise errors.LabwareNotOnDeckError(
f"Labware {labware_id} does not have a slot associated with it"
Expand Down Expand Up @@ -829,7 +827,9 @@ def get_labware_grip_point(
)

def get_extra_waypoints(
self, location: Optional[CurrentPipetteLocation], to_slot: DeckSlotName
self,
location: Optional[CurrentPipetteLocation],
to_slot: Union[DeckSlotName, StagingSlotName],
) -> List[Tuple[float, float]]:
"""Get extra waypoints for movement if thermocycler needs to be dodged."""
if location is not None:
Expand Down Expand Up @@ -888,8 +888,10 @@ def get_slot_item(
return maybe_labware or maybe_module or maybe_fixture or None

@staticmethod
def get_slot_column(slot_name: DeckSlotName) -> int:
def get_slot_column(slot_name: Union[DeckSlotName, StagingSlotName]) -> int:
"""Get the column number for the specified slot."""
if isinstance(slot_name, StagingSlotName):
return 4
row_col_name = slot_name.to_ot3_equivalent()
slot_name_match = WELL_NAME_PATTERN.match(row_col_name.value)
assert (
Expand Down Expand Up @@ -1170,7 +1172,13 @@ def get_total_nominal_gripper_offset_for_move_type(
)

assert isinstance(
ancestor, (DeckSlotLocation, ModuleLocation, OnLabwareLocation)
ancestor,
(
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
),
), "No gripper offsets for off-deck labware"
return (
direct_parent_offset.pickUpOffset
Expand Down Expand Up @@ -1217,7 +1225,13 @@ def get_total_nominal_gripper_offset_for_move_type(
)

assert isinstance(
ancestor, (DeckSlotLocation, ModuleLocation, OnLabwareLocation)
ancestor,
(
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
),
), "No gripper offsets for off-deck labware"
return (
direct_parent_offset.dropOffset
Expand Down Expand Up @@ -1293,6 +1307,7 @@ def _labware_gripper_offsets(
DeckSlotLocation,
ModuleLocation,
AddressableAreaLocation,
OnLabwareLocation,
),
), "No gripper offsets for off-deck labware"

Expand Down
6 changes: 3 additions & 3 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from opentrons.protocol_engine.state.module_substates.absorbance_reader_substate import (
AbsorbanceReaderMeasureMode,
)
from opentrons.types import DeckSlotName, MountType
from opentrons.types import DeckSlotName, MountType, StagingSlotName
from ..errors import ModuleNotConnectedError

from ..types import (
Expand Down Expand Up @@ -1124,8 +1124,8 @@ def calculate_magnet_height(

def should_dodge_thermocycler(
self,
from_slot: DeckSlotName,
to_slot: DeckSlotName,
from_slot: Union[DeckSlotName, StagingSlotName],
to_slot: Union[DeckSlotName, StagingSlotName],
) -> bool:
"""Decide if the requested path would cross the thermocycler, if installed.
Expand Down
22 changes: 17 additions & 5 deletions api/src/opentrons/protocol_engine/state/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass
from typing import List, Optional, Union

from opentrons.types import MountType, Point
from opentrons.types import MountType, Point, StagingSlotName
from opentrons.hardware_control.types import CriticalPoint
from opentrons.motion_planning.adjacent_slots_getters import (
get_east_west_slots,
Expand Down Expand Up @@ -277,9 +277,13 @@ def check_pipette_blocking_hs_latch(
current_location = self._pipettes.get_current_location()
if current_location is not None:
if isinstance(current_location, CurrentWell):
pipette_deck_slot = self._geometry.get_ancestor_slot_name(
ancestor = self._geometry.get_ancestor_slot_name(
current_location.labware_id
).as_int()
)
if isinstance(ancestor, StagingSlotName):
# Staging Area Slots cannot intersect with the h/s
return False
pipette_deck_slot = ancestor.as_int()
else:
pipette_deck_slot = (
self._addressable_areas.get_addressable_area_base_slot(
Expand All @@ -299,9 +303,13 @@ def check_pipette_blocking_hs_shaker(
current_location = self._pipettes.get_current_location()
if current_location is not None:
if isinstance(current_location, CurrentWell):
pipette_deck_slot = self._geometry.get_ancestor_slot_name(
ancestor = self._geometry.get_ancestor_slot_name(
current_location.labware_id
).as_int()
)
if isinstance(ancestor, StagingSlotName):
# Staging Area Slots cannot intersect with the h/s
return False
pipette_deck_slot = ancestor.as_int()
else:
pipette_deck_slot = (
self._addressable_areas.get_addressable_area_base_slot(
Expand All @@ -324,6 +332,10 @@ def get_touch_tip_waypoints(
"""Get a list of touch points for a touch tip operation."""
mount = self._pipettes.get_mount(pipette_id)
labware_slot = self._geometry.get_ancestor_slot_name(labware_id)
if isinstance(labware_slot, StagingSlotName):
raise errors.LocationIsStagingSlotError(
"Cannot perform Touch Tip on labware in Staging Area Slot."
)
next_to_module = self._modules.is_edge_move_unsafe(mount, labware_slot)
edge_path_type = self._labware.get_edge_path_type(
labware_id, well_name, mount, labware_slot, next_to_module
Expand Down
29 changes: 28 additions & 1 deletion api/tests/opentrons/protocol_engine/state/test_geometry_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from opentrons_shared_data.pipette import pipette_definition
from opentrons.calibration_storage.helpers import uri_from_details
from opentrons.protocols.models import LabwareDefinition
from opentrons.types import Point, DeckSlotName, MountType
from opentrons.types import Point, DeckSlotName, MountType, StagingSlotName
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons_shared_data.labware.labware_definition import (
Dimensions as LabwareDimensions,
Expand Down Expand Up @@ -2189,6 +2189,33 @@ def test_get_ancestor_slot_name(
assert subject.get_ancestor_slot_name("labware-2") == DeckSlotName.SLOT_1


def test_get_ancestor_slot_for_labware_stack_in_staging_area_slot(
decoy: Decoy,
mock_labware_view: LabwareView,
subject: GeometryView,
) -> None:
"""It should get name of ancestor slot of a stack of labware in a staging area slot."""
decoy.when(mock_labware_view.get("labware-1")).then_return(
LoadedLabware(
id="labware-1",
loadName="load-name",
definitionUri="1234",
location=AddressableAreaLocation(
addressableAreaName=StagingSlotName.SLOT_D4.id
),
)
)
decoy.when(mock_labware_view.get("labware-2")).then_return(
LoadedLabware(
id="labware-2",
loadName="load-name",
definitionUri="1234",
location=OnLabwareLocation(labwareId="labware-1"),
)
)
assert subject.get_ancestor_slot_name("labware-2") == StagingSlotName.SLOT_D4


def test_ensure_location_not_occupied_raises(
decoy: Decoy,
mock_labware_view: LabwareView,
Expand Down

0 comments on commit 7669fc2

Please sign in to comment.