Skip to content

Commit

Permalink
utilize loaded static pipette
Browse files Browse the repository at this point in the history
  • Loading branch information
CaseyBatten committed Feb 22, 2024
1 parent 0d1c8c7 commit 94b7106
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 33 deletions.
28 changes: 19 additions & 9 deletions api/src/opentrons/protocol_api/core/engine/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

from opentrons_shared_data.labware.labware_definition import LabwareRole

from opentrons.protocol_engine import LoadedPipette
from opentrons.protocol_engine.errors import LabwareNotOnDeckError, ModuleNotOnDeckError
from opentrons.protocol_engine.clients import SyncClient as ProtocolEngineClient
from opentrons.types import DeckSlotName, Point
from opentrons.types import DeckSlotName, Point, Mount, MountType

from ..labware import AbstractLabware, LabwareLoadParams
from .well import WellCore
Expand Down Expand Up @@ -123,18 +124,27 @@ def reset_tips(self) -> None:

def get_next_tip(
self,
mount: Mount,
num_tips: int,
starting_tip: Optional[WellCore],
) -> Optional[str]:
return self._engine_client.state.tips.get_next_tip(
labware_id=self._labware_id,
num_tips=num_tips,
starting_tip_name=(
starting_tip.get_name()
if starting_tip and starting_tip.labware_id == self._labware_id
else None
),
pipette = self._engine_client.state.pipettes.get_by_mount(
MountType.from_hw_mount(mount)
)
if isinstance(pipette, LoadedPipette):
pipette_id = pipette.id
return self._engine_client.state.tips.get_next_tip(
pipette_id=pipette_id,
labware_id=self._labware_id,
num_tips=num_tips,
starting_tip_name=(
starting_tip.get_name()
if starting_tip and starting_tip.labware_id == self._labware_id
else None
),
)
else:
raise ValueError(f"No valid Pipette found for Mount {mount.name}")

def get_well_columns(self) -> List[List[str]]:
"""Get the all well names, organized by column, from the labware's definition."""
Expand Down
3 changes: 2 additions & 1 deletion api/src/opentrons/protocol_api/core/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
LabwareDefinition as LabwareDefinitionDict,
)

from opentrons.types import DeckSlotName, Point
from opentrons.types import DeckSlotName, Point, Mount

from .well import WellCoreType

Expand Down Expand Up @@ -111,6 +111,7 @@ def reset_tips(self) -> None:
@abstractmethod
def get_next_tip(
self,
mount: Mount,
num_tips: int,
starting_tip: Optional[WellCoreType],
) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from opentrons.protocols.geometry.labware_geometry import LabwareGeometry
from opentrons.protocols.api_support.tip_tracker import TipTracker

from opentrons.types import DeckSlotName, Location, Point
from opentrons.types import DeckSlotName, Location, Point, Mount
from opentrons_shared_data.labware.dev_types import LabwareParameters, LabwareDefinition

from ..labware import AbstractLabware, LabwareLoadParams
Expand Down Expand Up @@ -154,6 +154,7 @@ def reset_tips(self) -> None:

