diff --git a/api/src/opentrons/protocol_engine/commands/aspirate.py b/api/src/opentrons/protocol_engine/commands/aspirate.py index b5541c79792..888300b3516 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate.py @@ -145,7 +145,9 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: except PipetteOverpressureError as e: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, pipette_id + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -167,8 +169,13 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: else: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, - volume_added=-volume_aspirated, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, pipette_id + ), + volume_added=-volume_aspirated + * self._state_view.geometry.get_nozzles_per_well( + labware_id, well_name, pipette_id + ), ) state_update.set_fluid_aspirated( pipette_id=params.pipetteId, diff --git a/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py b/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py index f25b6c24bbb..14138e793b0 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate_in_place.py @@ -112,7 +112,11 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -150,8 +154,17 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, - volume_added=-volume, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), + volume_added=-volume + * self._state_view.geometry.get_nozzles_per_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), ) return SuccessData( diff --git a/api/src/opentrons/protocol_engine/commands/dispense.py b/api/src/opentrons/protocol_engine/commands/dispense.py index 603fa7396a7..0a4a5e85e72 100644 --- a/api/src/opentrons/protocol_engine/commands/dispense.py +++ b/api/src/opentrons/protocol_engine/commands/dispense.py @@ -1,4 +1,5 @@ """Dispense command request, result, and implementation models.""" + from __future__ import annotations from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal @@ -109,7 +110,9 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: except PipetteOverpressureError as e: state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, params.pipetteId + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -134,9 +137,15 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: pipette_id=params.pipetteId, volume=volume ) ) + if volume_added is not None: + volume_added *= self._state_view.geometry.get_nozzles_per_well( + labware_id, well_name, params.pipetteId + ) state_update.set_liquid_operated( labware_id=labware_id, - well_name=well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + labware_id, well_name, params.pipetteId + ), volume_added=volume_added if volume_added is not None else CLEAR, ) state_update.set_fluid_ejected(pipette_id=params.pipetteId, volume=volume) diff --git a/api/src/opentrons/protocol_engine/commands/dispense_in_place.py b/api/src/opentrons/protocol_engine/commands/dispense_in_place.py index ee7cae42dc1..7cdf0327500 100644 --- a/api/src/opentrons/protocol_engine/commands/dispense_in_place.py +++ b/api/src/opentrons/protocol_engine/commands/dispense_in_place.py @@ -1,4 +1,5 @@ """Dispense-in-place command request, result, and implementation models.""" + from __future__ import annotations from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal @@ -91,7 +92,11 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn: ): state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=CLEAR, ) state_update.set_fluid_unknown(pipette_id=params.pipetteId) @@ -129,9 +134,19 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn: pipette_id=params.pipetteId, volume=volume ) ) + if volume_added is not None: + volume_added *= self._state_view.geometry.get_nozzles_per_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ) state_update.set_liquid_operated( labware_id=current_location.labware_id, - well_name=current_location.well_name, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well( + current_location.labware_id, + current_location.well_name, + params.pipetteId, + ), volume_added=volume_added if volume_added is not None else CLEAR, ) return SuccessData( diff --git a/api/src/opentrons/protocol_engine/state/_well_math.py b/api/src/opentrons/protocol_engine/state/_well_math.py new file mode 100644 index 00000000000..35595469cf7 --- /dev/null +++ b/api/src/opentrons/protocol_engine/state/_well_math.py @@ -0,0 +1,186 @@ +"""Utilities for doing coverage math on wells.""" + +from typing import Iterator +from typing_extensions import assert_never +from opentrons_shared_data.errors.exceptions import ( + InvalidStoredData, + InvalidProtocolData, +) + +from opentrons.hardware_control.nozzle_manager import NozzleMap + + +def wells_covered_by_pipette_configuration( + nozzle_map: NozzleMap, + target_well: str, + labware_wells_by_column: list[list[str]], +) -> Iterator[str]: + """Compute the wells covered by a pipette nozzle configuration.""" + if len(labware_wells_by_column) >= 12 and len(labware_wells_by_column[0]) >= 8: + yield from wells_covered_dense( + nozzle_map, + target_well, + labware_wells_by_column, + ) + elif len(labware_wells_by_column) < 12 and len(labware_wells_by_column[0]) < 8: + yield from wells_covered_sparse( + nozzle_map, target_well, labware_wells_by_column + ) + else: + raise InvalidStoredData( + "Labware of non-SBS and non-reservoir format cannot be handled" + ) + + +def row_col_ordinals_from_column_major_map( + target_well: str, column_major_wells: list[list[str]] +) -> tuple[int, int]: + """Turn a well name into the index of its row and column (in that order) within the labware.""" + for column_index, column in enumerate(column_major_wells): + if target_well in column: + return column.index(target_well), column_index + raise InvalidStoredData(f"Well name {target_well} is not present in labware") + + +def wells_covered_dense( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> Iterator[str]: + """Get the list of wells covered by a nozzle map on an SBS format labware with a specified multiplier of 96 into the number of wells. + + This will handle the offsetting of the nozzle map into higher-density well plates. For instance, a full column config target at A1 of a + 96 plate would cover wells A1, B1, C1, D1, E1, F1, G1, H1, and use downsample_factor 1.0 (96*1 = 96). A full column config target on a + 384 plate would cover wells A1, C1, E1, G1, I1, K1, M1, O1 and use downsample_factor 4.0 (96*4 = 384), while a full column config + targeting B1 would cover wells B1, D1, F1, H1, J1, L1, N1, P1 - still using downsample_factor 4.0, with the offset gathered from the + target well. + + The function may also handle sub-96 regular labware with fractional downsample factors, but that's physically improbable and it's not + tested. If you have a regular labware with fewer than 96 wells that is still regularly-spaced and has little enough space between well + walls that it's reasonable to use with multiple channels, you probably want wells_covered_trough. + """ + target_row_index, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + column_downsample = len(target_wells_by_column) // 12 + row_downsample = len(target_wells_by_column[0]) // 8 + if column_downsample < 1 or row_downsample < 1: + raise InvalidStoredData( + "This labware cannot be used wells_covered_dense because it is less dense than an SBS 96 standard" + ) + + for nozzle_column in range(len(nozzle_map.columns)): + target_column_offset = nozzle_column * column_downsample + for nozzle_row in range(len(nozzle_map.rows)): + target_row_offset = nozzle_row * row_downsample + if nozzle_map.starting_nozzle == "A1": + if ( + target_column_index + target_column_offset + < len(target_wells_by_column) + ) and ( + target_row_index + target_row_offset + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index + target_column_offset + ][target_row_index + target_row_offset] + elif nozzle_map.starting_nozzle == "A12": + if (target_column_index - target_column_offset >= 0) and ( + target_row_index + target_row_offset + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index - target_column_offset + ][target_row_index + target_row_offset] + elif nozzle_map.starting_nozzle == "H1": + if ( + target_column_index + target_column_offset + < len(target_wells_by_column) + ) and (target_row_index - target_row_offset >= 0): + yield target_wells_by_column[ + target_column_index + target_column_offset + ][target_row_index - target_row_offset] + elif nozzle_map.starting_nozzle == "H12": + if (target_column_index - target_column_offset >= 0) and ( + target_row_index - target_row_offset >= 0 + ): + yield target_wells_by_column[ + target_column_index - target_column_offset + ][target_row_index - target_row_offset] + else: + raise InvalidProtocolData( + f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}" + ) + + +def wells_covered_sparse( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> Iterator[str]: + """Get the list of wells covered by a nozzle map on a column-oriented reservoir. + + This function handles reservoirs whose wells span multiple rows and columns - the most common case is something like a + 12-well reservoir, whose wells are the height of an SBS column and the width of an SBS row, or a 1-well reservoir whose well + is the size of an SBS active area. + """ + target_row_index, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + column_upsample = 12 // len(target_wells_by_column) + row_upsample = 8 // len(target_wells_by_column[0]) + if column_upsample < 1 or row_upsample < 1: + raise InvalidStoredData( + "This labware cannot be uased with wells_covered_sparse because it is more dense than an SBS 96 standard." + ) + for nozzle_column in range(max(1, len(nozzle_map.columns) // column_upsample)): + for nozzle_row in range(max(1, len(nozzle_map.rows) // row_upsample)): + if nozzle_map.starting_nozzle == "A1": + if ( + target_column_index + nozzle_column < len(target_wells_by_column) + ) and ( + target_row_index + nozzle_row + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[target_column_index + nozzle_column][ + target_row_index + nozzle_row + ] + elif nozzle_map.starting_nozzle == "A12": + if (target_column_index - nozzle_column >= 0) and ( + target_row_index + nozzle_row + < len(target_wells_by_column[target_column_index]) + ): + yield target_wells_by_column[ + target_column_index - nozzle_column + ][target_row_index + nozzle_row] + elif nozzle_map.starting_nozzle == "H1": + if ( + target_column_index + nozzle_column + < len(target_wells_by_column[target_column_index]) + ) and (target_row_index - nozzle_row >= 0): + yield target_wells_by_column[ + target_column_index + nozzle_column + ][target_row_index - nozzle_row] + elif nozzle_map.starting_nozzle == "H12": + if (target_column_index - nozzle_column >= 0) and ( + target_row_index - nozzle_row >= 0 + ): + yield target_wells_by_column[ + target_column_index - nozzle_column + ][target_row_index - nozzle_row] + else: + raise InvalidProtocolData( + f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}" + ) + + +def nozzles_per_well( + nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]] +) -> int: + _, target_column_index = row_col_ordinals_from_column_major_map( + target_well, target_wells_by_column + ) + # labware as or more dense than a 96 plate will only ever have 1 nozzle per well (and some wells won't be touched) + if len(target_wells_by_column) >= len(nozzle_map.columns) and len( + target_wells_by_column[target_column_index] + ) >= len(nozzle_map.rows): + return 1 + return max(1, len(nozzle_map.columns) // len(target_wells_by_column)) * max( + 1, len(nozzle_map.rows) // len(target_wells_by_column[target_column_index]) + ) diff --git a/api/src/opentrons/protocol_engine/state/geometry.py b/api/src/opentrons/protocol_engine/state/geometry.py index 471065adcc2..3ded6d19d45 100644 --- a/api/src/opentrons/protocol_engine/state/geometry.py +++ b/api/src/opentrons/protocol_engine/state/geometry.py @@ -1,4 +1,5 @@ """Geometry state getters.""" + import enum from numpy import array, dot, double as npdouble from numpy.typing import NDArray @@ -8,6 +9,7 @@ from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType +from opentrons_shared_data.errors.exceptions import InvalidStoredData from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN from opentrons_shared_data.deck.types import CutoutFixture from opentrons_shared_data.pipette import PIPETTE_X_SPAN @@ -61,6 +63,7 @@ find_volume_at_well_height, find_height_at_well_volume, ) +from ._well_math import wells_covered_by_pipette_configuration, nozzles_per_well SLOT_WIDTH = 128 @@ -1517,3 +1520,46 @@ def validate_dispense_volume_into_well( raise errors.InvalidDispenseVolumeError( f"Attempting to dispense {volume}µL of liquid into a well that can only hold {well_volumetric_capacity}µL (well {well_name} in labware_id: {labware_id})" ) + + def get_wells_covered_by_pipette_focused_on_well( + self, labware_id: str, focused_on_well_name: str, pipette_id: str + ) -> list[str]: + """Get a flat list of wells that are covered by a pipette when moved to a specified well. + + When you move a pipette in a multichannel configuration to a specific well - here called + "focused on" the well, for lack of a better option - the pipette will operate on other wells as well. + + For instance, a pipette with a COLUMN configuration that is focused on well A1 of an SBS standard labware + will also "cover", under this definition, wells B1-H1. That same pipette, when focused on well C5, will "cover" + wells C5-H5. + + This math only works, and may only be applied, if one of the following is true: + - The pipette is in a SINGLE configuration + - The pipette is in a non-SINGLE configuration, and the labware is an SBS-format 96 or 384 well plate (and is so + marked in its definition's parameters.format key, as 96Standard or 384Standard) + + If all of the following do not apply, regardless of the nozzle configuration of the pipette this function will + return only the labware covered by the primary well. + """ + pipette_nozzle_map = self._pipettes.get_nozzle_configuration(pipette_id) + labware_columns = [ + column for column in self._labware.get_definition(labware_id).ordering + ] + try: + return list( + wells_covered_by_pipette_configuration( + pipette_nozzle_map, focused_on_well_name, labware_columns + ) + ) + except InvalidStoredData: + return [focused_on_well_name] + + def get_nozzles_per_well( + self, labware_id: str, focused_on_well_name: str, pipette_id: str + ) -> int: + """Get the number of nozzles that will interact with each well.""" + return nozzles_per_well( + self._pipettes.get_nozzle_configuration(pipette_id), + focused_on_well_name, + self._labware.get_definition(labware_id).ordering, + ) diff --git a/api/src/opentrons/protocol_engine/state/pipettes.py b/api/src/opentrons/protocol_engine/state/pipettes.py index 8277204a4be..9f90a89ffa7 100644 --- a/api/src/opentrons/protocol_engine/state/pipettes.py +++ b/api/src/opentrons/protocol_engine/state/pipettes.py @@ -14,6 +14,7 @@ from typing_extensions import assert_never from opentrons_shared_data.pipette import pipette_definition +from opentrons_shared_data.labware.utils import well_ordinals_from_well_name from opentrons.config.defaults_ot2 import Z_RETRACT_DISTANCE from opentrons.hardware_control.dev_types import PipetteDict from opentrons.hardware_control import CriticalPoint @@ -667,6 +668,10 @@ def get_primary_nozzle(self, pipette_id: str) -> str: nozzle_map = self._state.nozzle_configuration_by_id[pipette_id] return nozzle_map.starting_nozzle + def get_nozzle_configuration(self, pipette_id: str) -> NozzleMap: + """Get the nozzle map of the pipette.""" + return self._state.nozzle_configuration_by_id[pipette_id] + def _get_critical_point_offset_without_tip( self, pipette_id: str, critical_point: Optional[CriticalPoint] ) -> Point: diff --git a/api/src/opentrons/protocol_engine/state/tips.py b/api/src/opentrons/protocol_engine/state/tips.py index 1ac3e91f795..507ad67bab4 100644 --- a/api/src/opentrons/protocol_engine/state/tips.py +++ b/api/src/opentrons/protocol_engine/state/tips.py @@ -6,6 +6,7 @@ from opentrons.protocol_engine.state import update_types from ._abstract_store import HasState, HandlesActions +from ._well_math import wells_covered_dense from ..actions import Action, ResetTipsAction, get_state_updates from opentrons.hardware_control.nozzle_manager import NozzleMap @@ -108,46 +109,12 @@ def _handle_state_update(self, state_update: update_types.StateUpdate) -> None: column for column in definition.ordering ] - def _set_used_tips( # noqa: C901 - self, pipette_id: str, well_name: str, labware_id: str - ) -> None: + def _set_used_tips(self, pipette_id: str, well_name: str, labware_id: str) -> None: columns = self._state.column_by_labware_id.get(labware_id, []) wells = self._state.tips_by_labware_id.get(labware_id, {}) nozzle_map = self._state.pipette_info_by_pipette_id[pipette_id].nozzle_map - - # TODO (cb, 02-28-2024): Transition from using partial nozzle map to full instrument map for the set used logic - num_nozzle_cols = len(nozzle_map.columns) - num_nozzle_rows = len(nozzle_map.rows) - - critical_column = 0 - critical_row = 0 - for column in columns: - if well_name in column: - critical_row = column.index(well_name) - critical_column = columns.index(column) - - for i in range(num_nozzle_cols): - for j in range(num_nozzle_rows): - if nozzle_map.starting_nozzle == "A1": - if (critical_column + i < len(columns)) and ( - critical_row + j < len(columns[critical_column]) - ): - well = columns[critical_column + i][critical_row + j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "A12": - if (critical_column - i >= 0) and ( - critical_row + j < len(columns[critical_column]) - ): - well = columns[critical_column - i][critical_row + j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "H1": - if (critical_column + i < len(columns)) and (critical_row - j >= 0): - well = columns[critical_column + i][critical_row - j] - wells[well] = TipRackWellState.USED - elif nozzle_map.starting_nozzle == "H12": - if (critical_column - i >= 0) and (critical_row - j >= 0): - well = columns[critical_column - i][critical_row - j] - wells[well] = TipRackWellState.USED + for well in wells_covered_dense(nozzle_map, well_name, columns): + wells[well] = TipRackWellState.USED class TipView(HasState[TipState]): @@ -174,6 +141,7 @@ def get_next_tip( # noqa: C901 wells = self._state.tips_by_labware_id.get(labware_id, {}) columns = self._state.column_by_labware_id.get(labware_id, []) + # TODO(sf): I'm pretty sure this can be replaced with wells_covered_96 but I'm not quite sure how def _identify_tip_cluster( active_columns: int, active_rows: int, diff --git a/api/src/opentrons/protocol_engine/state/update_types.py b/api/src/opentrons/protocol_engine/state/update_types.py index 4487a503173..fa1febe1d2f 100644 --- a/api/src/opentrons/protocol_engine/state/update_types.py +++ b/api/src/opentrons/protocol_engine/state/update_types.py @@ -206,7 +206,7 @@ class LiquidOperatedUpdate: """An update from operating a liquid.""" labware_id: str - well_name: str + well_names: list[str] volume_added: float | ClearType @@ -441,12 +441,12 @@ def set_liquid_probed( ) def set_liquid_operated( - self, labware_id: str, well_name: str, volume_added: float | ClearType + self, labware_id: str, well_names: list[str], volume_added: float | ClearType ) -> None: """Update liquid volumes in well state. See `OperateLiquidUpdate`.""" self.liquid_operated = LiquidOperatedUpdate( labware_id=labware_id, - well_name=well_name, + well_names=well_names, volume_added=volume_added, ) diff --git a/api/src/opentrons/protocol_engine/state/wells.py b/api/src/opentrons/protocol_engine/state/wells.py index 5b4d3bb8d77..2791346de5c 100644 --- a/api/src/opentrons/protocol_engine/state/wells.py +++ b/api/src/opentrons/protocol_engine/state/wells.py @@ -1,4 +1,5 @@ """Basic well data state and store.""" + from dataclasses import dataclass from typing import Dict, List, Union, Iterator, Optional, Tuple, overload, TypeVar @@ -54,7 +55,7 @@ def _handle_liquid_loaded_update( labware_id = state_update.labware_id if labware_id not in self._state.loaded_volumes: self._state.loaded_volumes[labware_id] = {} - for (well, volume) in state_update.volumes.items(): + for well, volume in state_update.volumes.items(): self._state.loaded_volumes[labware_id][well] = LoadedVolumeInfo( volume=_none_from_clear(volume), last_loaded=state_update.last_loaded, @@ -83,19 +84,28 @@ def _handle_liquid_probed_update( def _handle_liquid_operated_update( self, state_update: update_types.LiquidOperatedUpdate ) -> None: - labware_id = state_update.labware_id - well_name = state_update.well_name + for well_name in state_update.well_names: + self._handle_well_operated( + state_update.labware_id, well_name, state_update.volume_added + ) + + def _handle_well_operated( + self, + labware_id: str, + well_name: str, + volume_added: float | update_types.ClearType, + ) -> None: if ( labware_id in self._state.loaded_volumes and well_name in self._state.loaded_volumes[labware_id] ): - if state_update.volume_added is update_types.CLEAR: + if volume_added is update_types.CLEAR: del self._state.loaded_volumes[labware_id][well_name] else: prev_loaded_vol_info = self._state.loaded_volumes[labware_id][well_name] assert prev_loaded_vol_info.volume is not None self._state.loaded_volumes[labware_id][well_name] = LoadedVolumeInfo( - volume=prev_loaded_vol_info.volume + state_update.volume_added, + volume=prev_loaded_vol_info.volume + volume_added, last_loaded=prev_loaded_vol_info.last_loaded, operations_since_load=prev_loaded_vol_info.operations_since_load + 1, @@ -109,16 +119,14 @@ def _handle_liquid_operated_update( labware_id in self._state.probed_volumes and well_name in self._state.probed_volumes[labware_id] ): - if state_update.volume_added is update_types.CLEAR: + if volume_added is update_types.CLEAR: del self._state.probed_volumes[labware_id][well_name] else: prev_probed_vol_info = self._state.probed_volumes[labware_id][well_name] if prev_probed_vol_info.volume is None: new_vol_info: float | None = None else: - new_vol_info = ( - prev_probed_vol_info.volume + state_update.volume_added - ) + new_vol_info = prev_probed_vol_info.volume + volume_added self._state.probed_volumes[labware_id][well_name] = ProbedVolumeInfo( volume=new_vol_info, last_probed=prev_probed_vol_info.last_probed, @@ -214,7 +222,7 @@ def _volume_from_info(info: Optional[LoadedVolumeInfo]) -> Optional[float]: def _volume_from_info( - info: Union[ProbedVolumeInfo, LoadedVolumeInfo, None] + info: Union[ProbedVolumeInfo, LoadedVolumeInfo, None], ) -> Optional[float]: if info is None: return None