Skip to content

Commit

Permalink
feat(api): command executors can add notes
Browse files Browse the repository at this point in the history
This PR adds an interface for command executors to add notes to the
command that they're executing, succeed or fail, and uses that interface
to implement an example note warning of aspirate volume rounding.

The interface is done by callbacks from the command implementations to
the command executor that is executing each command. Implementations, or
the functions they call, are responsible for creating the CommandNote
instances. The executor just gathers a list of commands and then issues
an UpdateCommandAction with the new list. If there are already notes on
the command before execution, the new ones are appended. While this
doesn't seem necessary right now, we'll want this behavior generically
when we have notes also coming from hardware.

One thing that's a little annoying about this interface is that the
callbacks have to be threaded through all the way to whatever wants to
add a note. For instance, in the example aspirate-rounded note, we have
to pass the callback all the way into the pipetting executor. An
alternative would be to have an interface that can add notes on one of
the state instances that can be called from anything that can access the
state instance. The thing is, that function won't have the context to
know when commands are currently executing or are done executing without
having some sort of state machine that only looks at
UpdateCommandActions from elsewhere. That code would be pretty ugly. The
alternative there would be to issue a new UpdateCommandAction for each
note; this would have race condition issues if it provided new values
for the notes list. The alternative _there_ would be a new action called
like AddCommandNoteAction that carries the attention of adding to the
notes list along with it. Of course, that command is not idempotent.

This interface works well enough for now that I think we should roll
with it, and feel free to change it later.

Closes EXEC-291
  • Loading branch information
sfoster1 committed Mar 13, 2024
1 parent 051b50d commit ed31522
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 58 deletions.
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
)
from .protocol_engine import ProtocolEngine
from .errors import ProtocolEngineError, ErrorOccurrence
from .notes import CommandNote
from .commands import (
Command,
CommandParams,
CommandCreate,
CommandStatus,
CommandType,
CommandIntent,
CommandNote,
)
from .state import State, StateView, StateSummary, CommandSlice, CurrentCommand, Config
from .plugins import AbstractPlugin
Expand Down
2 changes: 0 additions & 2 deletions api/src/opentrons/protocol_engine/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
BaseCommandCreate,
CommandStatus,
CommandIntent,
CommandNote,
)

from .command_unions import (
Expand Down Expand Up @@ -333,7 +332,6 @@
"BaseCommandCreate",
"CommandStatus",
"CommandIntent",
"CommandNote",
# command parameter hashing
"hash_command_params",
# command schema generation
Expand Down
8 changes: 7 additions & 1 deletion api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
if TYPE_CHECKING:
from ..execution import MovementHandler, PipettingHandler
from ..state import StateView
from ..notes import CommandNoteAdder


AspirateCommandType = Literal["aspirate"]
Expand Down Expand Up @@ -48,12 +49,14 @@ def __init__(
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
command_note_adder: CommandNoteAdder,
**kwargs: object,
) -> None:
self._pipetting = pipetting
self._state_view = state_view
self._hardware_api = hardware_api
self._movement = movement
self._command_note_adder = command_note_adder

async def execute(self, params: AspirateParams) -> AspirateResult:
"""Move to and aspirate from the requested well.
Expand Down Expand Up @@ -98,7 +101,10 @@ async def execute(self, params: AspirateParams) -> AspirateResult:
)

volume = await self._pipetting.aspirate_in_place(
pipette_id=pipette_id, volume=params.volume, flow_rate=params.flowRate
pipette_id=pipette_id,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)

return AspirateResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
if TYPE_CHECKING:
from ..execution import PipettingHandler
from ..state import StateView

from ..notes import CommandNoteAdder

AspirateInPlaceCommandType = Literal["aspirateInPlace"]

Expand All @@ -45,11 +45,13 @@ def __init__(
pipetting: PipettingHandler,
hardware_api: HardwareControlAPI,
state_view: StateView,
command_note_adder: CommandNoteAdder,
**kwargs: object,
) -> None:
self._pipetting = pipetting
self._state_view = state_view
self._hardware_api = hardware_api
self._command_note_adder = command_note_adder

async def execute(self, params: AspirateInPlaceParams) -> AspirateInPlaceResult:
"""Aspirate without moving the pipette.
Expand All @@ -69,7 +71,10 @@ async def execute(self, params: AspirateInPlaceParams) -> AspirateInPlaceResult:
" so the plunger can be reset in a known safe position."
)
volume = await self._pipetting.aspirate_in_place(
pipette_id=params.pipetteId, volume=params.volume, flow_rate=params.flowRate
pipette_id=params.pipetteId,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)

