Skip to content

Commit

Permalink
fix(api): enable deck conflict checker for the flex (#13189)
Browse files Browse the repository at this point in the history
  • Loading branch information
TamarZanzouri authored Aug 7, 2023
1 parent b7810bb commit 136f933
Show file tree
Hide file tree
Showing 12 changed files with 674 additions and 262 deletions.
121 changes: 92 additions & 29 deletions api/src/opentrons/motion_planning/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
from typing_extensions import Final

from opentrons_shared_data.labware.dev_types import LabwareUri

from opentrons_shared_data.robot.dev_types import RobotType
from opentrons.motion_planning.adjacent_slots_getters import (
get_east_west_slots,
get_south_slot,
get_adjacent_slots,
)

from opentrons.types import DeckSlotName

_FIXED_TRASH_SLOT: Final = 12
_FIXED_TRASH_SLOT: Final[Set[DeckSlotName]] = {
DeckSlotName.FIXED_TRASH,
DeckSlotName.SLOT_A3,
}


# The maximum height allowed for items adjacent to a Heater-Shaker in the x-direction.
Expand Down Expand Up @@ -93,9 +97,9 @@ class OtherModule(_Module):
class _NothingAllowed(NamedTuple):
"""Nothing is allowed in this slot."""

location: int
location: DeckSlotName
source_item: DeckItem
source_location: int
source_location: DeckSlotName

def is_allowed(self, item: DeckItem) -> bool:
return False
Expand All @@ -104,9 +108,9 @@ def is_allowed(self, item: DeckItem) -> bool:
class _MaxHeight(NamedTuple):
"""Nothing over a certain height is allowed in this slot."""

location: int
location: DeckSlotName
source_item: DeckItem
source_location: int
source_location: DeckSlotName
max_height: float
allowed_labware: List[LabwareUri]

Expand All @@ -123,9 +127,9 @@ def is_allowed(self, item: DeckItem) -> bool:
class _NoModule(NamedTuple):
"""No module of any kind is allowed in this slot."""

location: int
location: DeckSlotName
source_item: DeckItem
source_location: int
source_location: DeckSlotName

def is_allowed(self, item: DeckItem) -> bool:
return not isinstance(item, _Module)
Expand All @@ -134,9 +138,9 @@ def is_allowed(self, item: DeckItem) -> bool:
class _NoHeaterShakerModule(NamedTuple):
"""No Heater-Shaker module is allowed in this slot."""

location: int
location: DeckSlotName
source_item: DeckItem
source_location: int
source_location: DeckSlotName

def is_allowed(self, item: DeckItem) -> bool:
return not isinstance(item, HeaterShakerModule)
Expand All @@ -145,7 +149,7 @@ def is_allowed(self, item: DeckItem) -> bool:
class _FixedTrashOnly(NamedTuple):
"""Only fixed-trash labware is allowed in this slot."""

location: int = _FIXED_TRASH_SLOT
location: DeckSlotName

def is_allowed(self, item: DeckItem) -> bool:
return _is_fixed_trash(item)
Expand All @@ -169,25 +173,32 @@ class DeckConflictError(ValueError):
# things that don't fit into a single deck slot, like the Thermocycler.
# Refactor this interface to take a more symbolic location.
def check(
existing_items: Mapping[int, DeckItem],
existing_items: Mapping[DeckSlotName, DeckItem],
new_item: DeckItem,
new_location: int,
new_location: DeckSlotName,
robot_type: RobotType,
) -> None:
"""Check a deck layout for conflicts.
Args:
existing_items: Existing items on the deck, assumed to be valid.
new_item: New item to add to the deck.
new_location: Location where the new item will be added.
robot_type: The type of the robot to choose the restriction rules.
Raises:
DeckConflictError: Adding this item should not be allowed.
"""
restrictions: List[_DeckRestriction] = [_FixedTrashOnly()]

restrictions: List[_DeckRestriction] = [
_FixedTrashOnly(
location=DeckSlotName.FIXED_TRASH.to_equivalent_for_robot_type(robot_type)
)
]
# build restrictions driven by existing items
for location, item in existing_items.items():
restrictions += _create_restrictions(item=item, location=location)
restrictions += _create_restrictions(
item=item, location=location, robot_type=robot_type
)

# check new item against existing restrictions
for r in restrictions:
Expand All @@ -198,7 +209,9 @@ def check(

# check new restrictions required by new item
# do not interfere with existing items
new_restrictions = _create_restrictions(item=new_item, location=new_location)
new_restrictions = _create_restrictions(
item=new_item, location=new_location, robot_type=robot_type
)

for r in new_restrictions:
existing_item = existing_items.get(r.location)
Expand All @@ -211,10 +224,12 @@ def check(
)


def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction]:
def _create_ot2_restrictions(
item: DeckItem, location: DeckSlotName
) -> List[_DeckRestriction]:
restrictions: List[_DeckRestriction] = []

if location != _FIXED_TRASH_SLOT:
if location not in _FIXED_TRASH_SLOT:
# Disallow a different item from overlapping this item in this deck slot.
restrictions.append(
_NothingAllowed(
Expand All @@ -228,18 +243,18 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction
# A Heater-Shaker can't safely be placed just south of the fixed trash,
# because the fixed trash blocks access to the screw that locks the
# Heater-Shaker onto the deck.
location_south_of_fixed_trash = get_south_slot(location)
location_south_of_fixed_trash = get_south_slot(location.as_int())
if location_south_of_fixed_trash is not None:
restrictions.append(
_NoHeaterShakerModule(
location=location_south_of_fixed_trash,
location=DeckSlotName.from_primitive(location_south_of_fixed_trash),
source_item=item,
source_location=location,
)
)

if isinstance(item, ThermocyclerModule):
for covered_location in _slots_covered_by_thermocycler(item):
for covered_location in _ot2_slots_covered_by_thermocycler(item):
restrictions.append(
_NothingAllowed(
location=covered_location,
Expand All @@ -249,19 +264,19 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction
)

if isinstance(item, HeaterShakerModule):
for covered_location in get_adjacent_slots(location):
for hs_covered_location in get_adjacent_slots(location.as_int()):
restrictions.append(
_NoModule(
location=covered_location,
location=DeckSlotName.from_primitive(hs_covered_location),
source_item=item,
source_location=location,
)
)

for covered_location in get_east_west_slots(location):
for hs_covered_location in get_east_west_slots(location.as_int()):
restrictions.append(
_MaxHeight(
location=covered_location,
location=DeckSlotName.from_primitive(hs_covered_location),
source_item=item,
source_location=location,
max_height=HS_MAX_X_ADJACENT_ITEM_HEIGHT,
Expand All @@ -272,6 +287,43 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction
return restrictions


def _create_flex_restrictions(
item: DeckItem, location: DeckSlotName
) -> List[_DeckRestriction]:
restrictions: List[_DeckRestriction] = []

if location not in _FIXED_TRASH_SLOT:
restrictions.append(
_NothingAllowed(
location=location,
source_item=item,
source_location=location,
)
)

if isinstance(item, ThermocyclerModule):
for covered_location in _flex_slots_covered_by_thermocycler():
restrictions.append(
_NothingAllowed(
location=covered_location,
source_item=item,
source_location=location,
)
)

return restrictions


def _create_restrictions(
item: DeckItem, location: DeckSlotName, robot_type: str
) -> List[_DeckRestriction]:

if robot_type == "OT-2 Standard":
return _create_ot2_restrictions(item, location)
else:
return _create_flex_restrictions(item, location)


def _create_deck_conflict_error_message(
restriction: _DeckRestriction,
new_item: Optional[DeckItem] = None,
Expand Down Expand Up @@ -302,11 +354,22 @@ def _create_deck_conflict_error_message(
return message


def _slots_covered_by_thermocycler(thermocycler: ThermocyclerModule) -> Set[int]:
def _ot2_slots_covered_by_thermocycler(
thermocycler: ThermocyclerModule,
) -> Set[DeckSlotName]:
if thermocycler.is_semi_configuration:
return {7, 10}
return {DeckSlotName.SLOT_7, DeckSlotName.SLOT_10}
else:
return {7, 8, 10, 11}
return {
DeckSlotName.SLOT_7,
DeckSlotName.SLOT_10,
DeckSlotName.SLOT_8,
DeckSlotName.SLOT_11,
}


def _flex_slots_covered_by_thermocycler() -> Set[DeckSlotName]:
return {DeckSlotName.SLOT_B1, DeckSlotName.SLOT_A1}


def _is_fixed_trash(item: DeckItem) -> bool:
Expand Down
21 changes: 7 additions & 14 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
OFF_DECK_LOCATION,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName


@overload
Expand Down Expand Up @@ -67,13 +68,6 @@ def check(
opentrons.motion_planning.deck_conflict.DeckConflictError:
If the newly-added item conflicts with one of the existing items.
"""
if engine_state.config.robot_type == "OT-3 Standard":
# No-op if this is an OT-3 deck, for now.
#
# todo(mm, 2023-02-24): Support deck conflict checking for the OT-3.
# This will likely require adding support for it in the underlying
# wrapped_deck_conflict.check() function.
return

if new_labware_id is not None:
new_location_and_item = _map_labware(engine_state, new_labware_id)
Expand All @@ -96,7 +90,7 @@ def check(
)
mapped_existing_modules = (m for m in all_existing_modules if m is not None)

existing_items: Dict[int, wrapped_deck_conflict.DeckItem] = {}
existing_items: Dict[DeckSlotName, wrapped_deck_conflict.DeckItem] = {}
for existing_location, existing_item in itertools.chain(
mapped_existing_labware, mapped_existing_modules
):
Expand All @@ -107,20 +101,21 @@ def check(
existing_items=existing_items,
new_item=new_item,
new_location=new_location,
robot_type=engine_state.config.robot_type,
)


def _map_labware(
engine_state: StateView,
labware_id: str,
) -> Optional[Tuple[int, wrapped_deck_conflict.DeckItem]]:
) -> Optional[Tuple[DeckSlotName, wrapped_deck_conflict.DeckItem]]:
location_from_engine = engine_state.labware.get_location(labware_id=labware_id)

if isinstance(location_from_engine, DeckSlotLocation):
# This labware is loaded directly into a deck slot.
# Map it to a wrapped_deck_conflict.Labware.
return (
_deck_slot_to_int(location_from_engine),
location_from_engine.slotName,
wrapped_deck_conflict.Labware(
name_for_errors=engine_state.labware.get_load_name(
labware_id=labware_id
Expand Down Expand Up @@ -153,12 +148,10 @@ def _map_labware(
def _map_module(
engine_state: StateView,
module_id: str,
) -> Optional[Tuple[int, wrapped_deck_conflict.DeckItem]]:
) -> Optional[Tuple[DeckSlotName, wrapped_deck_conflict.DeckItem]]:
module_model = engine_state.modules.get_connected_model(module_id=module_id)
module_type = module_model.as_type()
mapped_location = _deck_slot_to_int(
engine_state.modules.get_location(module_id=module_id)
)
mapped_location = engine_state.modules.get_location(module_id=module_id).slotName

# Use the module model (e.g. "temperatureModuleV1") as the name for error messages
# because it's convenient for us. Unfortunately, this won't necessarily match
Expand Down
23 changes: 20 additions & 3 deletions api/src/opentrons/protocol_api/core/engine/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Dict, Optional, Type, Union, List, Tuple

from opentrons.protocol_engine.commands import LoadModuleResult
from opentrons_shared_data.deck.dev_types import DeckDefinitionV3
from opentrons_shared_data.deck.dev_types import DeckDefinitionV3, SlotDefV3
from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from opentrons_shared_data.labware.dev_types import LabwareDefinition as LabwareDefDict
from opentrons_shared_data.pipette.dev_types import PipetteNameType
Expand Down Expand Up @@ -314,17 +314,23 @@ def load_module(
"""Load a module into the protocol."""
assert configuration is None, "Module `configuration` is deprecated"

module_type = ModuleType.from_model(model)
# TODO(mc, 2022-10-20): move to public ProtocolContext
# once `Deck` and `ProtocolEngine` play nicely together
if deck_slot is None:
if ModuleType.from_model(model) == ModuleType.THERMOCYCLER:
module_type = ModuleType.from_model(model)
if module_type == ModuleType.THERMOCYCLER:
deck_slot = DeckSlotName.SLOT_7
else:
raise InvalidModuleLocationError(deck_slot, model.name)

robot_type = self._engine_client.state.config.robot_type
normalized_deck_slot = deck_slot.to_equivalent_for_robot_type(robot_type)
self._ensure_module_location(normalized_deck_slot, module_type)

result = self._engine_client.load_module(
model=EngineModuleModel(model),
location=DeckSlotLocation(slotName=deck_slot),
location=DeckSlotLocation(slotName=normalized_deck_slot),
)

module_core = self._get_module_core(load_module_result=result, model=model)
Expand Down Expand Up @@ -473,6 +479,17 @@ def get_deck_definition(self) -> DeckDefinitionV3:
"""Get the geometry definition of the robot's deck."""
return self._engine_client.state.labware.get_deck_definition()

def get_slot_definition(self, slot: DeckSlotName) -> SlotDefV3:
return self._engine_client.state.labware.get_slot_definition(slot)

def _ensure_module_location(
self, slot: DeckSlotName, module_type: ModuleType
) -> None:
slot_def = self.get_slot_definition(slot)
compatible_modules = slot_def["compatibleModuleTypes"]
if module_type.value not in compatible_modules:
raise ValueError(f"A {module_type.value} cannot be loaded into slot {slot}")

def get_slot_item(
self, slot_name: DeckSlotName
) -> Union[LabwareCore, ModuleCore, NonConnectedModuleCore, None]:
Expand Down
Loading

0 comments on commit 136f933

Please sign in to comment.