Skip to content

Commit

Permalink
fix(api): clean up tip motor distance caching/usage (#14156)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsinapi authored Dec 8, 2023
1 parent 2053ac9 commit d26e72b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
40 changes: 26 additions & 14 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,13 +735,19 @@ async def home_tip_motors(
if not self._feature_flags.stall_detection_enabled
else False,
)
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left].motor_position
}
else:
log.debug("no position returned from NodeId.pipette_left")
try:
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left].motor_position
}
else:
log.debug("no position returned from NodeId.pipette_left")
self._gear_motor_position = {}
except Exception as e:
log.error("Clearing tip motor position due to failed movement")
self._gear_motor_position = {}
raise e

async def tip_action(
self,
Expand All @@ -755,13 +761,19 @@ async def tip_action(
if not self._feature_flags.stall_detection_enabled
else False,
)
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left].motor_position
}
else:
log.debug("no position returned from NodeId.pipette_left")
try:
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left].motor_position
}
else:
log.debug("no position returned from NodeId.pipette_left")
self._gear_motor_position = {}
except Exception as e:
log.error("Clearing tip motor position due to failed movement")
self._gear_motor_position = {}
raise e

@requires_update
@requires_estop
Expand Down
36 changes: 23 additions & 13 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,11 @@ async def home_gear_motors(self) -> None:
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.Q]

max_distance = self._backend.axis_bounds[Axis.Q][1]
# if position is not known, move toward limit switch at a constant velocity
if not any(self._backend.gear_motor_position):
if len(self._backend.gear_motor_position.keys()) == 0:
await self._backend.home_tip_motors(
distance=self._backend.axis_bounds[Axis.Q][1],
distance=max_distance,
velocity=homing_velocity,
)
return
Expand All @@ -917,7 +918,13 @@ async def home_gear_motors(self) -> None:
Axis.P_L
]

if current_pos_float > self._config.safe_home_distance:
# We filter out a distance more than `max_distance` because, if the tip motor was stopped during
# a slow-home motion, the position may be stuck at an enormous large value.
if (
current_pos_float > self._config.safe_home_distance
and current_pos_float < max_distance
):

fast_home_moves = self._build_moves(
{Axis.Q: current_pos_float}, {Axis.Q: self._config.safe_home_distance}
)
Expand All @@ -931,7 +938,9 @@ async def home_gear_motors(self) -> None:

# move until the limit switch is triggered, with no acceleration
await self._backend.home_tip_motors(
distance=(current_pos_float + self._config.safe_home_distance),
distance=min(
current_pos_float + self._config.safe_home_distance, max_distance
),
velocity=homing_velocity,
)

Expand Down Expand Up @@ -2001,15 +2010,16 @@ async def get_tip_presence_status(
Check tip presence status. If a high throughput pipette is present,
move the tip motors down before checking the sensor status.
"""
real_mount = OT3Mount.from_mount(mount)
async with contextlib.AsyncExitStack() as stack:
if (
real_mount == OT3Mount.LEFT
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
):
await stack.enter_async_context(self._high_throughput_check_tip())
result = await self._backend.get_tip_status(real_mount)
return result
async with self._motion_lock:
real_mount = OT3Mount.from_mount(mount)
async with contextlib.AsyncExitStack() as stack:
if (
real_mount == OT3Mount.LEFT
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
):
await stack.enter_async_context(self._high_throughput_check_tip())
result = await self._backend.get_tip_status(real_mount)
return result

async def verify_tip_presence(
self, mount: Union[top_types.Mount, OT3Mount], expected: TipStateType
Expand Down

0 comments on commit d26e72b

Please sign in to comment.