-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(api): enable deck conflict checker for the flex #13189
Changes from 25 commits
45d3338
8fbfd93
7c77ebd
f0cd5b5
b9d77e4
a97512a
815a764
b4aaa04
f3a8400
7e643f8
17c6ed0
b4fa2d0
937bdcc
cd66144
bfe6404
d9cdf99
81c6fc3
0a95b8c
351c369
51dae4c
91d4e48
62e1abf
09dc495
2e6a26f
c12bff6
0a7c60e
6c6b86d
28f6c75
db8c98a
6abab47
3168268
efbdc75
87b39fc
bf2810e
759061c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,9 @@ | |
get_adjacent_slots, | ||
) | ||
|
||
from opentrons.types import DeckSlotName | ||
|
||
_FIXED_TRASH_SLOT: Final = 12 | ||
_FIXED_TRASH_SLOT: Final = DeckSlotName.FIXED_TRASH | ||
|
||
|
||
# The maximum height allowed for items adjacent to a Heater-Shaker in the x-direction. | ||
|
@@ -93,9 +94,9 @@ class OtherModule(_Module): | |
class _NothingAllowed(NamedTuple): | ||
"""Nothing is allowed in this slot.""" | ||
|
||
location: int | ||
location: DeckSlotName | ||
source_item: DeckItem | ||
source_location: int | ||
source_location: DeckSlotName | ||
|
||
def is_allowed(self, item: DeckItem) -> bool: | ||
return False | ||
|
@@ -104,9 +105,9 @@ def is_allowed(self, item: DeckItem) -> bool: | |
class _MaxHeight(NamedTuple): | ||
"""Nothing over a certain height is allowed in this slot.""" | ||
|
||
location: int | ||
location: DeckSlotName | ||
source_item: DeckItem | ||
source_location: int | ||
source_location: DeckSlotName | ||
max_height: float | ||
allowed_labware: List[LabwareUri] | ||
|
||
|
@@ -123,9 +124,9 @@ def is_allowed(self, item: DeckItem) -> bool: | |
class _NoModule(NamedTuple): | ||
"""No module of any kind is allowed in this slot.""" | ||
|
||
location: int | ||
location: DeckSlotName | ||
source_item: DeckItem | ||
source_location: int | ||
source_location: DeckSlotName | ||
|
||
def is_allowed(self, item: DeckItem) -> bool: | ||
return not isinstance(item, _Module) | ||
|
@@ -134,9 +135,9 @@ def is_allowed(self, item: DeckItem) -> bool: | |
class _NoHeaterShakerModule(NamedTuple): | ||
"""No Heater-Shaker module is allowed in this slot.""" | ||
|
||
location: int | ||
location: DeckSlotName | ||
source_item: DeckItem | ||
source_location: int | ||
source_location: DeckSlotName | ||
|
||
def is_allowed(self, item: DeckItem) -> bool: | ||
return not isinstance(item, HeaterShakerModule) | ||
|
@@ -145,7 +146,7 @@ def is_allowed(self, item: DeckItem) -> bool: | |
class _FixedTrashOnly(NamedTuple): | ||
"""Only fixed-trash labware is allowed in this slot.""" | ||
|
||
location: int = _FIXED_TRASH_SLOT | ||
location: DeckSlotName = _FIXED_TRASH_SLOT | ||
|
||
def is_allowed(self, item: DeckItem) -> bool: | ||
return _is_fixed_trash(item) | ||
|
@@ -169,9 +170,10 @@ class DeckConflictError(ValueError): | |
# things that don't fit into a single deck slot, like the Thermocycler. | ||
# Refactor this interface to take a more symbolic location. | ||
def check( | ||
existing_items: Mapping[int, DeckItem], | ||
existing_items: Mapping[DeckSlotName, DeckItem], | ||
new_item: DeckItem, | ||
new_location: int, | ||
new_location: DeckSlotName, | ||
robot_type: str = "OT-2 Standard", | ||
TamarZanzouri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> None: | ||
"""Check a deck layout for conflicts. | ||
|
||
|
@@ -184,10 +186,11 @@ def check( | |
DeckConflictError: Adding this item should not be allowed. | ||
""" | ||
restrictions: List[_DeckRestriction] = [_FixedTrashOnly()] | ||
SyntaxColoring marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# build restrictions driven by existing items | ||
for location, item in existing_items.items(): | ||
restrictions += _create_restrictions(item=item, location=location) | ||
restrictions += _create_restrictions( | ||
item=item, location=location, robot_type=robot_type | ||
) | ||
|
||
# check new item against existing restrictions | ||
for r in restrictions: | ||
|
@@ -198,7 +201,9 @@ def check( | |
|
||
# check new restrictions required by new item | ||
# do not interfere with existing items | ||
new_restrictions = _create_restrictions(item=new_item, location=new_location) | ||
new_restrictions = _create_restrictions( | ||
item=new_item, location=new_location, robot_type=robot_type | ||
) | ||
|
||
for r in new_restrictions: | ||
existing_item = existing_items.get(r.location) | ||
|
@@ -211,7 +216,9 @@ def check( | |
) | ||
|
||
|
||
def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction]: | ||
def _create_ot2_restrictions( | ||
item: DeckItem, location: DeckSlotName | ||
) -> List[_DeckRestriction]: | ||
restrictions: List[_DeckRestriction] = [] | ||
|
||
if location != _FIXED_TRASH_SLOT: | ||
|
@@ -228,11 +235,11 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction | |
# A Heater-Shaker can't safely be placed just south of the fixed trash, | ||
# because the fixed trash blocks access to the screw that locks the | ||
# Heater-Shaker onto the deck. | ||
location_south_of_fixed_trash = get_south_slot(location) | ||
location_south_of_fixed_trash = get_south_slot(location.as_int()) | ||
if location_south_of_fixed_trash is not None: | ||
restrictions.append( | ||
_NoHeaterShakerModule( | ||
location=location_south_of_fixed_trash, | ||
location=DeckSlotName.from_primitive(location_south_of_fixed_trash), | ||
source_item=item, | ||
source_location=location, | ||
) | ||
|
@@ -249,19 +256,19 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction | |
) | ||
|
||
if isinstance(item, HeaterShakerModule): | ||
for covered_location in get_adjacent_slots(location): | ||
for hs_covered_location in get_adjacent_slots(location.as_int()): | ||
restrictions.append( | ||
_NoModule( | ||
location=covered_location, | ||
location=DeckSlotName.from_primitive(hs_covered_location), | ||
source_item=item, | ||
source_location=location, | ||
) | ||
) | ||
|
||
for covered_location in get_east_west_slots(location): | ||
for hs_covered_location in get_east_west_slots(location.as_int()): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these functions also take in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tired doing it but its tied to much to the legacy implementation and we are already parsing this as int in other places so might as well leave it |
||
restrictions.append( | ||
_MaxHeight( | ||
location=covered_location, | ||
location=DeckSlotName.from_primitive(hs_covered_location), | ||
source_item=item, | ||
source_location=location, | ||
max_height=HS_MAX_X_ADJACENT_ITEM_HEIGHT, | ||
|
@@ -272,6 +279,41 @@ def _create_restrictions(item: DeckItem, location: int) -> List[_DeckRestriction | |
return restrictions | ||
|
||
|
||
def _create_flex_restrictions( | ||
item: DeckItem, location: DeckSlotName | ||
) -> List[_DeckRestriction]: | ||
restrictions: List[_DeckRestriction] = [] | ||
|
||
if isinstance(item, ThermocyclerModule): | ||
for covered_location in _flex_slots_covered_by_thermocycler(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought the TC could be in multiple slots, is this not the case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. its no slot 7,10. I am setting both slots as occupied if we are in the TC case |
||
restrictions.append( | ||
_NothingAllowed( | ||
location=covered_location, | ||
source_item=item, | ||
source_location=location, | ||
) | ||
) | ||
else: | ||
restrictions.append( | ||
_NothingAllowed( | ||
location=location, | ||
source_item=item, | ||
source_location=location, | ||
) | ||
) | ||
return restrictions | ||
|
||
|
||
def _create_restrictions( | ||
item: DeckItem, location: DeckSlotName, robot_type: str | ||
) -> List[_DeckRestriction]: | ||
|
||
if robot_type == "OT-2 Standard": | ||
return _create_ot2_restrictions(item, location) | ||
else: | ||
return _create_flex_restrictions(item, location) | ||
|
||
|
||
def _create_deck_conflict_error_message( | ||
restriction: _DeckRestriction, | ||
new_item: Optional[DeckItem] = None, | ||
|
@@ -302,11 +344,22 @@ def _create_deck_conflict_error_message( | |
return message | ||
|
||
|
||
def _slots_covered_by_thermocycler(thermocycler: ThermocyclerModule) -> Set[int]: | ||
def _slots_covered_by_thermocycler( | ||
TamarZanzouri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
thermocycler: ThermocyclerModule, | ||
) -> Set[DeckSlotName]: | ||
if thermocycler.is_semi_configuration: | ||
return {7, 10} | ||
return {DeckSlotName.SLOT_7, DeckSlotName.SLOT_10} | ||
else: | ||
return {7, 8, 10, 11} | ||
return { | ||
DeckSlotName.SLOT_7, | ||
DeckSlotName.SLOT_10, | ||
DeckSlotName.SLOT_8, | ||
DeckSlotName.SLOT_11, | ||
} | ||
|
||
|
||
def _flex_slots_covered_by_thermocycler() -> Set[DeckSlotName]: | ||
return {DeckSlotName.SLOT_B1, DeckSlotName.SLOT_A1} | ||
|
||
|
||
def _is_fixed_trash(item: DeckItem) -> bool: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
from typing import Dict, Optional, Type, Union, List, Tuple | ||
|
||
from opentrons.protocol_engine.commands import LoadModuleResult | ||
from opentrons_shared_data.deck.dev_types import DeckDefinitionV3 | ||
from opentrons_shared_data.deck.dev_types import DeckDefinitionV3, SlotDefV3 | ||
from opentrons_shared_data.labware.labware_definition import LabwareDefinition | ||
from opentrons_shared_data.labware.dev_types import LabwareDefinition as LabwareDefDict | ||
from opentrons_shared_data.pipette.dev_types import PipetteNameType | ||
|
@@ -314,14 +314,21 @@ def load_module( | |
"""Load a module into the protocol.""" | ||
assert configuration is None, "Module `configuration` is deprecated" | ||
|
||
module_type = ModuleType.from_model(model) | ||
# TODO(mc, 2022-10-20): move to public ProtocolContext | ||
# once `Deck` and `ProtocolEngine` play nicely together | ||
if deck_slot is None: | ||
if ModuleType.from_model(model) == ModuleType.THERMOCYCLER: | ||
deck_slot = DeckSlotName.SLOT_7 | ||
if module_type == ModuleType.THERMOCYCLER: | ||
deck_slot = ( | ||
SyntaxColoring marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DeckSlotName.SLOT_7 | ||
if self._engine_client.state.config.robot_type == "OT-2 Standard" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can also use deck robot property but this feels the same. comparing a string. |
||
else DeckSlotName.SLOT_B1 | ||
) | ||
else: | ||
raise InvalidModuleLocationError(deck_slot, model.name) | ||
|
||
self._ensure_module_location(deck_slot, module_type) | ||
|
||
result = self._engine_client.load_module( | ||
model=EngineModuleModel(model), | ||
location=DeckSlotLocation(slotName=deck_slot), | ||
|
@@ -473,6 +480,17 @@ def get_deck_definition(self) -> DeckDefinitionV3: | |
"""Get the geometry definition of the robot's deck.""" | ||
return self._engine_client.state.labware.get_deck_definition() | ||
|
||
def get_slot_definition(self, slot: DeckSlotName) -> SlotDefV3: | ||
return self._engine_client.state.labware.get_slot_definition(slot) | ||
|
||
def _ensure_module_location( | ||
self, slot: DeckSlotName, module_type: ModuleType | ||
) -> None: | ||
slot_def = self.get_slot_definition(slot) | ||
compatible_modules = slot_def["compatibleModuleTypes"] | ||
if module_type.value not in compatible_modules: | ||
raise ValueError(f"A {module_type.value} cannot be loaded into slot {slot}") | ||
|
||
def get_slot_item( | ||
self, slot_name: DeckSlotName | ||
) -> Union[LabwareCore, ModuleCore, NonConnectedModuleCore, None]: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is totally reasonable to do, but if you really hate it you could create a enum and/or bitfield in which you can specify what items you want to check for collisions and push up the robot type check higher.