Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hardware, api): check motor engaged status #14479

Merged
merged 10 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,14 @@ def engaged_axes(self) -> OT3AxisMap[bool]:
"""Get engaged axes."""
...

async def update_engaged_axes(self) -> None:
"""Update engaged axes."""
...

async def is_motor_engaged(self, axis: Axis) -> bool:
"""Check if axis is enabled."""
...

async def disengage_axes(self, axes: List[Axis]) -> None:
"""Disengage axes."""
...
Expand Down
56 changes: 40 additions & 16 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
set_disable_motor,
set_enable_tip_motor,
set_disable_tip_motor,
get_motor_enabled,
)
from opentrons_hardware.hardware_control.motor_position_status import (
get_motor_position,
Expand Down Expand Up @@ -259,6 +260,7 @@ class OT3Controller(FlexBackend):
_encoder_position: Dict[NodeId, float]
_motor_status: Dict[NodeId, MotorStatus]
_subsystem_manager: SubsystemManager
_engaged_axes: OT3AxisMap[bool]

@classmethod
async def build(
Expand Down Expand Up @@ -334,6 +336,7 @@ def __init__(
self._gear_motor_position: Dict[NodeId, float] = {}
self._encoder_position = self._get_home_position()
self._motor_status = {}
self._engaged_axes = {}
self._check_updates = check_updates
self._initialized = False
self._status_bar = status_bar.StatusBar(messenger=self._usb_messenger)
Expand Down Expand Up @@ -1157,37 +1160,58 @@ async def watch(self, loop: asyncio.AbstractEventLoop) -> None:
def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
"""Get the axis bounds."""
# TODO (AL, 2021-11-18): The bounds need to be defined
phony_bounds = (0, 500)
return {
Axis.Z_L: phony_bounds,
Axis.Z_R: phony_bounds,
Axis.P_L: phony_bounds,
Axis.P_R: phony_bounds,
Axis.X: phony_bounds,
Axis.Y: phony_bounds,
Axis.Z_G: phony_bounds,
Axis.Q: phony_bounds,
Axis.Z_L: (0, 300),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how close to reality are these bounds?

Axis.Z_R: (0, 300),
Axis.P_L: (0, 200),
Axis.P_R: (0, 200),
Axis.X: (0, 550),
Axis.Y: (0, 550),
Axis.Z_G: (0, 300),
Axis.Q: (0, 200),
}

def engaged_axes(self) -> OT3AxisMap[bool]:
"""Get engaged axes."""
return {}
return self._engaged_axes

async def update_engaged_axes(self) -> None:
"""Update engaged axes."""
motor_nodes = self._motor_nodes()
results = await get_motor_enabled(self._messenger, motor_nodes)
for node, status in results.items():
self._engaged_axes[node_to_axis(node)] = status

async def is_motor_engaged(self, axis: Axis) -> bool:
node = axis_to_node(axis)
result = await get_motor_enabled(self._messenger, {node})
engaged = result[node]
self._engaged_axes.update({axis: engaged})
return engaged

async def disengage_axes(self, axes: List[Axis]) -> None:
"""Disengage axes."""
if Axis.Q in axes:
await set_disable_tip_motor(self._messenger, {axis_to_node(Axis.Q)})
nodes = {axis_to_node(ax) for ax in axes if ax is not Axis.Q}
if len(nodes) > 0:
await set_disable_motor(self._messenger, nodes)
self._engaged_axes[Axis.Q] = False
axes = [ax for ax in axes if ax is not Axis.Q]

if len(axes) > 0:
await set_disable_motor(self._messenger, {axis_to_node(ax) for ax in axes})
for ax in axes:
self._engaged_axes[ax] = False

async def engage_axes(self, axes: List[Axis]) -> None:
"""Engage axes."""
if Axis.Q in axes:
await set_enable_tip_motor(self._messenger, {axis_to_node(Axis.Q)})
nodes = {axis_to_node(ax) for ax in axes if ax is not Axis.Q}
if len(nodes) > 0:
await set_enable_motor(self._messenger, nodes)
self._engaged_axes[Axis.Q] = True
axes = [ax for ax in axes if ax is not Axis.Q]

if len(axes) > 0:
await set_enable_motor(self._messenger, {axis_to_node(ax) for ax in axes})
for ax in axes:
self._engaged_axes[ax] = True

@requires_update
async def set_lights(self, button: Optional[bool], rails: Optional[bool]) -> None:
Expand Down
20 changes: 19 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class OT3Simulator(FlexBackend):
_position: Dict[Axis, float]
_encoder_position: Dict[Axis, float]
_motor_status: Dict[Axis, MotorStatus]
_engaged_axes: Dict[Axis, bool]

@classmethod
async def build(
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(
self._initialized = False
self._lights = {"button": False, "rails": False}
self._gear_motor_position: Dict[Axis, float] = {}
self._engaged_axes: Dict[Axis, bool] = {}
self._feature_flags = feature_flags or HardwareFeatureFlags()

def _sanitize_attached_instrument(
Expand Down Expand Up @@ -374,6 +376,8 @@ async def move(
Returns:
None
"""
for ax in origin:
self._engaged_axes[ax] = True
self._position.update(target)
self._encoder_position.update(target)

Expand All @@ -396,6 +400,7 @@ async def home(
for h in homed:
self._position[h] = self._get_home_position()[h]
self._motor_status[h] = MotorStatus(True, True)
self._engaged_axes[h] = True
return axis_pad(self._position, 0.0)

@ensure_yield
Expand Down Expand Up @@ -643,16 +648,29 @@ async def update_firmware(

def engaged_axes(self) -> OT3AxisMap[bool]:
"""Get engaged axes."""
return {}
return self._engaged_axes

async def update_engaged_axes(self) -> None:
"""Update engaged axes."""
return None

async def is_motor_engaged(self, axis: Axis) -> bool:
if axis not in self._engaged_axes.keys():
return False
return self._engaged_axes[axis]

@ensure_yield
async def disengage_axes(self, axes: List[Axis]) -> None:
"""Disengage axes."""
for ax in axes:
self._engaged_axes.update({ax: False})
return None

@ensure_yield
async def engage_axes(self, axes: List[Axis]) -> None:
"""Engage axes."""
for ax in axes:
self._engaged_axes.update({ax: True})
return None

@ensure_yield
Expand Down
124 changes: 68 additions & 56 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,15 @@ def is_idle_mount(self, mount: Union[top_types.Mount, OT3Mount]) -> bool:
the last moved mount.
"""
realmount = OT3Mount.from_mount(mount)
if not self._last_moved_mount or realmount == self._last_moved_mount:
return False

return (
if realmount == OT3Mount.GRIPPER or (
realmount == OT3Mount.LEFT
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
) or (realmount == OT3Mount.GRIPPER)
):
ax = Axis.by_mount(realmount)
if ax in self.engaged_axes.keys():
return not self.engaged_axes[ax]

return False

@property
def door_state(self) -> DoorState:
Expand Down Expand Up @@ -1316,29 +1318,33 @@ async def _cache_and_maybe_retract_mount(self, mount: OT3Mount) -> None:
the 96-channel or gripper mount if it is about to move.
"""
last_moved = self._last_moved_mount
if self.is_idle_mount(mount):
# home the left/gripper mount if it is current disengaged
await self.home_z(mount)

if mount != last_moved and last_moved:
await self.retract(last_moved, 10)

# disengage Axis.Z_L motor and engage the brake to lower power
# consumption and reduce the chance of the 96-channel pipette dropping
if (
self.gantry_load == GantryLoad.HIGH_THROUGHPUT
and last_moved == OT3Mount.LEFT
):
await self.disengage_axes([Axis.Z_L])
# if gripper exists and it's not the moving mount, it should retract
if (
self.has_gripper()
and mount != OT3Mount.GRIPPER
and not self.is_idle_mount(OT3Mount.GRIPPER)
):
await self.retract(OT3Mount.GRIPPER, 10)
await self.disengage_axes([Axis.Z_G])
await self.idle_gripper()

# disegnage Axis.Z_G when we can to reduce the chance of
# the gripper dropping
if last_moved == OT3Mount.GRIPPER:
await self.disengage_axes([Axis.Z_G])
# if 96-channel pipette is attached and not being moved, it should retract
if (
mount != OT3Mount.LEFT
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
and not self.is_idle_mount(OT3Mount.LEFT)
):
await self.retract(OT3Mount.LEFT, 10)
await self.disengage_axes([Axis.Z_L])

if mount != OT3Mount.GRIPPER:
await self.idle_gripper()
# if the last moved mount is not covered in neither of the above scenario,
# simply retract the last moved mount
if last_moved and not self.is_idle_mount(last_moved) and mount != last_moved:
await self.retract(last_moved, 10)

# finally, home the current left/gripper mount to prepare for movement
if self.is_idle_mount(mount):
await self.home_z(mount)
self._last_moved_mount = mount

async def prepare_for_mount_movement(
Expand Down Expand Up @@ -1478,6 +1484,22 @@ async def _retrieve_home_position(
target_pos = {axis: self._backend.home_position()[axis]}
return origin_pos, target_pos

async def _enable_before_update_estimation(self, axis: Axis) -> None:
enabled = await self._backend.is_motor_engaged(axis)

if not enabled:
if axis == Axis.Z_L and self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
# we're here if the left mount has been idle and the brake is engaged
# we want to temporarily increase its hold current to prevent the z
# stage from dropping when switching off the ebrake
async with self._backend.increase_z_l_hold_current():
await self.engage_axes([axis])
else:
await self.engage_axes([axis])

# now that motor is enabled, we can update position estimation
await self._update_position_estimation([axis])

@_adjust_high_throughput_z_current
async def _home_axis(self, axis: Axis) -> None:
"""
Expand All @@ -1499,22 +1521,12 @@ async def _home_axis(self, axis: Axis) -> None:
assert axis not in [Axis.G, Axis.Q]

encoder_ok = self._backend.check_encoder_status([axis])
motor_ok = self._backend.check_motor_status([axis])

if encoder_ok:
# ensure stepper position can be updated after boot
if axis == Axis.Z_L and self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
# we're here if the left mount has been idle and the brake is engaged
# we want to temporarily increase its hold current to prevent the z
# stage from dropping when switching off the ebrake
async with self._backend.increase_z_l_hold_current():
await self.engage_axes([axis])
else:
await self.engage_axes([axis])
await self._update_position_estimation([axis])
# refresh motor and encoder statuses after position estimation update
motor_ok = self._backend.check_motor_status([axis])
encoder_ok = self._backend.check_encoder_status([axis])
# enable motor (if needed) and update estimation
await self._enable_before_update_estimation(axis)

# refresh motor status after position estimation update
motor_ok = self._backend.check_motor_status([axis])

if Axis.to_kind(axis) == OT3AxisKind.P:
await self._set_plunger_current_and_home(axis, motor_ok, encoder_ok)
Expand Down Expand Up @@ -1553,22 +1565,21 @@ async def _home_axis(self, axis: Axis) -> None:

async def _home(self, axes: Sequence[Axis]) -> None:
"""Home one axis at a time."""
async with self._motion_lock:
for axis in axes:
try:
if axis == Axis.G:
await self.home_gripper_jaw()
elif axis == Axis.Q:
await self._backend.home([axis], self.gantry_load)
else:
await self._home_axis(axis)
except BaseException as e:
self._log.exception(f"Homing failed: {e}")
self._current_position.clear()
raise
for axis in axes:
try:
if axis == Axis.G:
await self.home_gripper_jaw()
elif axis == Axis.Q:
await self._backend.home([axis], self.gantry_load)
else:
await self._cache_current_position()
await self._cache_encoder_position()
await self._home_axis(axis)
except BaseException as e:
self._log.exception(f"Homing failed: {e}")
self._current_position.clear()
raise
else:
await self._cache_current_position()
await self._cache_encoder_position()

@ExecutionManagerProvider.wait_for_running
async def home(
Expand Down Expand Up @@ -1599,7 +1610,8 @@ async def home(
if (ax in checked_axes and self._backend.axis_is_present(ax))
]
self._log.info(f"home was called with {axes} generating sequence {home_seq}")
await self._home(home_seq)
async with self._motion_lock:
await self._home(home_seq)

def get_engaged_axes(self) -> Dict[Axis, bool]:
"""Which axes are engaged and holding."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class MessageId(int, Enum):
error_message = 0x02

get_status_request = 0x01
get_gear_status_response = 0x4
get_status_response = 0x05

enable_motor_request = 0x06
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ class GetStatusResponse(BaseMessage): # noqa: D101
message_id: Literal[MessageId.get_status_response] = MessageId.get_status_response


@dataclass
class GearStatusResponse(BaseMessage): # noqa: D101
payload: payloads.GetStatusResponsePayload
payload_type: Type[
payloads.GetStatusResponsePayload
] = payloads.GetStatusResponsePayload
message_id: Literal[
MessageId.get_gear_status_response
] = MessageId.get_gear_status_response


@dataclass
class MoveRequest(BaseMessage): # noqa: D101
payload: payloads.MoveRequestPayload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
defs.StopRequest,
defs.GetStatusRequest,
defs.GetStatusResponse,
defs.GearStatusResponse,
defs.EnableMotorRequest,
defs.DisableMotorRequest,
defs.MoveRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ class GetStatusResponsePayload(EmptyPayload):
"""Get status response."""

status: utils.UInt8Field
data: utils.UInt32Field


@dataclass(eq=False)
Expand Down
Loading
Loading