feat(api): track volumes from multichannel configs
This PR adds the capability to properly track volume changes made by
multichannel pipettes (and partial tip loadings of multichannel
pipettes) to the engine.

There are two ways in which we need to handle multichannel nozzle
configurations specially compared to single-channel configurations.

The first, which is what EXEC-795 is about, is that pipettes with multiple
active nozzles will aspirate from or dispense into multiple wells in a
single aspirate/dispense/in_place command. Which wells the pipette touches
is determined by projecting the pipette's nozzle map over the layout of the
labware and predicting which wells it interacts with.

This is itself non-trivial because labware can have many formats. What we
can do is make the math work correctly when possible - when the labware is
laid out regularly enough that we can do projections of this type - and
fall back to pretending to be a single channel if we fail. Since we're
computing the logical equivalent of actual physical state, and a multi-nozzle
layout is unlikely to physically work with irregular labware anyway, I think
this fallback is safe.

Specifically, we need to generalize the logic the tip store uses to project
which tips a multichannel picks up so that it works for labware of different
formats. Our multichannel pipette nozzles are laid out to match SBS 96-well
plates, so that's our "default" labware. On labware that follows SBS
patterns but is more dense - a 384-well plate, for instance - we have to
subsample, interacting with a single well out of each group of
(well_count / 96) wells that occupies the same space as one 96-well-plate
well. On labware that follows SBS patterns but is less dense - a 12-column
reservoir, for instance - we have to supersample, letting a single labware
well be touched by multiple nozzles.
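
As an illustrative sketch (not the engine code - the real implementation is
in _well_math.py below, and the function name here is made up for the
example), the sub/supersampling factors relative to the 12 x 8 SBS layout
come out of simple integer division:

def sbs_sampling_factors(n_columns: int, n_rows: int) -> tuple[int, int]:
    """(column factor, row factor) relative to the 12 x 8 SBS 96-well layout."""
    if n_columns >= 12 and n_rows >= 8:
        # Denser than (or equal to) a 96 plate: subsample, skipping wells between nozzles.
        return n_columns // 12, n_rows // 8
    # Sparser than a 96 plate (reservoir-like): supersample, several nozzles per well.
    return 12 // n_columns, 8 // n_rows

assert sbs_sampling_factors(12, 8) == (1, 1)    # 96 plate: one well per nozzle
assert sbs_sampling_factors(24, 16) == (2, 2)   # 384 plate: use every other well
assert sbs_sampling_factors(12, 1) == (1, 8)    # 12-column reservoir: 8 nozzles per well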

The second thing we have to deal with is that if the labware is a reservoir
or reservoir-like - that is, it has fewer wells than we have active nozzles -
the common case is that multiple nozzles sit in the same well. In that case,
when tracking the volume taken out of or added to a well, we have to multiply
the operation volume by the number of nozzles per well, which we can get by
simply dividing sizes without accounting for pattern overlap.
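
A minimal sketch of that bookkeeping (the helper name here is illustrative;
the real helpers added in this PR are nozzles_per_well and
GeometryView.get_nozzles_per_well):

def estimated_nozzles_per_well(
    nozzle_columns: int, nozzle_rows: int, labware_columns: int, labware_rows: int
) -> int:
    """How many nozzles land in each touched well, ignoring pattern overlap."""
    if labware_columns >= nozzle_columns and labware_rows >= nozzle_rows:
        return 1  # 96-plate density or denser: one nozzle per touched well
    return max(1, nozzle_columns // labware_columns) * max(1, nozzle_rows // labware_rows)

# An 8-nozzle column configuration over a 12-column reservoir: all 8 nozzles share
# one well, so a 50 uL aspirate per nozzle removes 8 * 50 = 400 uL from that well.
assert estimated_nozzles_per_well(1, 8, 12, 1) == 8
assert 50.0 * estimated_nozzles_per_well(1, 8, 12, 1) == 400.0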

Closes EXEC-795
sfoster1 committed Nov 5, 2024
1 parent 3200655 commit 64ba417
Showing 10 changed files with 317 additions and 60 deletions.
13 changes: 10 additions & 3 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
@@ -145,7 +145,9 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
except PipetteOverpressureError as e:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, pipette_id
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
@@ -167,8 +169,13 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
else:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
volume_added=-volume_aspirated,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, pipette_id
),
volume_added=-volume_aspirated
* self._state_view.geometry.get_nozzles_per_well(
labware_id, well_name, pipette_id
),
)
state_update.set_fluid_aspirated(
pipette_id=params.pipetteId,
19 changes: 16 additions & 3 deletions api/src/opentrons/protocol_engine/commands/aspirate_in_place.py
@@ -112,7 +112,11 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn:
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
@@ -150,8 +154,17 @@
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
volume_added=-volume,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
)

return SuccessData(
13 changes: 11 additions & 2 deletions api/src/opentrons/protocol_engine/commands/dispense.py
@@ -1,4 +1,5 @@
"""Dispense command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal
@@ -109,7 +110,9 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn:
except PipetteOverpressureError as e:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, params.pipetteId
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
@@ -134,9 +137,15 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn:
pipette_id=params.pipetteId, volume=volume
)
)
if volume_added is not None:
volume_added *= self._state_view.geometry.get_nozzles_per_well(
labware_id, well_name, params.pipetteId
)
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, params.pipetteId
),
volume_added=volume_added if volume_added is not None else CLEAR,
)
state_update.set_fluid_ejected(pipette_id=params.pipetteId, volume=volume)
19 changes: 17 additions & 2 deletions api/src/opentrons/protocol_engine/commands/dispense_in_place.py
@@ -1,4 +1,5 @@
"""Dispense-in-place command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal
@@ -91,7 +92,11 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn:
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
@@ -129,9 +134,19 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn:
pipette_id=params.pipetteId, volume=volume
)
)
if volume_added is not None:
volume_added *= self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
)
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=volume_added if volume_added is not None else CLEAR,
)
return SuccessData(
186 changes: 186 additions & 0 deletions api/src/opentrons/protocol_engine/state/_well_math.py
@@ -0,0 +1,186 @@
"""Utilities for doing coverage math on wells."""

from typing import Iterator
from typing_extensions import assert_never
from opentrons_shared_data.errors.exceptions import (
InvalidStoredData,
InvalidProtocolData,
)

from opentrons.hardware_control.nozzle_manager import NozzleMap


def wells_covered_by_pipette_configuration(
nozzle_map: NozzleMap,
target_well: str,
labware_wells_by_column: list[list[str]],
) -> Iterator[str]:
"""Compute the wells covered by a pipette nozzle configuration."""
if len(labware_wells_by_column) >= 12 and len(labware_wells_by_column[0]) >= 8:
yield from wells_covered_dense(
nozzle_map,
target_well,
labware_wells_by_column,
)
elif len(labware_wells_by_column) <= 12 and len(labware_wells_by_column[0]) <= 8:
yield from wells_covered_sparse(
nozzle_map, target_well, labware_wells_by_column
)
else:
raise InvalidStoredData(
"Labware of non-SBS and non-reservoir format cannot be handled"
)


def row_col_ordinals_from_column_major_map(
target_well: str, column_major_wells: list[list[str]]
) -> tuple[int, int]:
"""Turn a well name into the index of its row and column (in that order) within the labware."""
for column_index, column in enumerate(column_major_wells):
if target_well in column:
return column.index(target_well), column_index
raise InvalidStoredData(f"Well name {target_well} is not present in labware")


def wells_covered_dense(
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> Iterator[str]:
"""Get the list of wells covered by a nozzle map on an SBS format labware with a specified multiplier of 96 into the number of wells.
This will handle the offsetting of the nozzle map into higher-density well plates. For instance, a full column config target at A1 of a
96 plate would cover wells A1, B1, C1, D1, E1, F1, G1, H1, and use downsample_factor 1.0 (96*1 = 96). A full column config target on a
384 plate would cover wells A1, C1, E1, G1, I1, K1, M1, O1 and use downsample_factor 4.0 (96*4 = 384), while a full column config
targeting B1 would cover wells B1, D1, F1, H1, J1, L1, N1, P1 - still using downsample_factor 4.0, with the offset gathered from the
target well.
The function may also handle sub-96 regular labware with fractional downsample factors, but that's physically improbable and it's not
tested. If you have a regular labware with fewer than 96 wells that is still regularly-spaced and has little enough space between well
walls that it's reasonable to use with multiple channels, you probably want wells_covered_trough.
"""
target_row_index, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
column_downsample = len(target_wells_by_column) // 12
row_downsample = len(target_wells_by_column[0]) // 8
if column_downsample < 1 or row_downsample < 1:
raise InvalidStoredData(
"This labware cannot be used wells_covered_dense because it is less dense than an SBS 96 standard"
)

for nozzle_column in range(len(nozzle_map.columns)):
target_column_offset = nozzle_column * column_downsample
for nozzle_row in range(len(nozzle_map.rows)):
target_row_offset = nozzle_row * row_downsample
if nozzle_map.starting_nozzle == "A1":
if (
target_column_index + target_column_offset
< len(target_wells_by_column)
) and (
target_row_index + target_row_offset
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[
target_column_index + target_column_offset
][target_row_index + target_row_offset]
elif nozzle_map.starting_nozzle == "A12":
if (target_column_index - target_column_offset >= 0) and (
target_row_index + target_row_offset
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[
target_column_index - target_column_offset
][target_row_index + target_row_offset]
elif nozzle_map.starting_nozzle == "H1":
if (
target_column_index + target_column_offset
< len(target_wells_by_column)
) and (target_row_index - target_row_offset >= 0):
yield target_wells_by_column[
target_column_index + target_column_offset
][target_row_index - target_row_offset]
elif nozzle_map.starting_nozzle == "H12":
if (target_column_index - target_column_offset >= 0) and (
target_row_index - target_row_offset >= 0
):
yield target_wells_by_column[
target_column_index - target_column_offset
][target_row_index - target_row_offset]
else:
raise InvalidProtocolData(
f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}"
)


def wells_covered_sparse(
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> Iterator[str]:
"""Get the list of wells covered by a nozzle map on a column-oriented reservoir.
This function handles reservoirs whose wells span multiple rows and columns - the most common case is something like a
12-well reservoir, whose wells are the height of a full SBS column but only a single well wide, or a 1-well reservoir whose well
is the size of an SBS active area.
"""
target_row_index, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
column_upsample = 12 // len(target_wells_by_column)
row_upsample = 8 // len(target_wells_by_column[0])
if column_upsample < 1 or row_upsample < 1:
raise InvalidStoredData(
"This labware cannot be uased with wells_covered_sparse because it is more dense than an SBS 96 standard."
)
for nozzle_column in range(max(1, len(nozzle_map.columns) // column_upsample)):
for nozzle_row in range(max(1, len(nozzle_map.rows) // row_upsample)):
if nozzle_map.starting_nozzle == "A1":
if (
target_column_index + nozzle_column < len(target_wells_by_column)
) and (
target_row_index + nozzle_row
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[target_column_index + nozzle_column][
target_row_index + nozzle_row
]
elif nozzle_map.starting_nozzle == "A12":
if (target_column_index - nozzle_column >= 0) and (
target_row_index + nozzle_row
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[
target_column_index - nozzle_column
][target_row_index + nozzle_row]
elif nozzle_map.starting_nozzle == "H1":
if (
target_column_index + nozzle_column < len(target_wells_by_column)
) and (target_row_index - nozzle_row >= 0):
yield target_wells_by_column[
target_column_index + nozzle_column
][target_row_index - nozzle_row]
elif nozzle_map.starting_nozzle == "H12":
if (target_column_index - nozzle_column >= 0) and (
target_row_index - nozzle_row >= 0
):
yield target_wells_by_column[
target_column_index - nozzle_column
][target_row_index - nozzle_row]
else:
raise InvalidProtocolData(
f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}"
)


def nozzles_per_well(
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> int:
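"""Get the number of nozzles of this configuration that will interact with a single well of this labware."""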
_, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
# labware as or more dense than a 96 plate will only ever have 1 nozzle per well (and some wells won't be touched)
if len(target_wells_by_column) >= len(nozzle_map.columns) and len(
target_wells_by_column[target_column_index]
) >= len(nozzle_map.rows):
return 1
return max(1, len(nozzle_map.columns) // len(target_wells_by_column)) * max(
1, len(nozzle_map.rows) // len(target_wells_by_column[target_column_index])
)
46 changes: 46 additions & 0 deletions api/src/opentrons/protocol_engine/state/geometry.py
@@ -1,4 +1,5 @@
"""Geometry state getters."""

import enum
from numpy import array, dot, double as npdouble
from numpy.typing import NDArray
@@ -8,6 +9,7 @@

from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType

from opentrons_shared_data.errors.exceptions import InvalidStoredData
from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN
from opentrons_shared_data.deck.types import CutoutFixture
from opentrons_shared_data.pipette import PIPETTE_X_SPAN
@@ -61,6 +63,7 @@
find_volume_at_well_height,
find_height_at_well_volume,
)
from ._well_math import wells_covered_by_pipette_configuration, nozzles_per_well


SLOT_WIDTH = 128
@@ -1517,3 +1520,46 @@ def validate_dispense_volume_into_well(
raise errors.InvalidDispenseVolumeError(
f"Attempting to dispense {volume}µL of liquid into a well that can only hold {well_volumetric_capacity}µL (well {well_name} in labware_id: {labware_id})"
)

def get_wells_covered_by_pipette_focused_on_well(
self, labware_id: str, focused_on_well_name: str, pipette_id: str
) -> list[str]:
"""Get a flat list of wells that are covered by a pipette when moved to a specified well.
When you move a pipette in a multichannel configuration to a specific well - here said to be "focused on"
that well, for lack of a better term - the pipette will operate on other wells as well.
For instance, a pipette with a COLUMN configuration that is focused on well A1 of an SBS standard labware
will also "cover", under this definition, wells B1-H1. That same pipette, when focused on well C5, will "cover"
wells C5-H5.
This math only works, and may only be applied, if one of the following is true:
- The pipette is in a SINGLE configuration
- The pipette is in a non-SINGLE configuration, and the labware is an SBS-format 96 or 384 well plate (and is so
marked in its definition's parameters.format key, as 96Standard or 384Standard)
If neither of the above applies, this function will return only the well the pipette is focused on, regardless of
the pipette's nozzle configuration.
"""
pipette_nozzle_map = self._pipettes.get_nozzle_configuration(pipette_id)
labware_columns = [
column for column in self._labware.get_definition(labware_id).ordering
]
try:
return list(
wells_covered_by_pipette_configuration(
pipette_nozzle_map, focused_on_well_name, labware_columns
)
)
except InvalidStoredData:
return [focused_on_well_name]

def get_nozzles_per_well(
self, labware_id: str, focused_on_well_name: str, pipette_id: str
) -> int:
"""Get the number of nozzles that will interact with each well."""
return nozzles_per_well(
self._pipettes.get_nozzle_configuration(pipette_id),
focused_on_well_name,
self._labware.get_definition(labware_id).ordering,
)