Skip to content

Commit

Permalink
feat(api): add deck conflict checks for staging slots and after movin…
Browse files Browse the repository at this point in the history
…g labware (#14287)
  • Loading branch information
jbleon95 authored Jan 8, 2024
1 parent e627fca commit ba03e84
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 82 deletions.
26 changes: 18 additions & 8 deletions api/src/opentrons/motion_planning/adjacent_slots_getters.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Getters for specific adjacent slots."""

from typing import Optional, List
from typing import Optional, List, Dict

from opentrons.types import StagingSlotName
from opentrons.types import DeckSlotName, StagingSlotName


def get_north_slot(slot: int) -> Optional[int]:
Expand Down Expand Up @@ -37,19 +37,29 @@ def get_west_slot(slot: int) -> Optional[int]:
return slot - 1


_WEST_OF_STAGING_SLOT_MAP = {
StagingSlotName.SLOT_A4: "A3",
StagingSlotName.SLOT_B4: "B3",
StagingSlotName.SLOT_C4: "C3",
StagingSlotName.SLOT_D4: "D3",
_WEST_OF_STAGING_SLOT_MAP: Dict[StagingSlotName, DeckSlotName] = {
StagingSlotName.SLOT_A4: DeckSlotName.SLOT_A3,
StagingSlotName.SLOT_B4: DeckSlotName.SLOT_B3,
StagingSlotName.SLOT_C4: DeckSlotName.SLOT_C3,
StagingSlotName.SLOT_D4: DeckSlotName.SLOT_D3,
}

_EAST_OF_FLEX_COLUMN_3_MAP: Dict[DeckSlotName, StagingSlotName] = {
deck_slot: staging_slot
for staging_slot, deck_slot in _WEST_OF_STAGING_SLOT_MAP.items()
}


def get_west_of_staging_slot(staging_slot: StagingSlotName) -> str:
def get_west_of_staging_slot(staging_slot: StagingSlotName) -> DeckSlotName:
"""Get slot west of a staging slot."""
return _WEST_OF_STAGING_SLOT_MAP[staging_slot]


def get_adjacent_staging_slot(deck_slot: DeckSlotName) -> Optional[StagingSlotName]:
"""Get the adjacent staging slot if the deck slot is in the third column."""
return _EAST_OF_FLEX_COLUMN_3_MAP.get(deck_slot)


def get_east_west_slots(slot: int) -> List[int]:
"""Get slots east & west of the given slot."""
east = get_east_slot(slot)
Expand Down
60 changes: 42 additions & 18 deletions api/src/opentrons/motion_planning/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
get_east_west_slots,
get_south_slot,
get_adjacent_slots,
get_adjacent_staging_slot,
)

from opentrons.types import DeckSlotName
from opentrons.types import DeckSlotName, StagingSlotName

_FIXED_TRASH_SLOT: Final[Set[DeckSlotName]] = {
DeckSlotName.FIXED_TRASH,
Expand Down Expand Up @@ -70,6 +71,11 @@ class HeaterShakerModule(_Module):
"""A Heater-Shaker module."""


@dataclass
class MagneticBlockModule(_Module):
"""A Magnetic Block module."""


@dataclass
class ThermocyclerModule(_Module):
"""A Thermocycler module."""
Expand All @@ -89,6 +95,7 @@ class OtherModule(_Module):
DeckItem = Union[
Labware,
HeaterShakerModule,
MagneticBlockModule,
ThermocyclerModule,
OtherModule,
]
Expand All @@ -97,9 +104,9 @@ class OtherModule(_Module):
class _NothingAllowed(NamedTuple):
"""Nothing is allowed in this slot."""

location: DeckSlotName
location: Union[DeckSlotName, StagingSlotName]
source_item: DeckItem
source_location: DeckSlotName
source_location: Union[DeckSlotName, StagingSlotName]

def is_allowed(self, item: DeckItem) -> bool:
return False
Expand Down Expand Up @@ -163,9 +170,9 @@ class DeckConflictError(ValueError):
# things that don't fit into a single deck slot, like the Thermocycler.
# Refactor this interface to take a more symbolic location.
def check(
existing_items: Mapping[DeckSlotName, DeckItem],
existing_items: Mapping[Union[DeckSlotName, StagingSlotName], DeckItem],
new_item: DeckItem,
new_location: DeckSlotName,
new_location: Union[DeckSlotName, StagingSlotName],
robot_type: RobotType,
) -> None:
"""Check a deck layout for conflicts.
Expand Down Expand Up @@ -210,10 +217,12 @@ def check(
)


def _create_ot2_restrictions(
item: DeckItem, location: DeckSlotName
def _create_ot2_restrictions( # noqa: C901
item: DeckItem, location: Union[DeckSlotName, StagingSlotName]
) -> List[_DeckRestriction]:
restrictions: List[_DeckRestriction] = []
if isinstance(location, StagingSlotName):
raise DeckConflictError(f"OT-2 does not support staging slots ({location.id}).")

if location not in _FIXED_TRASH_SLOT:
# Disallow a different item from overlapping this item in this deck slot.
Expand Down Expand Up @@ -274,20 +283,35 @@ def _create_ot2_restrictions(


def _create_flex_restrictions(
item: DeckItem, location: DeckSlotName
item: DeckItem, location: Union[DeckSlotName, StagingSlotName]
) -> List[_DeckRestriction]:
restrictions: List[_DeckRestriction] = []
restrictions: List[_DeckRestriction] = [
_NothingAllowed(
location=location,
source_item=item,
source_location=location,
)
]

if location not in _FIXED_TRASH_SLOT:
restrictions.append(
_NothingAllowed(
location=location,
source_item=item,
source_location=location,
if isinstance(item, (HeaterShakerModule, OtherModule)):
if isinstance(location, StagingSlotName):
raise DeckConflictError(
"Cannot have a module loaded on a staging area slot."
)
adjacent_staging_slot = get_adjacent_staging_slot(location)
if adjacent_staging_slot is not None:
# You can't have anything on a staging area slot next to a heater-shaker or
# temperature module because the module caddy physically blocks you from having
# that staging area slot installed in the first place.
restrictions.append(
_NothingAllowed(
location=adjacent_staging_slot,
source_item=item,
source_location=location,
)
)
)

if isinstance(item, ThermocyclerModule):
elif isinstance(item, ThermocyclerModule):
for covered_location in _flex_slots_covered_by_thermocycler():
restrictions.append(
_NothingAllowed(
Expand All @@ -301,7 +325,7 @@ def _create_flex_restrictions(


def _create_restrictions(
item: DeckItem, location: DeckSlotName, robot_type: str
item: DeckItem, location: Union[DeckSlotName, StagingSlotName], robot_type: str
) -> List[_DeckRestriction]:

if robot_type == "OT-2 Standard":
Expand Down
45 changes: 36 additions & 9 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName, Point
from opentrons.types import DeckSlotName, StagingSlotName, Point


class PartialTipMovementNotAllowedError(MotionPlanningFailureError):
Expand Down Expand Up @@ -131,7 +131,9 @@ def check(
)
mapped_existing_modules = (m for m in all_existing_modules if m is not None)

existing_items: Dict[DeckSlotName, wrapped_deck_conflict.DeckItem] = {}
existing_items: Dict[
Union[DeckSlotName, StagingSlotName], wrapped_deck_conflict.DeckItem
] = {}
for existing_location, existing_item in itertools.chain(
mapped_existing_labware, mapped_existing_modules
):
Expand Down Expand Up @@ -398,20 +400,37 @@ def _is_within_pipette_extents(
def _map_labware(
engine_state: StateView,
labware_id: str,
) -> Optional[Tuple[DeckSlotName, wrapped_deck_conflict.DeckItem]]:
) -> Optional[
Tuple[Union[DeckSlotName, StagingSlotName], wrapped_deck_conflict.DeckItem]
]:
location_from_engine = engine_state.labware.get_location(labware_id=labware_id)

if isinstance(location_from_engine, AddressableAreaLocation):
# TODO need to deal with staging slots, which will raise the value error we are returning None with below
# This will be guaranteed to be either deck slot name or staging slot name
slot: Union[DeckSlotName, StagingSlotName]
try:
deck_slot = DeckSlotName.from_primitive(
slot = DeckSlotName.from_primitive(location_from_engine.addressableAreaName)
except ValueError:
slot = StagingSlotName.from_primitive(
location_from_engine.addressableAreaName
)
except ValueError:
return None
location_from_engine = DeckSlotLocation(slotName=deck_slot)
return (
slot,
wrapped_deck_conflict.Labware(
name_for_errors=engine_state.labware.get_load_name(
labware_id=labware_id
),
highest_z=engine_state.geometry.get_labware_highest_z(
labware_id=labware_id
),
uri=engine_state.labware.get_definition_uri(labware_id=labware_id),
is_fixed_trash=engine_state.labware.is_fixed_trash(
labware_id=labware_id
),
),
)

if isinstance(location_from_engine, DeckSlotLocation):
elif isinstance(location_from_engine, DeckSlotLocation):
# This labware is loaded directly into a deck slot.
# Map it to a wrapped_deck_conflict.Labware.
return (
Expand Down Expand Up @@ -471,6 +490,14 @@ def _map_module(
highest_z_including_labware=highest_z_including_labware,
),
)
elif module_type == ModuleType.MAGNETIC_BLOCK:
return (
mapped_location,
wrapped_deck_conflict.MagneticBlockModule(
name_for_errors=name_for_errors,
highest_z_including_labware=highest_z_including_labware,
),
)
elif module_type == ModuleType.THERMOCYCLER:
return (
mapped_location,
Expand Down
15 changes: 12 additions & 3 deletions api/src/opentrons/protocol_api/core/engine/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ def move_labware(

to_location = self._convert_labware_location(location=new_location)

# TODO(mm, 2023-02-23): Check for conflicts with other items on the deck,
# when move_labware() support is no longer experimental.

self._engine_client.move_labware(
labware_id=labware_core.labware_id,
new_location=to_location,
Expand All @@ -342,6 +339,18 @@ def move_labware(
# and we only use last location for in-place pipetting commands
self.set_last_location(location=None, mount=Mount.EXTENSION)

# FIXME(jbl, 2024-01-04) deck conflict after execution logic issue, read notes in load_labware for more info:
deck_conflict.check(
engine_state=self._engine_client.state,
new_labware_id=labware_core.labware_id,
existing_labware_ids=[
labware_id
for labware_id in self._labware_cores_by_id
if labware_id != labware_core.labware_id
],
existing_module_ids=list(self._module_cores_by_id.keys()),
)

def _resolve_module_hardware(
self, serial_number: str, model: ModuleModel
) -> AbstractModule:
Expand Down
15 changes: 12 additions & 3 deletions api/src/opentrons/protocol_api/core/legacy/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging
from collections import UserDict
from typing import Dict, Optional, List, Union
from typing import Dict, Optional, List, Union, Mapping
from typing_extensions import Protocol, Final

from opentrons_shared_data.deck import load as load_deck
Expand All @@ -14,7 +14,14 @@
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict
from opentrons.protocols.api_support.labware_like import LabwareLike
from opentrons.types import DeckLocation, Location, Mount, Point, DeckSlotName
from opentrons.types import (
DeckLocation,
Location,
Mount,
Point,
DeckSlotName,
StagingSlotName,
)

from opentrons.protocol_api.core.labware import AbstractLabware
from opentrons.protocol_api.deck import CalibrationPosition
Expand Down Expand Up @@ -167,7 +174,9 @@ def __delitem__(self, key: DeckLocation) -> None:

def __setitem__(self, key: DeckLocation, val: DeckItem) -> None:
slot_key_int = self._check_name(key)
existing_items = {
existing_items: Mapping[
Union[DeckSlotName, StagingSlotName], deck_conflict.DeckItem
] = {
DeckSlotName.from_primitive(slot): self._map_to_conflict_checker_item(item)
for slot, item in self.data.items()
if item is not None
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_api/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def left_of(self, slot: DeckLocation) -> Optional[DeckItem]:
if isinstance(slot_name, DeckSlotName):
west_slot = adjacent_slots_getters.get_west_slot(slot_name.as_int())
else:
west_slot = adjacent_slots_getters.get_west_of_staging_slot(slot_name)
west_slot = adjacent_slots_getters.get_west_of_staging_slot(slot_name).id

return self[west_slot] if west_slot is not None else None

Expand Down
31 changes: 25 additions & 6 deletions api/tests/opentrons/motion_planning/test_adjacent_slots_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
from typing import List, Optional

from opentrons.types import StagingSlotName
from opentrons.types import DeckSlotName, StagingSlotName
from opentrons.motion_planning.adjacent_slots_getters import (
get_east_slot,
get_south_slot,
Expand All @@ -12,6 +12,7 @@
get_north_south_slots,
get_adjacent_slots,
get_west_of_staging_slot,
get_adjacent_staging_slot,
)


Expand Down Expand Up @@ -98,14 +99,32 @@ def test_get_adjacent_slots(slot: int, expected_adjacent: List[int]) -> None:
@pytest.mark.parametrize(
argnames=["slot", "expected_adjacent"],
argvalues=[
(StagingSlotName.SLOT_A4, "A3"),
(StagingSlotName.SLOT_B4, "B3"),
(StagingSlotName.SLOT_C4, "C3"),
(StagingSlotName.SLOT_D4, "D3"),
(StagingSlotName.SLOT_A4, DeckSlotName.SLOT_A3),
(StagingSlotName.SLOT_B4, DeckSlotName.SLOT_B3),
(StagingSlotName.SLOT_C4, DeckSlotName.SLOT_C3),
(StagingSlotName.SLOT_D4, DeckSlotName.SLOT_D3),
],
)
def test_get_west_of_staging_slot(
slot: StagingSlotName, expected_adjacent: str
slot: StagingSlotName, expected_adjacent: DeckSlotName
) -> None:
"""It should find the slot directly west of a staging slot."""
assert get_west_of_staging_slot(slot) == expected_adjacent


@pytest.mark.parametrize(
argnames=["slot", "expected_adjacent"],
argvalues=[
(DeckSlotName.SLOT_A3, StagingSlotName.SLOT_A4),
(DeckSlotName.SLOT_B3, StagingSlotName.SLOT_B4),
(DeckSlotName.SLOT_C3, StagingSlotName.SLOT_C4),
(DeckSlotName.SLOT_D3, StagingSlotName.SLOT_D4),
(DeckSlotName.SLOT_D1, None),
(DeckSlotName.SLOT_1, None),
],
)
def test_get_adjacent_staging_slot(
slot: DeckSlotName, expected_adjacent: Optional[StagingSlotName]
) -> None:
"""It should find the adjacent slot east of a staging slot if it exists."""
assert get_adjacent_staging_slot(slot) == expected_adjacent
Loading

0 comments on commit ba03e84

Please sign in to comment.