def get_next_tip(
self,
mount: Mount,
num_tips: int,
starting_tip: Optional[LegacyWellCore],
) -> Optional[str]:
Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_api/instrument_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ def pick_up_tip( # noqa: C901
" to pick up."
)
tip_rack, well = labware.next_available_tip(
mount=self._core.get_mount(),
starting_tip=self.starting_tip,
tip_racks=self.tip_racks,
channels=active_channels,
Expand All @@ -891,6 +892,7 @@ def pick_up_tip( # noqa: C901

elif isinstance(location, labware.Labware):
tip_rack, well = labware.next_available_tip(
mount=self._core.get_mount(),
starting_tip=None,
tip_racks=[location],
channels=active_channels,
Expand All @@ -906,6 +908,7 @@ def pick_up_tip( # noqa: C901

elif maybe_tip_rack is not None:
tip_rack, well = labware.next_available_tip(
mount=self._core.get_mount(),
starting_tip=None,
tip_racks=[maybe_tip_rack],
channels=active_channels,
Expand Down Expand Up @@ -1321,6 +1324,7 @@ def transfer( # noqa: C901

if new_tip != types.TransferTipPolicy.NEVER:
tr, next_tip = labware.next_available_tip(
self._core.get_mount(),
self.starting_tip,
self.tip_racks,
active_channels,
Expand Down
13 changes: 9 additions & 4 deletions api/src/opentrons/protocol_api/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from opentrons_shared_data.labware.dev_types import LabwareDefinition, LabwareParameters

from opentrons.types import Location, Point
from opentrons.types import Location, Point, Mount
from opentrons.protocols.api_support.types import APIVersion
from opentrons.protocols.api_support.util import requires_version, APIVersionError

Expand Down Expand Up @@ -884,6 +884,7 @@ def tip_length(self, length: float) -> None:
# TODO(mc, 2022-11-09): implementation detail; deprecate public method
def next_tip(
self,
mount: Mount,
num_tips: int = 1,
starting_tip: Optional[Well] = None,
) -> Optional[Well]:
Expand All @@ -904,6 +905,7 @@ def next_tip(
assert num_tips > 0, f"num_tips must be positive integer, but got {num_tips}"

well_name = self._core.get_next_tip(
mount=mount,
num_tips=num_tips,
starting_tip=starting_tip._core if starting_tip else None,
)
Expand Down Expand Up @@ -1063,6 +1065,7 @@ def split_tipracks(tip_racks: List[Labware]) -> Tuple[Labware, List[Labware]]:

# TODO(mc, 2022-11-09): implementation detail, move to core
def select_tiprack_from_list(
mount: Mount,
tip_racks: List[Labware],
num_channels: int,
starting_point: Optional[Well] = None,
Expand All @@ -1081,11 +1084,11 @@ def select_tiprack_from_list(
else:
first_well = first.wells()[0]

next_tip = first.next_tip(num_channels, first_well)
next_tip = first.next_tip(mount, num_channels, first_well)
if next_tip:
return first, next_tip
else:
return select_tiprack_from_list(rest, num_channels)
return select_tiprack_from_list(mount, rest, num_channels)


# TODO(mc, 2022-11-09): implementation detail, move to core
Expand All @@ -1097,15 +1100,17 @@ def filter_tipracks_to_start(

# TODO(mc, 2022-11-09): implementation detail, move to core
def next_available_tip(
mount: Mount,
starting_tip: Optional[Well],
tip_racks: List[Labware],
channels: int,
) -> Tuple[Labware, Well]:
start = starting_tip
if start is None:
return select_tiprack_from_list(tip_racks, channels)
return select_tiprack_from_list(mount, tip_racks, channels)
else:
return select_tiprack_from_list(
mount,
filter_tipracks_to_start(start, tip_racks),
channels,
start,
Expand Down
44 changes: 26 additions & 18 deletions api/src/opentrons/protocol_engine/state/tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TipState:
channels_by_pipette_id: Dict[str, int]
length_by_pipette_id: Dict[str, float]
active_channels_by_pipette_id: Dict[str, int]
nozzle_map: Optional[NozzleMap]
nozzle_map_by_pipette_id: Dict[str, NozzleMap]


class TipStore(HasState[TipState], HandlesActions):
Expand All @@ -59,7 +59,7 @@ def __init__(self) -> None:
channels_by_pipette_id={},
length_by_pipette_id={},
active_channels_by_pipette_id={},
nozzle_map=None,
nozzle_map_by_pipette_id={},
)

def handle_action(self, action: Action) -> None:
Expand All @@ -70,6 +70,7 @@ def handle_action(self, action: Action) -> None:
config = action.private_result.config
self._state.channels_by_pipette_id[pipette_id] = config.channels
self._state.active_channels_by_pipette_id[pipette_id] = config.channels
self._state.nozzle_map_by_pipette_id[pipette_id] = config.nozzle_map
self._handle_command(action.command)

if isinstance(action.private_result, PipetteNozzleLayoutResultMixin):
Expand All @@ -79,7 +80,7 @@ def handle_action(self, action: Action) -> None:
self._state.active_channels_by_pipette_id[
pipette_id
] = nozzle_map.tip_count
self._state.nozzle_map = nozzle_map
self._state.nozzle_map_by_pipette_id[pipette_id] = nozzle_map
else:
self._state.active_channels_by_pipette_id[
pipette_id
Expand Down Expand Up @@ -114,18 +115,24 @@ def _handle_command(self, command: Command) -> None:
well_name = command.params.wellName
pipette_id = command.params.pipetteId
length = command.result.tipLength
self._set_used_tips(well_name=well_name, labware_id=labware_id)
self._set_used_tips(
pipette_id=pipette_id, well_name=well_name, labware_id=labware_id
)
self._state.length_by_pipette_id[pipette_id] = length

elif isinstance(command.result, (DropTipResult, DropTipInPlaceResult)):
pipette_id = command.params.pipetteId
self._state.length_by_pipette_id.pop(pipette_id, None)

def _set_used_tips(self, well_name: str, labware_id: str) -> None: # noqa: C901
def _set_used_tips(
self, pipette_id: str, well_name: str, labware_id: str
) -> None: # noqa: C901
columns = self._state.column_by_labware_id.get(labware_id, [])
wells = self._state.tips_by_labware_id.get(labware_id, {})
nozzle_map = self._state.nozzle_map
nozzle_map = self._state.nozzle_map_by_pipette_id[pipette_id]

# fix this None logic?
# replace this with the whole nozzle map mask instead of the active nozzles?
if nozzle_map is not None:
num_nozzle_cols = len(nozzle_map.columns)
num_nozzle_rows = len(nozzle_map.rows)
Expand All @@ -152,8 +159,7 @@ def _set_used_tips(self, well_name: str, labware_id: str) -> None: # noqa: C901
well = columns[critical_column - i][critical_row - j]
wells[well] = TipRackWellState.USED
else:
# TODO: (cb, 2024-2-14): update/remove this case as soon as we gaurantee a nozzle map upon loading a pipette with (Jira RSS-441.)
wells[well_name] = TipRackWellState.USED
raise RuntimeError(f"No Nozzle Map found for Pipette-ID: {pipette_id}.")


class TipView(HasState[TipState]):
Expand All @@ -175,17 +181,19 @@ def __init__(self, state: TipState) -> None:
# or backmost single nozzle configuration of an 8-channel.
def get_next_tip( # noqa: C901
self,
pipette_id: str,
labware_id: str,
num_tips: int,
starting_tip_name: Optional[str],
) -> Optional[str]:
"""Get the next available clean tip."""
wells = self._state.tips_by_labware_id.get(labware_id, {})
columns = self._state.column_by_labware_id.get(labware_id, [])
nozzle_map = self._state.nozzle_map_by_pipette_id[pipette_id]

if self._state.nozzle_map is not None:
num_nozzle_cols = len(self._state.nozzle_map.columns)
num_nozzle_rows = len(self._state.nozzle_map.rows)
if nozzle_map is not None:
num_nozzle_cols = len(nozzle_map.columns)
num_nozzle_rows = len(nozzle_map.rows)

def _identify_tip_cluster(
critical_column: int, critical_row: int
Expand Down Expand Up @@ -233,7 +241,7 @@ def _validate_tip_cluster(tip_cluster: List[str]) -> Optional[str]:
f"Tiprack {labware_id} has no valid tip selection for current Nozzle Configuration."
)

if self._state.nozzle_map.starting_nozzle == "A1":
if nozzle_map.starting_nozzle == "A1":
# Define the critical well by the position of the well relative to Tip Rack entry point H12
critical_column = len(columns) - num_nozzle_cols
critical_row = len(columns[critical_column]) - num_nozzle_rows
Expand All @@ -255,12 +263,12 @@ def _validate_tip_cluster(tip_cluster: List[str]) -> Optional[str]:
)
return None

elif self._state.nozzle_map.starting_nozzle == "A12":
elif nozzle_map.starting_nozzle == "A12":
# Define the critical well by the position of the well relative to Tip Rack entry point H1
critical_column = num_nozzle_cols - 1
critical_row = len(columns[critical_column]) - num_nozzle_rows

while critical_column <= len(columns): # change to max size of labware
while critical_column <= len(columns): # change to max size of labware
tip_cluster = _identify_tip_cluster(critical_column, critical_row)
result = _validate_tip_cluster(tip_cluster)
if isinstance(result, str):
Expand All @@ -277,7 +285,7 @@ def _validate_tip_cluster(tip_cluster: List[str]) -> Optional[str]:
)
return None

elif self._state.nozzle_map.starting_nozzle == "H1":
elif nozzle_map.starting_nozzle == "H1":
# Define the critical well by the position of the well relative to Tip Rack entry point A12
critical_column = len(columns) - num_nozzle_cols
critical_row = num_nozzle_rows - 1
Expand All @@ -297,7 +305,7 @@ def _validate_tip_cluster(tip_cluster: List[str]) -> Optional[str]:
critical_row = num_nozzle_rows - 1
return None

elif self._state.nozzle_map.starting_nozzle == "H12":
elif nozzle_map.starting_nozzle == "H12":
# Define the critical well by the position of the well relative to Tip Rack entry point A1
critical_column = num_nozzle_cols - 1
critical_row = num_nozzle_rows - 1
Expand All @@ -319,10 +327,10 @@ def _validate_tip_cluster(tip_cluster: List[str]) -> Optional[str]:

else:
raise ValueError(
f"Nozzle {self._state.nozzle_map.starting_nozzle} is an invalid starting tip for automatic tip pickup."
f"Nozzle {nozzle_map.starting_nozzle} is an invalid starting tip for automatic tip pickup."
)
else:
return None
raise RuntimeError(f"No Nozzle Map found for Pipette-ID: {pipette_id}.")

def get_pipette_channels(self, pipette_id: str) -> int:
"""Return the given pipette's number of channels."""
Expand Down

0 comments on commit 94b7106

Please sign in to comment.