Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Pause when pick_up_tip() errors in a Python protocol #14753

Merged
merged 25 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
42a36ec
Type-safe wait_for().
SyntaxColoring Mar 28, 2024
33225f7
Internal support for waiting for a specific command's recovery.
SyntaxColoring Mar 28, 2024
98446ba
WIP
SyntaxColoring Mar 29, 2024
459f14d
Another todo comment.
SyntaxColoring Mar 29, 2024
b4d4d80
Add SetTipUsedAction.
SyntaxColoring Apr 1, 2024
b61913f
Revert "Add SetTipUsedAction."
SyntaxColoring Apr 1, 2024
9322657
Consume tips in failed pickUpTip commands.
SyntaxColoring Apr 1, 2024
33d33c9
Document some background for the estop() method.
SyntaxColoring Apr 2, 2024
dbb03db
Update estop().
SyntaxColoring Apr 2, 2024
b13db26
Enflippen der waitenforen.
SyntaxColoring Apr 3, 2024
838572b
Add unit test for get_recovery_in_progress_for_command().
SyntaxColoring Apr 3, 2024
1d5ba4b
Merge branch 'edge' into papi_pause_on_error
SyntaxColoring Apr 4, 2024
6353d7d
Fix merge mistake.
SyntaxColoring Apr 4, 2024
1ef9bc9
Fix run-stop handling.
SyntaxColoring Apr 4, 2024
cb5bca1
Update various unit tests for new action field.
SyntaxColoring Apr 5, 2024
51bd1d2
Update and port command store tests.
SyntaxColoring Apr 5, 2024
6b79625
If you write a comment describing your sins, that makes the sins okay.
SyntaxColoring Apr 5, 2024
0213657
transports.py linting, docs, and error simplification.
SyntaxColoring Apr 8, 2024
ff22758
Small fixups.
SyntaxColoring Apr 8, 2024
8ad0eef
Refactor _wait_for().
SyntaxColoring Apr 8, 2024
bd14851
Merge branch 'edge' into papi_pause_on_error
SyntaxColoring Apr 8, 2024
a880096
Explicitly keep track of the current recovery target.
SyntaxColoring Apr 8, 2024
50e1706
Update todo comment to be more specific.
SyntaxColoring Apr 8, 2024
97b0ada
Raise a distinct error type.
SyntaxColoring Apr 8, 2024
cf1e936
Replace todo with note.
SyntaxColoring Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@ def pick_up_tip(
well_name=well_name,
well_location=well_location,
)
self._engine_client.pick_up_tip(

self._engine_client.pick_up_tip_wait_for_recovery(
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)

# Set the "last location" unconditionally, even if the command failed
# and was recovered from and we don't know if the pipette is physically here.
# This isn't used for path planning, but rather for implicit destination
# selection like in `pipette.aspirate(location=None)`.
self._protocol_core.set_last_location(location=location, mount=self.get_mount())

def drop_tip(
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
PauseAction,
PauseSource,
StopAction,
ResumeFromRecoveryAction,
FinishAction,
HardwareStoppedAction,
QueueCommandAction,
Expand Down Expand Up @@ -38,6 +39,7 @@
"PlayAction",
"PauseAction",
"StopAction",
"ResumeFromRecoveryAction",
"FinishAction",
"HardwareStoppedAction",
"QueueCommandAction",
Expand Down
21 changes: 21 additions & 0 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,32 @@ class FailCommandAction:
"""

command_id: str
"""The command to fail."""

error_id: str
"""An ID to assign to the command's error.

Must be unique to this occurrence of the error.
"""

failed_at: datetime
"""When the command failed."""

error: EnumeratedError
"""The underlying exception that caused this command to fail."""

notes: List[CommandNote]
"""Overwrite the command's `.notes` with these."""

type: ErrorRecoveryType
"""How this error should be handled in the context of the overall run."""

# This is a quick hack so FailCommandAction handlers can get the params of the
# command that failed. We probably want this to be a new "failure details"
# object instead, similar to how succeeded commands can send a "private result"
# to Protocol Engine internals.
running_command: Command
"""The command to fail, in its prior `running` state."""


@dataclass(frozen=True)
Expand Down
23 changes: 23 additions & 0 deletions api/src/opentrons/protocol_engine/clients/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,29 @@ def pick_up_tip(

return cast(commands.PickUpTipResult, result)

def pick_up_tip_wait_for_recovery(
self,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: WellLocation,
) -> commands.PickUpTip:
"""Execute a PickUpTip, wait for any error recovery, and return it.

Note that the returned command will not necessarily have a `result`.
"""
request = commands.PickUpTipCreate(
params=commands.PickUpTipParams(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
)
)
command = self._transport.execute_command_wait_for_recovery(request=request)

return cast(commands.PickUpTip, command)

def drop_tip(
self,
pipette_id: str,
Expand Down
115 changes: 96 additions & 19 deletions api/src/opentrons/protocol_engine/clients/transports.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
"""A helper for controlling a `ProtocolEngine` without async/await."""
from asyncio import AbstractEventLoop, run_coroutine_threadsafe
from typing import Any, overload
from typing import Any, Final, overload
from typing_extensions import Literal

from opentrons_shared_data.labware.dev_types import LabwareUri
from opentrons_shared_data.labware.labware_definition import LabwareDefinition


from ..protocol_engine import ProtocolEngine
from ..errors import ProtocolCommandFailedError
from ..error_recovery_policy import ErrorRecoveryType
from ..state import StateView
from ..commands import CommandCreate, CommandResult
from ..commands import Command, CommandCreate, CommandResult, CommandStatus


class RunStoppedBeforeCommandError(RuntimeError):
"""Raised if the ProtocolEngine was stopped before a command could start."""

def __init__(self, command: Command) -> None:
self._command = command
super().__init__(
f"The run was stopped"
f" before {command.commandType} command {command.id} could execute."
)


class ChildThreadTransport:
Expand All @@ -30,16 +43,22 @@ def __init__(self, engine: ProtocolEngine, loop: AbstractEventLoop) -> None:
want to synchronously access it.
loop: The event loop that `engine` is running in (in the other thread).
"""
self._engine = engine
self._loop = loop
# We might access these from different threads,
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
# so let's make them Final for (shallow) immutability.
self._engine: Final = engine
self._loop: Final = loop

@property
def state(self) -> StateView:
"""Get a view of the Protocol Engine's state."""
return self._engine.state_view

def execute_command(self, request: CommandCreate) -> CommandResult:
"""Execute a ProtocolEngine command, blocking until the command completes.
"""Execute a ProtocolEngine command.

This blocks until the command completes. If the command fails, this will always
raise the failure as an exception--even if ProtocolEngine deemed the failure
recoverable.

Args:
request: The ProtocolEngine command request
Expand All @@ -48,8 +67,11 @@ def execute_command(self, request: CommandCreate) -> CommandResult:
The command's result data.

Raises:
ProtocolEngineError: if the command execution is not successful,
the specific error that cause the command to fail is raised.
ProtocolEngineError: If the command execution was not successful,
the specific error that caused the command to fail is raised.

If the run was stopped before the command could complete, that's
also signaled as this exception.
"""
command = run_coroutine_threadsafe(
self._engine.add_and_execute_command(request=request),
Expand All @@ -64,21 +86,76 @@ def execute_command(self, request: CommandCreate) -> CommandResult:
message=f"{error.errorType}: {error.detail}",
)

# FIXME(mm, 2023-04-10): This assert can easily trigger from this sequence:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped.
#
# The returned command will be `queued`, so it won't have a result.
#
# We need to figure out a proper way to report this condition to callers
# so they correctly interpret it as an intentional stop, not an internal error.
assert command.result is not None, f"Expected Command {command} to have result"
if command.result is None:
# This can happen with a certain pause timing:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped. The returned command will be `queued`
# and won't have a result.
raise RunStoppedBeforeCommandError(command)

return command.result

def execute_command_wait_for_recovery(self, request: CommandCreate) -> Command:
"""Execute a ProtocolEngine command, including error recovery.

This blocks until the command completes. Additionally, if the command fails,
this will continue to block until its error recovery has been completed.

Args:
request: The ProtocolEngine command request.

Returns:
The command. If error recovery happened for it, the command will be
reported here as failed.

Raises:
ProtocolEngineError: If the command failed, *and* the failure was not
recovered from.

If the run was stopped before the command could complete, that's
also signalled as this exception.
"""

async def run_in_pe_thread() -> Command:
command = await self._engine.add_and_execute_command_wait_for_recovery(
request=request
)

if command.error is not None:
error_was_recovered_from = (
self._engine.state_view.commands.get_error_recovery_type(command.id)
== ErrorRecoveryType.WAIT_FOR_RECOVERY
)
if not error_was_recovered_from:
error = command.error
# TODO: this needs to have an actual code
raise ProtocolCommandFailedError(
original_error=error,
message=f"{error.errorType}: {error.detail}",
)

elif command.status == CommandStatus.QUEUED:
# This can happen with a certain pause timing:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped. The returned command will be `queued`,
# and won't have a result.
raise RunStoppedBeforeCommandError(command)

return command

command = run_coroutine_threadsafe(
run_in_pe_thread(),
loop=self._loop,
).result()

return command

@overload
def call_method(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async def execute(self, command_id: str) -> None:
FailCommandAction(
error=error,
command_id=running_command.id,
running_command=running_command,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I am wondering why we need this in the action? dont we have the failed command stored in PE already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a hack, we probably don't need it. See this comment:

# This is a quick hack so FailCommandAction handlers can get the params of the
# command that failed. We probably want this to be a new "failure details"
# object instead, similar to how succeeded commands can send a "private result"
# to Protocol Engine internals.

error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
notes=note_tracker.get_notes(),
Expand Down
3 changes: 3 additions & 0 deletions api/src/opentrons/protocol_engine/execution/queue_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ async def _run_commands(self) -> None:
command_id = await self._state_store.wait_for(
condition=self._state_store.commands.get_next_to_execute
)
# Assert for type hinting. This is valid because the wait_for() above
# only returns when the value is truthy.
assert command_id is not None
except RunStoppedError:
# There are no more commands that we should execute, either because the run has
# completed on its own, or because a client requested it to stop.
Expand Down
81 changes: 71 additions & 10 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,45 @@ async def add_and_execute_command(
the command in state.

Returns:
The command. If the command completed, it will be succeeded or failed.
The command.

If the command completed, it will be succeeded or failed.

If the engine was stopped before it reached the command,
the command will be queued.
"""
command = self.add_command(request)
await self.wait_for_command(command.id)
return self._state_store.commands.get(command.id)

async def add_and_execute_command_wait_for_recovery(
self, request: commands.CommandCreate
) -> commands.Command:
"""Like `add_and_execute_command()`, except wait for error recovery.

Unlike `add_and_execute_command()`, if the command fails, this will not
immediately return the failed command. Instead, if the error is recoverable,
it will wait until error recovery has completed (e.g. when some other task
calls `self.resume_from_recovery()`).

Returns:
The command.

If the command completed, it will be succeeded or failed. If it failed
and then its failure was recovered from, it will still be failed.

If the engine was stopped before it reached the command,
the command will be queued.
"""
queued_command = self.add_command(request)
await self.wait_for_command(command_id=queued_command.id)
completed_command = self._state_store.commands.get(queued_command.id)
await self._state_store.wait_for_not(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it kind of feels like we want to gate this on a synchronous call to whether we're now in recovery for the command, but this in general feels a little race condition-y because of the separation between command state and run state. Specifically,

  • wait_for_command returns on the asyncio spin after a FailCommandAction or a SucceedCommandAction for this command (we can neglect the queued-and-stopping part for now)
  • but the get_recovery_in_progress_for_command predicate is based on the queue status being awaiting-recovery

Do we guarantee mechanically that the queue status will be set before the asyncio spin after the command terminal action is dispatched? Are we sure this won't occasionally race and return early?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if I understand your concern correctly:

When await self.wait_for_command(command_id=queued_command.id) returns, we are guaranteed that the action that finalized the command has already been fully processed, and that get_recovery_in_progress_for_command() will see its results on the state.

When we handle an action, we send it to each of the substores in a loop. Only after that's done do we notify subscribers like this one.

self.state_view.commands.get_recovery_in_progress_for_command,
queued_command.id,
)
return completed_command

def estop(
self,
# TODO(mm, 2024-03-26): Maintenance runs are a robot-server concept that
Expand All @@ -251,6 +282,15 @@ def estop(
) -> None:
"""Signal to the engine that an estop event occurred.

If an estop happens while the robot is moving, lower layers physically stop
motion and raise the event as an exception, which fails the Protocol Engine
command. No action from the `ProtocolEngine` caller is needed to handle that.

However, if an estop happens in between commands, or in the middle of
a command like `comment` or `waitForDuration` that doesn't access the hardware,
`ProtocolEngine` needs to be told about it so it can treat it as a fatal run
error and stop executing more commands. This method is how to do that.

If there are any queued commands for the engine, they will be marked
as failed due to the estop event. If there aren't any queued commands
*and* this is a maintenance run (which has commands queued one-by-one),
Expand All @@ -261,15 +301,27 @@ def estop(
"""
if self._state_store.commands.get_is_stopped():
return

current_id = (
running_or_next_queued_id = (
self._state_store.commands.get_running_command_id()
or self._state_store.commands.get_queue_ids().head(None)
# TODO(mm, 2024-04-02): This logic looks wrong whenever the next queued
# command is a setup command, which is the normal case in maintenance
# runs. Setup commands won't show up in commands.get_queue_ids().
)
running_or_next_queued = (
self._state_store.commands.get(running_or_next_queued_id)
if running_or_next_queued_id is not None
else None
)

if current_id is not None:
if running_or_next_queued_id is not None:
assert running_or_next_queued is not None

fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
Comment on lines +322 to +323
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my fault; in #14726, I neglected to account for what this estop() method was doing. I'm going to have to fix this in another PR. EXEC-382

Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of this estop() method is due for a rethink. Like, I don't really get why it needs to be so different from stop(), and why it needs to be messing with things like FailCommandActions itself.

running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand All @@ -278,12 +330,21 @@ def estop(
)
self._action_dispatcher.dispatch(fail_action)

# In the case where the running command was a setup command - check if there
# are any pending *run* commands and, if so, clear them all
current_id = self._state_store.commands.get_queue_ids().head(None)
if current_id is not None:
# The FailCommandAction above will have cleared all the queued protocol
# OR setup commands, depending on whether we gave it a protocol or setup
# command. We want both to be cleared in either case. So, do that here.
running_or_next_queued_id = self._state_store.commands.get_queue_ids().head(
None
)
if running_or_next_queued_id is not None:
running_or_next_queued = self._state_store.commands.get(
running_or_next_queued_id
)
fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand Down
Loading
Loading