return AspirateInPlaceResult(volume=volume)
Expand Down
28 changes: 3 additions & 25 deletions api/src/opentrons/protocol_engine/commands/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
Optional,
TypeVar,
Tuple,
Union,
Literal,
List,
)

Expand All @@ -23,6 +21,7 @@
from opentrons.hardware_control import HardwareControlAPI

from ..errors import ErrorOccurrence
from ..notes import CommandNote, CommandNoteAdder

# Work around type-only circular dependencies.
if TYPE_CHECKING:
Expand All @@ -36,29 +35,6 @@

CommandPrivateResultT = TypeVar("CommandPrivateResultT")

NoteKind = Union[Literal["warning", "information"], str]


class CommandNote(BaseModel):
"""A note about a command's execution or dispatch."""

noteKind: NoteKind = Field(
...,
description="The kind of note this is. Only the literal possibilities should be"
" relied upon programmatically.",
)
shortMessage: str = Field(
...,
description="The accompanying human-readable short message (suitable for display in a single line)",
)
longMessage: str = Field(
...,
description="A longer message that may contain newlines and formatting characters describing the note.",
)
source: str = Field(
..., description="An identifier for the party that created the note"
)


class CommandStatus(str, Enum):
"""Command execution status."""
Expand Down Expand Up @@ -215,6 +191,7 @@ def __init__(
run_control: execution.RunControlHandler,
rail_lights: execution.RailLightsHandler,
status_bar: execution.StatusBarHandler,
command_note_adder: CommandNoteAdder,
) -> None:
"""Initialize the command implementation with execution handlers."""
pass
Expand Down Expand Up @@ -256,6 +233,7 @@ def __init__(
run_control: execution.RunControlHandler,
rail_lights: execution.RailLightsHandler,
status_bar: execution.StatusBarHandler,
command_note_adder: CommandNoteAdder,
) -> None:
"""Initialize the command implementation with execution handlers."""
pass
Expand Down
68 changes: 61 additions & 7 deletions api/src/opentrons/protocol_engine/execution/command_executor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Command side-effect execution logic container."""
import asyncio
from logging import getLogger
from typing import Optional
from typing import Optional, List, Dict, Any, Protocol

from opentrons.hardware_control import HardwareControlAPI

Expand All @@ -18,10 +18,12 @@
AbstractCommandImpl,
CommandResult,
CommandPrivateResult,
Command,
)
from ..actions import ActionDispatcher, UpdateCommandAction, FailCommandAction
from ..errors import RunStoppedError
from ..errors.exceptions import EStopActivatedError as PE_EStopActivatedError
from ..notes import CommandNote, CommandNoteTracker
from .equipment import EquipmentHandler
from .movement import MovementHandler
from .gantry_mover import GantryMover
Expand All @@ -36,6 +38,29 @@
log = getLogger(__name__)


class CommandNoteTrackerProvider(Protocol):
"""The correct shape for a function that provides a CommandNoteTracker.
This function will be called by the executor once for each call to execute().
It is mostly useful for testing harnesses.
"""

def __call__(self) -> CommandNoteTracker:
"""Provide a new CommandNoteTracker."""
...


class _NoteTracker(CommandNoteTracker):
def __init__(self) -> None:
self._notes: List[CommandNote] = []

def __call__(self, note: CommandNote) -> None:
self._notes.append(note)

def get_notes(self) -> List[CommandNote]:
return self._notes


class CommandExecutor:
"""CommandExecutor container class.
Expand All @@ -58,6 +83,7 @@ def __init__(
rail_lights: RailLightsHandler,
status_bar: StatusBarHandler,
model_utils: Optional[ModelUtils] = None,
command_note_tracker_provider: Optional[CommandNoteTrackerProvider] = None,
) -> None:
"""Initialize the CommandExecutor with access to its dependencies."""
self._hardware_api = hardware_api
Expand All @@ -73,6 +99,9 @@ def __init__(
self._rail_lights = rail_lights
self._model_utils = model_utils or ModelUtils()
self._status_bar = status_bar
self._command_note_tracker_provider = (
command_note_tracker_provider or _NoteTracker
)

async def execute(self, command_id: str) -> None:
"""Run a given command's execution procedure.
Expand All @@ -82,6 +111,7 @@ async def execute(self, command_id: str) -> None:
command itself will be looked up from state.
"""
command = self._state_store.commands.get(command_id=command_id)
note_tracker = self._command_note_tracker_provider()
command_impl = command._ImplementationCls(
state_view=self._state_store,
hardware_api=self._hardware_api,
Expand All @@ -94,6 +124,7 @@ async def execute(self, command_id: str) -> None:
run_control=self._run_control,
rail_lights=self._rail_lights,
status_bar=self._status_bar,
command_note_adder=note_tracker,
)

started_at = self._model_utils.get_timestamp()
Expand Down Expand Up @@ -128,6 +159,17 @@ async def execute(self, command_id: str) -> None:
error = PE_EStopActivatedError(message=str(error), wrapping=[error])
elif not isinstance(error, EnumeratedError):
error = PythonException(error)
notes_update = _append_notes_if_notes(
running_command, note_tracker.get_notes()
)

if notes_update:
command_with_new_notes = running_command.copy(update=notes_update)
self._action_dispatcher.dispatch(
UpdateCommandAction(
command=command_with_new_notes, private_result=None
)
)

self._action_dispatcher.dispatch(
FailCommandAction(
Expand All @@ -138,15 +180,27 @@ async def execute(self, command_id: str) -> None:
)
)
else:
completed_command = running_command.copy(
update={
"result": result,
"status": CommandStatus.SUCCEEDED,
"completedAt": self._model_utils.get_timestamp(),
}
update = {
"result": result,
"status": CommandStatus.SUCCEEDED,
"completedAt": self._model_utils.get_timestamp(),
}
update.update(
_append_notes_if_notes(running_command, note_tracker.get_notes())
)
completed_command = running_command.copy(update=update)
self._action_dispatcher.dispatch(
UpdateCommandAction(
command=completed_command, private_result=private_result
),
)


def _append_notes_if_notes(
running_command: Command, notes: List[CommandNote]
) -> Dict[str, Any]:
if not notes:
return {}
if running_command.notes is None:
return {"notes": notes}
return {"notes": running_command.notes + notes}
35 changes: 31 additions & 4 deletions api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from opentrons.hardware_control import HardwareControlAPI

from ..state import StateView, HardwarePipette
from ..notes import CommandNoteAdder, CommandNote
from ..errors.exceptions import (
TipNotAttachedError,
InvalidAspirateVolumeError,
Expand Down Expand Up @@ -39,6 +40,7 @@ async def aspirate_in_place(
pipette_id: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
) -> float:
"""Set flow-rate and aspirate."""

Expand Down Expand Up @@ -88,11 +90,15 @@ async def aspirate_in_place(
pipette_id: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
) -> float:
"""Set flow-rate and aspirate."""
# get mount and config data from state and hardware controller
adjusted_volume = _validate_aspirate_volume(
state_view=self._state_view, pipette_id=pipette_id, aspirate_volume=volume
state_view=self._state_view,
pipette_id=pipette_id,
aspirate_volume=volume,
command_note_adder=command_note_adder,
)
hw_pipette = self._state_view.pipettes.get_hardware_pipette(
pipette_id=pipette_id,
Expand Down Expand Up @@ -199,11 +205,15 @@ async def aspirate_in_place(
pipette_id: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
) -> float:
"""Virtually aspirate (no-op)."""
self._validate_tip_attached(pipette_id=pipette_id, command_name="aspirate")
return _validate_aspirate_volume(
state_view=self._state_view, pipette_id=pipette_id, aspirate_volume=volume
state_view=self._state_view,
pipette_id=pipette_id,
aspirate_volume=volume,
command_note_adder=command_note_adder,
)

async def dispense_in_place(
Expand Down Expand Up @@ -252,7 +262,10 @@ def create_pipetting_handler(


def _validate_aspirate_volume(
state_view: StateView, pipette_id: str, aspirate_volume: float
state_view: StateView,
pipette_id: str,
aspirate_volume: float,
command_note_adder: CommandNoteAdder,
) -> float:
"""Get whether the given volume is valid to aspirate right now.
Expand Down Expand Up @@ -285,7 +298,21 @@ def _validate_aspirate_volume(
),
)
else:
return min(aspirate_volume, available_volume)
volume_to_aspirate = min(aspirate_volume, available_volume)
if volume_to_aspirate < aspirate_volume:
command_note_adder(
CommandNote(
noteKind="warning",
shortMessage=f"Aspirate clamped to {available_volume} µL",
longMessage=(
f"Command requested to aspirate {aspirate_volume} µL but only"
f" {available_volume} µL were available in the pipette. This is"
" probably a floating point artifact."
),
source="execution",
)
)
return volume_to_aspirate


def _validate_dispense_volume(
Expand Down
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_engine/notes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Protocol engine notes module."""

from .notes import NoteKind, CommandNote, CommandNoteAdder, CommandNoteTracker

__all__ = ["NoteKind", "CommandNote", "CommandNoteAdder", "CommandNoteTracker"]
Loading

0 comments on commit ed31522

Please sign in to comment.