Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Pause when pick_up_tip() errors in a Python protocol #14753

Merged
merged 25 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
42a36ec
Type-safe wait_for().
SyntaxColoring Mar 28, 2024
33225f7
Internal support for waiting for a specific command's recovery.
SyntaxColoring Mar 28, 2024
98446ba
WIP
SyntaxColoring Mar 29, 2024
459f14d
Another todo comment.
SyntaxColoring Mar 29, 2024
b4d4d80
Add SetTipUsedAction.
SyntaxColoring Apr 1, 2024
b61913f
Revert "Add SetTipUsedAction."
SyntaxColoring Apr 1, 2024
9322657
Consume tips in failed pickUpTip commands.
SyntaxColoring Apr 1, 2024
33d33c9
Document some background for the estop() method.
SyntaxColoring Apr 2, 2024
dbb03db
Update estop().
SyntaxColoring Apr 2, 2024
b13db26
Enflippen der waitenforen.
SyntaxColoring Apr 3, 2024
838572b
Add unit test for get_recovery_in_progress_for_command().
SyntaxColoring Apr 3, 2024
1d5ba4b
Merge branch 'edge' into papi_pause_on_error
SyntaxColoring Apr 4, 2024
6353d7d
Fix merge mistake.
SyntaxColoring Apr 4, 2024
1ef9bc9
Fix run-stop handling.
SyntaxColoring Apr 4, 2024
cb5bca1
Update various unit tests for new action field.
SyntaxColoring Apr 5, 2024
51bd1d2
Update and port command store tests.
SyntaxColoring Apr 5, 2024
6b79625
If you write a comment describing your sins, that makes the sins okay.
SyntaxColoring Apr 5, 2024
0213657
transports.py linting, docs, and error simplification.
SyntaxColoring Apr 8, 2024
ff22758
Small fixups.
SyntaxColoring Apr 8, 2024
8ad0eef
Refactor _wait_for().
SyntaxColoring Apr 8, 2024
bd14851
Merge branch 'edge' into papi_pause_on_error
SyntaxColoring Apr 8, 2024
a880096
Explicitly keep track of the current recovery target.
SyntaxColoring Apr 8, 2024
50e1706
Update todo comment to be more specific.
SyntaxColoring Apr 8, 2024
97b0ada
Raise a distinct error type.
SyntaxColoring Apr 8, 2024
cf1e936
Replace todo with note.
SyntaxColoring Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,15 @@ def pick_up_tip(
well_name=well_name,
well_location=well_location,
)
self._engine_client.pick_up_tip(
self._engine_client.pick_up_tip_wait_for_recovery(
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)

# FIX BEFORE MERGE: We should probably only set_last_location() if the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree - I think there are three answers each of which is honestly reasonable, but one which is the best (and also hardest). As a preamble, we should probably push last-location down to the engine.

  1. the best and hardest is, base the decision of setting last location on when the error occurred (during which action) and what the error is. For instance, if you got a stall while moving to the tiprack, we should clear the last location since position is now indeterminate; but if we failed our tip-state consistency check after the pickup, we should set the last location to the rack.
  2. the simplest and most robust is, always clear the last location on any error
  3. and a slightly more optimistic version is to always set it on any error since the most likely causes of failure will in fact leave the gantry in the specified location

Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this closer, I think it's appropriate to call this set_last_location() unconditionally. See this new comment:

self._engine_client.pick_up_tip_wait_for_recovery(
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
# Set the "last location" unconditionally, even if the command failed
# and was recovered from and we don't know if the pipette is physically here.
# This isn't used for path planning, but rather for implicit destination
# selection like in `pipette.aspirate(location=None)`.
self._protocol_core.set_last_location(location=location, mount=self.get_mount())

# pickUpTip succeeded.
self._protocol_core.set_last_location(location=location, mount=self.get_mount())

def drop_tip(
Expand Down
17 changes: 17 additions & 0 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,28 @@ class FailCommandAction:
"""

command_id: str
"""The command to fail."""

error_id: str
"""An ID to assign to the command's error.

Must be unique to this occurrence of the error.
"""

failed_at: datetime
"""When the command failed."""

error: EnumeratedError
"""The underlying exception that caused this command to fail."""

notes: List[CommandNote]
"""Overwrite the command's `.notes` with these."""

type: ErrorRecoveryType
"""How this error should be handled in the context of the overall run."""

running_command: Command
"""The command to fail, in its prior `running` state."""


@dataclass(frozen=True)
Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/protocol_engine/clients/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,25 @@ def pick_up_tip(

return cast(commands.PickUpTipResult, result)

# FIX BEFORE MERGE?: See if we can cut down on this duplication.
# We do not want to have to do this for every method here.
def pick_up_tip_wait_for_recovery(
self,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: WellLocation,
) -> None:
request = commands.PickUpTipCreate(
params=commands.PickUpTipParams(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
)
)
self._transport.execute_command_wait_for_recovery(request=request)

def drop_tip(
self,
pipette_id: str,
Expand Down
6 changes: 6 additions & 0 deletions api/src/opentrons/protocol_engine/clients/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def execute_command(self, request: CommandCreate) -> CommandResult:

return command.result

def execute_command_wait_for_recovery(self, request: CommandCreate) -> None:
run_coroutine_threadsafe(
self._engine.add_and_execute_command_wait_for_recovery(request=request),
loop=self._loop,
).result()

@overload
def call_method(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async def execute(self, command_id: str) -> None:
FailCommandAction(
error=error,
command_id=running_command.id,
running_command=running_command,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I am wondering why we need this in the action? dont we have the failed command stored in PE already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a hack, we probably don't need it. See this comment:

# This is a quick hack so FailCommandAction handlers can get the params of the
# command that failed. We probably want this to be a new "failure details"
# object instead, similar to how succeeded commands can send a "private result"
# to Protocol Engine internals.

error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
notes=note_tracker.get_notes(),
Expand Down
67 changes: 59 additions & 8 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,27 @@ async def add_and_execute_command(
await self.wait_for_command(command.id)
return self._state_store.commands.get(command.id)

async def add_and_execute_command_wait_for_recovery(
self, request: commands.CommandCreate
) -> None:
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to ideas for better names for this.

Or maybe we should make it a wait_for_recovery: bool argument on the existing add_and_execute() method.

"""Like `add_and_execute_command()`, except wait for error recovery.

Unlike `add_and_execute_command()`, if the command fails, this will not
immediately return the failed command. Instead, if the error is recoverable,
it will wait until error recovery has completed (e.g. when some other task
calls `self.resume_from_recovery()`).
"""
command = self.add_command(request)
await self.wait_for_command(command_id=command.id)
await self._state_store.wait_for(
# Per the warnings on `wait_for()`, we want our condition function to
# specifically check that *this* command's recovery has completed,
# rather than just checking that the overall run state
# != "awaiting-recovery."
self.state_view.commands.get_command_recovery_complete,
command_id=command.id,
)

def estop(
self,
# TODO(mm, 2024-03-26): Maintenance runs are a robot-server concept that
Expand All @@ -251,6 +272,15 @@ def estop(
) -> None:
"""Signal to the engine that an estop event occurred.

If an estop happens while the robot is moving, lower layers physically stop
motion and raise the event as an exception, which fails the Protocol Engine
command. No action from the `ProtocolEngine` caller is needed to handle that.

However, if an estop happens in between commands, or in the middle of
a command like `comment` or `waitForDuration` that doesn't access the hardware,
`ProtocolEngine` needs to be told about it so it can treat it as a fatal run
error and stop executing more commands. This method is how to do that.

If there are any queued commands for the engine, they will be marked
as failed due to the estop event. If there aren't any queued commands
*and* this is a maintenance run (which has commands queued one-by-one),
Expand All @@ -261,14 +291,26 @@ def estop(
"""
if self._state_store.commands.get_is_stopped():
return
current_id = (
running_or_next_queued_id = (
self._state_store.commands.get_running_command_id()
or self._state_store.commands.state.queued_command_ids.head(None)
# TODO(mm, 2024-04-02): Is it possible for the next queued command to
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
# be a setup command? That wouldn't show up in queued_command_ids.
)
running_or_next_queued = (
self._state_store.commands.get(running_or_next_queued_id)
if running_or_next_queued_id is not None
else None
)

if current_id is not None:
if running_or_next_queued_id is not None:
assert running_or_next_queued is not None

fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
Comment on lines +322 to +323
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my fault; in #14726, I neglected to account for what this estop() method was doing. I'm going to have to fix this in another PR. EXEC-382

Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of this estop() method is due for a rethink. Like, I don't really get why it needs to be so different from stop(), and why it needs to be messing with things like FailCommandActions itself.

running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand All @@ -277,12 +319,21 @@ def estop(
)
self._action_dispatcher.dispatch(fail_action)

# In the case where the running command was a setup command - check if there
# are any pending *run* commands and, if so, clear them all
current_id = self._state_store.commands.state.queued_command_ids.head(None)
if current_id is not None:
# The FailCommandAction above will have cleared all the queued protocol
# OR setup commands, depending on whether we gave it a protocol or setup
# command. We want both to be cleared in either case. So, do that here.
running_or_next_queued_id = (
self._state_store.commands.state.queued_command_ids.head(None)
)
if running_or_next_queued_id is not None:
running_or_next_queued = self._state_store.commands.get(
running_or_next_queued_id
)
fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand Down
20 changes: 20 additions & 0 deletions api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,26 @@ def get_all_commands_final(self) -> bool:

return no_command_running and no_command_to_execute

def get_command_recovery_complete(self, command_id: str) -> bool:
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
"""Return whether we're finished with error recovery for the given command.

If the given command didn't have a recovery phase (because it hasn't completed
yet, or because it succeeded without an error, or because it failed with an
error that wasn't recoverable), that counts as `True`.

If the given command did have a recovery phase, but it was interrupted by a
stop, that also counts as `True`.
"""
command_is_most_recent_to_fail = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a little gross to me, could we save the command we're currently recovering from explicitly? like, what happens if a fixit command fails, wouldn't it instantly make this False?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in a880096.

self._state.failed_command is not None
and command_id == self._state.failed_command.command.id
)
if command_is_most_recent_to_fail:
recovery_is_ongoing = self.get_status() == EngineStatus.AWAITING_RECOVERY
return not recovery_is_ongoing
else:
return True

def raise_fatal_command_error(self) -> None:
"""Raise the run's fatal command error, if there was one, as an exception.

Expand Down
15 changes: 9 additions & 6 deletions api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar
from typing import Callable, Dict, List, Optional, Sequence, TypeVar
from typing_extensions import ParamSpec

from opentrons_shared_data.deck.dev_types import DeckDefinitionV4

Expand All @@ -30,7 +31,9 @@
from .state_summary import StateSummary
from ..types import DeckConfigurationType

ReturnT = TypeVar("ReturnT")

_ParamsT = ParamSpec("_ParamsT")
_ReturnT = TypeVar("_ReturnT")


@dataclass(frozen=True)
Expand Down Expand Up @@ -207,10 +210,10 @@ def handle_action(self, action: Action) -> None:

async def wait_for(
self,
condition: Callable[..., Optional[ReturnT]],
*args: Any,
**kwargs: Any,
) -> ReturnT:
condition: Callable[_ParamsT, Optional[_ReturnT]],
*args: _ParamsT.args,
**kwargs: _ParamsT.kwargs,
) -> _ReturnT:
"""Wait for a condition to become true, checking whenever state changes.

If the condition is already true, return immediately.
Expand Down
37 changes: 35 additions & 2 deletions api/src/opentrons/protocol_engine/state/tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from ..actions import (
Action,
SucceedCommandAction,
FailCommandAction,
ResetTipsAction,
)
from ..commands import (
Command,
LoadLabwareResult,
PickUpTip,
PickUpTipResult,
DropTipResult,
DropTipInPlaceResult,
Expand All @@ -20,6 +22,7 @@
PipetteConfigUpdateResultMixin,
PipetteNozzleLayoutResultMixin,
)
from ..error_recovery_policy import ErrorRecoveryType

from opentrons.hardware_control.nozzle_manager import NozzleMap

Expand Down Expand Up @@ -71,7 +74,7 @@ def handle_action(self, action: Action) -> None:
self._state.channels_by_pipette_id[pipette_id] = config.channels
self._state.active_channels_by_pipette_id[pipette_id] = config.channels
self._state.nozzle_map_by_pipette_id[pipette_id] = config.nozzle_map
self._handle_command(action.command)
self._handle_succeeded_command(action.command)

if isinstance(action.private_result, PipetteNozzleLayoutResultMixin):
pipette_id = action.private_result.pipette_id
Expand All @@ -86,6 +89,9 @@ def handle_action(self, action: Action) -> None:
pipette_id
] = self._state.channels_by_pipette_id[pipette_id]

elif isinstance(action, FailCommandAction):
self._handle_failed_command(action)

elif isinstance(action, ResetTipsAction):
labware_id = action.labware_id

Expand All @@ -94,7 +100,7 @@ def handle_action(self, action: Action) -> None:
well_name
] = TipRackWellState.CLEAN

def _handle_command(self, command: Command) -> None:
def _handle_succeeded_command(self, command: Command) -> None:
if (
isinstance(command.result, LoadLabwareResult)
and command.result.definition.parameters.isTiprack
Expand Down Expand Up @@ -124,6 +130,33 @@ def _handle_command(self, command: Command) -> None:
pipette_id = command.params.pipetteId
self._state.length_by_pipette_id.pop(pipette_id, None)

def _handle_failed_command(
self,
action: FailCommandAction,
) -> None:
# If a pickUpTip command fails recoverably, mark the tips as used. This way,
# when the protocol is resumed and the Python Protocol API calls
# `get_next_tip()`, we'll move on to other tips as expected.
#
# We don't attempt this for nonrecoverable errors because maybe the failure
# was due to a bad labware ID or well name.
if (
isinstance(action.running_command, PickUpTip)
and action.type != ErrorRecoveryType.FAIL_RUN
):
self._set_used_tips(
pipette_id=action.running_command.params.pipetteId,
labware_id=action.running_command.params.labwareId,
well_name=action.running_command.params.wellName,
)
# TODO(mm, 2024-04-01): We're logically removing the tip from the tip rack,
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
# but we're not logically updating the pipette to have that tip on it,
# which is inconsistent and confusing.
#
# To fix that, we need the tip length. But that traditionally comes to us
# through the command result, which we don't have if the command failed. We
# may need to expand failed commands to have a private result.

def _set_used_tips( # noqa: C901
self, pipette_id: str, well_name: str, labware_id: str
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def map_command( # noqa: C901
results.append(
pe_actions.FailCommandAction(
command_id=running_command.id,
running_command=running_command,
error_id=ModelUtils.generate_id(),
failed_at=now,
error=LegacyContextCommandError(command_error),
Expand Down
Loading