From 64ba417e6b0e3506fad2afcb8117017042b7866b Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 5 Nov 2024 17:05:22 -0500 Subject: [PATCH] feat(api): track volumes from multichannel configs This PR adds the capability to properly track volume changes made by multichannel pipettes (and partial tip loadings of multichannel pipettes) to the engine. There are two ways in which we need to handle multichannel nozzle configurations specially compared to single-channel configurations. First, and what EXEC-795 is about, is that pipettes with multiple active nozzles will aspirate out of or dispense into multiple wells in an aspirate/dispense/in_place command. Which wells the pipette touches is a matter of projecting the pipette nozzle map out over the layout of the labware and predicting which wells are interacted with. This is itself non-trivial because labware can have many formats. What we can do is make the math work correctly when possible - when the labware is laid out normally enough that we can do projections of this type - and fall back to pretending to be a single channel if we fail. Since we're computing the logical equivalent of actual physical state, and if the labware is irregular it's unlikely that a multiple nozzle layout will physically work with the labware, I think this is safe. Specifically the thing we need to do is generalize the logic used in the tip store to project which tips are picked up by a multichannel to labware of different formats. Our multichannel pipette nozzles are laid out to match SBS 96-well plates, and so that's our "default" labware. On labware that follows SBS patterns but is more dense - a 384 plate, for instance - then we have to subsample, picking a single well in each group of (well_count / 96) that occupies the same space as a 96-well well to interact with. On labware that follows SBS patterns but is less dense - a 12-column reservoir, for instance - then we have to supersample, letting a labware well be touched by multiple nozzles. The second thing we have to deal with is that if the labware is a reservoir or reservoir-like - it has fewer wells than we have nozzles - then the common case is that multiple nozzles are in a well, and in that case if we're keeping track of the volume taken out of or added into a well we have to multiply the operation volume by the number of nozzles per well, which we can get by just dividing sizes without taking into account pattern overlap. Closes EXEC-795 --- .../protocol_engine/commands/aspirate.py | 13 +- .../commands/aspirate_in_place.py | 19 +- .../protocol_engine/commands/dispense.py | 13 +- .../commands/dispense_in_place.py | 19 +- .../protocol_engine/state/_well_math.py | 186 ++++++++++++++++++ .../protocol_engine/state/geometry.py | 46 +++++ .../protocol_engine/state/pipettes.py | 5 + .../opentrons/protocol_engine/state/tips.py | 42 +--- .../protocol_engine/state/update_types.py | 6 +- .../opentrons/protocol_engine/state/wells.py | 28 ++- 10 files changed, 317 insertions(+), 60 deletions(-) create mode 100644 api/src/opentrons/protocol_engine/state/_well_math.py diff --git a/api/src/opentrons/protocol_engine/commands/aspirate.py b/api/src/opentrons/protocol_engine/commands/aspirate.py index b5541c79792..888300b3516 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate.py @@ -145,7 +145,9 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: except PipetteOverpressureError as e: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, pipette_id + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -167,8 +169,13 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: else: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, - volume_added=-volume_aspirated, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, pipette_id + ), + volume_added=-volume_aspirated + * self._state_view.geometry.get_nozzles_per_well( + labware_id, well_name, pipette_id + ), ) state_update.set_fluid_aspirated( pipette_id=params.pipetteId, diff --git a/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py b/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py index f25b6c24bbb..14138e793b0 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py @@ -112,7 +112,11 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -150,8 +154,17 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, - volume_added=-volume, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), + volume_added=-volume + * self._state_view.geometry.get_nozzles_per_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), ) return SuccessData( diff --git a/api/src/opentrons/protocol_engine/commands/dispense.py b/api/src/opentrons/protocol_engine/commands/dispense.py index 603fa7396a7..0a4a5e85e72 100644 --- a/api/src/opentrons/protocol_engine/commands/dispense.py +++ b/api/src/opentrons/protocol_engine/commands/dispense.py @@ -1,4 +1,5 @@ """Dispense command request, result, and implementation models.""" + from __future__ import annotations from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal @@ -109,7 +110,9 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: except PipetteOverpressureError as e: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, params.pipetteId + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -134,9 +137,15 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: pipette_id=params.pipetteId, volume=volume ) ) + if volume_added is not None: + volume_added *= self._state_view.geometry.get_nozzles_per_well( + labware_id, well_name, params.pipetteId + ) state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, params.pipetteId + ), volume_added=volume_added if volume_added is not None else CLEAR, ) state_update.set_fluid_ejected(pipette_id=params.pipetteId, volume=volume) diff --git a/api/src/opentrons/protocol_engine/commands/dispense_in_place.py b/api/src/opentrons/protocol_engine/commands/dispense_in_place.py index ee7cae42dc1..7cdf0327500 100644 --- a/api/src/opentrons/protocol_engine/commands/dispense_in_place.py +++ b/api/src/opentrons/protocol_engine/commands/dispense_in_place.py @@ -1,4 +1,5 @@ """Dispense-in-place command request, result, and implementation models.""" + from __future__ import annotations from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal @@ -91,7 +92,11 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -129,9 +134,19 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn: pipette_id=params.pipetteId, volume=volume ) ) + if volume_added is not None: + volume_added *= self._state_view.geometry.get_nozzles_per_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ) state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=volume_added if volume_added is not None else CLEAR, ) return SuccessData( diff --git a/api/src/opentrons/protocol_engine/state/_well_math.py b/api/src/opentrons/protocol_engine/state/_well_math.py new file mode 100644 index 00000000000..35595469cf7 --- /dev/null +++ b/api/src/opentrons/protocol_engine/state/_well_math.py @@ -0,0 +1,186 @@ +"""Utilities for doing coverage math on wells.""" + +from typing import Iterator +from typing_extensions import assert_never +from opentrons_shared_data.errors.exceptions import ( + InvalidStoredData, + InvalidProtocolData, +) + +from opentrons.hardware_control.nozzle_manager import NozzleMap + + +def wells_covered_by_pipette_configuration( + nozzle_map: NozzleMap, + target_well: str, + labware_wells_by_column: list[list[str]], +) -> Iterator[str]: + """Compute the wells covered by a pipette nozzle configuration.""" + if len(labware_wells_by_column) >= 12 and len(labware_wells_by_column[0]) >= 8: + yield from wells_covered_dense( + nozzle_map, + target_well, + labware_wells_by_column, + ) + elif len(labware_wells_by_column) < 12 and len(labware_wells_by_column[0]) < 8: + yield from wells_covered_sparse( + nozzle_map, target_well, labware_wells_by_column + ) + else: + raise InvalidStoredData( + "Labware of non-SBS and non-reservoir format cannot be handled" + ) + + +def row_col_ordinals_from_column_major_map( + target_well: str, column_major_wells: list[list[str]] +) -> tuple[int, int]: + """Turn a well name into the index of its row and column (in that order) within the labware.""" + for column_index, column in enumerate(column_major_wells): + if target_well in column: + return column.index(target_well), column_index + raise InvalidStoredData(f"Well name {target_well} is not present in labware") + + +def wells_covered_dense( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> Iterator[str]: + """Get the list of wells covered by a nozzle map on an SBS format labware with a specified multiplier of 96 into the number of wells. + + This will handle the offsetting of the nozzle map into higher-density well plates. For instance, a full column config target at A1 of a + 96 plate would cover wells A1, B1, C1, D1, E1, F1, G1, H1, and use downsample_factor 1.0 (96*1 = 96). A full column config target on a + 384 plate would cover wells A1, C1, E1, G1, I1, K1, M1, O1 and use downsample_factor 4.0 (96*4 = 384), while a full column config + targeting B1 would cover wells B1, D1, F1, H1, J1, L1, N1, P1 - still using downsample_factor 4.0, with the offset gathered from the + target well. + + The function may also handle sub-96 regular labware with fractional downsample factors, but that's physically improbable and it's not + tested. If you have a regular labware with fewer than 96 wells that is still regularly-spaced and has little enough space between well + walls that it's reasonable to use with multiple channels, you probably want wells_covered_trough. + """ + target_row_index, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + column_downsample = len(target_wells_by_column) // 12 + row_downsample = len(target_wells_by_column[0]) // 8 + if column_downsample < 1 or row_downsample < 1: + raise InvalidStoredData( + "This labware cannot be used wells_covered_dense because it is less dense than an SBS 96 standard" + ) + + for nozzle_column in range(len(nozzle_map.columns)): + target_column_offset = nozzle_column * column_downsample + for nozzle_row in range(len(nozzle_map.rows)): + target_row_offset = nozzle_row * row_downsample + if nozzle_map.starting_nozzle == "A1": + if ( + target_column_index + target_column_offset + < len(target_wells_by_column) + ) and ( + target_row_index + target_row_offset + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index + target_column_offset + ][target_row_index + target_row_offset] + elif nozzle_map.starting_nozzle == "A12": + if (target_column_index - target_column_offset >= 0) and ( + target_row_index + target_row_offset + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index - target_column_offset + ][target_row_index + target_row_offset] + elif nozzle_map.starting_nozzle == "H1": + if ( + target_column_index + target_column_offset + < len(target_wells_by_column) + ) and (target_row_index - target_row_offset >= 0): + yield target_wells_by_column[ + target_column_index + target_column_offset + ][target_row_index - target_row_offset] + elif nozzle_map.starting_nozzle == "H12": + if (target_column_index - target_column_offset >= 0) and ( + target_row_index - target_row_offset >= 0 + ): + yield target_wells_by_column[ + target_column_index - target_column_offset + ][target_row_index - target_row_offset] + else: + raise InvalidProtocolData( + f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}" + ) + + +def wells_covered_sparse( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> Iterator[str]: + """Get the list of wells covered by a nozzle map on a column-oriented reservoir. + + This function handles reservoirs whose wells span multiple rows and columns - the most common case is something like a + 12-well reservoir, whose wells are the height of an SBS column and the width of an SBS row, or a 1-well reservoir whose well + is the size of an SBS active area. + """ + target_row_index, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + column_upsample = 12 // len(target_wells_by_column) + row_upsample = 8 // len(target_wells_by_column[0]) + if column_upsample < 1 or row_upsample < 1: + raise InvalidStoredData( + "This labware cannot be uased with wells_covered_sparse because it is more dense than an SBS 96 standard." + ) + for nozzle_column in range(max(1, len(nozzle_map.columns) // column_upsample)): + for nozzle_row in range(max(1, len(nozzle_map.rows) // row_upsample)): + if nozzle_map.starting_nozzle == "A1": + if ( + target_column_index + nozzle_column < len(target_wells_by_column) + ) and ( + target_row_index + nozzle_row + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[target_column_index + nozzle_column][ + target_row_index + nozzle_row + ] + elif nozzle_map.starting_nozzle == "A12": + if (target_column_index - nozzle_column >= 0) and ( + target_row_index + nozzle_row + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index - nozzle_column + ][target_row_index + nozzle_row] + elif nozzle_map.starting_nozzle == "H1": + if ( + target_column_index + nozzle_column + < len(target_wells_by_column[target_column_index]) + ) and (target_row_index - nozzle_row >= 0): + yield target_wells_by_column[ + target_column_index + nozzle_column + ][target_row_index - nozzle_row] + elif nozzle_map.starting_nozzle == "H12": + if (target_column_index - nozzle_column >= 0) and ( + target_row_index - nozzle_row >= 0 + ): + yield target_wells_by_column[ + target_column_index - nozzle_column + ][target_row_index - nozzle_row] + else: + raise InvalidProtocolData( + f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}" + ) + + +def nozzles_per_well( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> int: + _, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + # labware as or more dense than a 96 plate will only ever have 1 nozzle per well (and some wells won't be touched) + if len(target_wells_by_column) >= len(nozzle_map.columns) and len( + target_wells_by_column[target_column_index] + ) >= len(nozzle_map.rows): + return 1 + return max(1, len(nozzle_map.columns) // len(target_wells_by_column)) * max( + 1, len(nozzle_map.rows) // len(target_wells_by_column[target_column_index]) + ) diff --git a/api/src/opentrons/protocol_engine/state/geometry.py b/api/src/opentrons/protocol_engine/state/geometry.py index 471065adcc2..3ded6d19d45 100644 --- a/api/src/opentrons/protocol_engine/state/geometry.py +++ b/api/src/opentrons/protocol_engine/state/geometry.py @@ -1,4 +1,5 @@ """Geometry state getters.""" + import enum from numpy import array, dot, double as npdouble from numpy.typing import NDArray @@ -8,6 +9,7 @@ from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType +from opentrons_shared_data.errors.exceptions import InvalidStoredData from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN from opentrons_shared_data.deck.types import CutoutFixture from opentrons_shared_data.pipette import PIPETTE_X_SPAN @@ -61,6 +63,7 @@ find_volume_at_well_height, find_height_at_well_volume, ) +from ._well_math import wells_covered_by_pipette_configuration, nozzles_per_well SLOT_WIDTH = 128 @@ -1517,3 +1520,46 @@ def validate_dispense_volume_into_well( raise errors.InvalidDispenseVolumeError( f"Attempting to dispense {volume}µL of liquid into a well that can only hold {well_volumetric_capacity}µL (well {well_name} in labware_id: {labware_id})" ) + + def get_wells_covered_by_pipette_focused_on_well( + self, labware_id: str, focused_on_well_name: str, pipette_id: str + ) -> list[str]: + """Get a flat list of wells that are covered by a pipette when moved to a specified well. + + When you move a pipette in a multichannel configuration to a specific well - here called + "focused on" the well, for lack of a better option - the pipette will operate on other wells as well. + + For instance, a pipette with a COLUMN configuration that is focused on well A1 of an SBS standard labware + will also "cover", under this definition, wells B1-H1. That same pipette, when focused on well C5, will "cover" + wells C5-H5. + + This math only works, and may only be applied, if one of the following is true: + - The pipette is in a SINGLE configuration + - The pipette is in a non-SINGLE configuration, and the labware is an SBS-format 96 or 384 well plate (and is so + marked in its definition's parameters.format key, as 96Standard or 384Standard) + + If all of the following do not apply, regardless of the nozzle configuration of the pipette this function will + return only the labware covered by the primary well. + """ + pipette_nozzle_map = self._pipettes.get_nozzle_configuration(pipette_id) + labware_columns = [ + column for column in self._labware.get_definition(labware_id).ordering + ] + try: + return list( + wells_covered_by_pipette_configuration( + pipette_nozzle_map, focused_on_well_name, labware_columns + ) + ) + except InvalidStoredData: + return [focused_on_well_name] + + def get_nozzles_per_well( + self, labware_id: str, focused_on_well_name: str, pipette_id: str + ) -> int: + """Get the number of nozzles that will interact with each well.""" + return nozzles_per_well( + self._pipettes.get_nozzle_configuration(pipette_id), + focused_on_well_name, + self._labware.get_definition(labware_id).ordering, + ) diff --git a/api/src/opentrons/protocol_engine/state/pipettes.py b/api/src/opentrons/protocol_engine/state/pipettes.py index 8277204a4be..9f90a89ffa7 100644 --- a/api/src/opentrons/protocol_engine/state/pipettes.py +++ b/api/src/opentrons/protocol_engine/state/pipettes.py @@ -14,6 +14,7 @@ from typing_extensions import assert_never from opentrons_shared_data.pipette import pipette_definition +from opentrons_shared_data.labware.utils import well_ordinals_from_well_name from opentrons.config.defaults_ot2 import Z_RETRACT_DISTANCE from opentrons.hardware_control.dev_types import PipetteDict from opentrons.hardware_control import CriticalPoint @@ -667,6 +668,10 @@ def get_primary_nozzle(self, pipette_id: str) -> str: nozzle_map = self._state.nozzle_configuration_by_id[pipette_id] return nozzle_map.starting_nozzle + def get_nozzle_configuration(self, pipette_id: str) -> NozzleMap: + """Get the nozzle map of the pipette.""" + return self._state.nozzle_configuration_by_id[pipette_id] + def _get_critical_point_offset_without_tip( self, pipette_id: str, critical_point: Optional[CriticalPoint] ) -> Point: diff --git a/api/src/opentrons/protocol_engine/state/tips.py b/api/src/opentrons/protocol_engine/state/tips.py index 1ac3e91f795..507ad67bab4 100644 --- a/api/src/opentrons/protocol_engine/state/tips.py +++ b/api/src/opentrons/protocol_engine/state/tips.py @@ -6,6 +6,7 @@ from opentrons.protocol_engine.state import update_types from ._abstract_store import HasState, HandlesActions +from ._well_math import wells_covered_dense from ..actions import Action, ResetTipsAction, get_state_updates from opentrons.hardware_control.nozzle_manager import NozzleMap @@ -108,46 +109,12 @@ def _handle_state_update(self, state_update: update_types.StateUpdate) -> None: column for column in definition.ordering ] - def _set_used_tips( # noqa: C901 - self, pipette_id: str, well_name: str, labware_id: str - ) -> None: + def _set_used_tips(self, pipette_id: str, well_name: str, labware_id: str) -> None: columns = self._state.column_by_labware_id.get(labware_id, []) wells = self._state.tips_by_labware_id.get(labware_id, {}) nozzle_map = self._state.pipette_info_by_pipette_id[pipette_id].nozzle_map - - # TODO (cb, 02-28-2024): Transition from using partial nozzle map to full instrument map for the set used logic - num_nozzle_cols = len(nozzle_map.columns) - num_nozzle_rows = len(nozzle_map.rows) - - critical_column = 0 - critical_row = 0 - for column in columns: - if well_name in column: - critical_row = column.index(well_name) - critical_column = columns.index(column) - - for i in range(num_nozzle_cols): - for j in range(num_nozzle_rows): - if nozzle_map.starting_nozzle == "A1": - if (critical_column + i < len(columns)) and ( - critical_row + j < len(columns[critical_column]) - ): - well = columns[critical_column + i][critical_row + j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "A12": - if (critical_column - i >= 0) and ( - critical_row + j < len(columns[critical_column]) - ): - well = columns[critical_column - i][critical_row + j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "H1": - if (critical_column + i < len(columns)) and (critical_row - j >= 0): - well = columns[critical_column + i][critical_row - j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "H12": - if (critical_column - i >= 0) and (critical_row - j >= 0): - well = columns[critical_column - i][critical_row - j] - wells[well] = TipRackWellState.USED + for well in wells_covered_dense(nozzle_map, well_name, columns): + wells[well] = TipRackWellState.USED class TipView(HasState[TipState]): @@ -174,6 +141,7 @@ def get_next_tip( # noqa: C901 wells = self._state.tips_by_labware_id.get(labware_id, {}) columns = self._state.column_by_labware_id.get(labware_id, []) + # TODO(sf): I'm pretty sure this can be replaced with wells_covered_96 but I'm not quite sure how def _identify_tip_cluster( active_columns: int, active_rows: int, diff --git a/api/src/opentrons/protocol_engine/state/update_types.py b/api/src/opentrons/protocol_engine/state/update_types.py index 4487a503173..fa1febe1d2f 100644 --- a/api/src/opentrons/protocol_engine/state/update_types.py +++ b/api/src/opentrons/protocol_engine/state/update_types.py @@ -206,7 +206,7 @@ class LiquidOperatedUpdate: """An update from operating a liquid.""" labware_id: str - well_name: str + well_names: list[str] volume_added: float | ClearType @@ -441,12 +441,12 @@ def set_liquid_probed( ) def set_liquid_operated( - self, labware_id: str, well_name: str, volume_added: float | ClearType + self, labware_id: str, well_names: list[str], volume_added: float | ClearType ) -> None: """Update liquid volumes in well state. See `OperateLiquidUpdate`.""" self.liquid_operated = LiquidOperatedUpdate( labware_id=labware_id, - well_name=well_name, + well_names=well_names, volume_added=volume_added, ) diff --git a/api/src/opentrons/protocol_engine/state/wells.py b/api/src/opentrons/protocol_engine/state/wells.py index 5b4d3bb8d77..2791346de5c 100644 --- a/api/src/opentrons/protocol_engine/state/wells.py +++ b/api/src/opentrons/protocol_engine/state/wells.py @@ -1,4 +1,5 @@ """Basic well data state and store.""" + from dataclasses import dataclass from typing import Dict, List, Union, Iterator, Optional, Tuple, overload, TypeVar @@ -54,7 +55,7 @@ def _handle_liquid_loaded_update( labware_id = state_update.labware_id if labware_id not in self._state.loaded_volumes: self._state.loaded_volumes[labware_id] = {} - for (well, volume) in state_update.volumes.items(): + for well, volume in state_update.volumes.items(): self._state.loaded_volumes[labware_id][well] = LoadedVolumeInfo( volume=_none_from_clear(volume), last_loaded=state_update.last_loaded, @@ -83,19 +84,28 @@ def _handle_liquid_probed_update( def _handle_liquid_operated_update( self, state_update: update_types.LiquidOperatedUpdate ) -> None: - labware_id = state_update.labware_id - well_name = state_update.well_name + for well_name in state_update.well_names: + self._handle_well_operated( + state_update.labware_id, well_name, state_update.volume_added + ) + + def _handle_well_operated( + self, + labware_id: str, + well_name: str, + volume_added: float | update_types.ClearType, + ) -> None: if ( labware_id in self._state.loaded_volumes and well_name in self._state.loaded_volumes[labware_id] ): - if state_update.volume_added is update_types.CLEAR: + if volume_added is update_types.CLEAR: del self._state.loaded_volumes[labware_id][well_name] else: prev_loaded_vol_info = self._state.loaded_volumes[labware_id][well_name] assert prev_loaded_vol_info.volume is not None self._state.loaded_volumes[labware_id][well_name] = LoadedVolumeInfo( - volume=prev_loaded_vol_info.volume + state_update.volume_added, + volume=prev_loaded_vol_info.volume + volume_added, last_loaded=prev_loaded_vol_info.last_loaded, operations_since_load=prev_loaded_vol_info.operations_since_load + 1, @@ -109,16 +119,14 @@ def _handle_liquid_operated_update( labware_id in self._state.probed_volumes and well_name in self._state.probed_volumes[labware_id] ): - if state_update.volume_added is update_types.CLEAR: + if volume_added is update_types.CLEAR: del self._state.probed_volumes[labware_id][well_name] else: prev_probed_vol_info = self._state.probed_volumes[labware_id][well_name] if prev_probed_vol_info.volume is None: new_vol_info: float | None = None else: - new_vol_info = ( - prev_probed_vol_info.volume + state_update.volume_added - ) + new_vol_info = prev_probed_vol_info.volume + volume_added self._state.probed_volumes[labware_id][well_name] = ProbedVolumeInfo( volume=new_vol_info, last_probed=prev_probed_vol_info.last_probed, @@ -214,7 +222,7 @@ def _volume_from_info(info: Optional[LoadedVolumeInfo]) -> Optional[float]: def _volume_from_info( - info: Union[ProbedVolumeInfo, LoadedVolumeInfo, None] + info: Union[ProbedVolumeInfo, LoadedVolumeInfo, None], ) -> Optional[float]: if info is None: return None