Skip to content

Commit

Permalink
givet tool sensors output options
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Mar 15, 2024
1 parent a844c66 commit d0c2b5c
Showing 1 changed file with 121 additions and 29 deletions.
150 changes: 121 additions & 29 deletions hardware/opentrons_hardware/hardware_control/tool_sensors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
"""Functions for commanding motion limited by tool sensors."""
import asyncio
from functools import partial
from typing import Union, List, Iterator, Tuple, Dict, Callable, AsyncContextManager
from typing import (
Union,
List,
Iterator,
Tuple,
Dict,
Callable,
AsyncContextManager,
Optional,
)
from enum import Enum, auto
from logging import getLogger
from numpy import float64
from math import copysign
Expand All @@ -22,6 +32,19 @@
SensorDataType,
sensor_fixed_point_conversion,
)
from opentrons_hardware.firmware_bindings.messages.payloads import (
BindSensorOutputRequestPayload,
SendAccumulatedPressureDataPayload,
)
from opentrons_hardware.firmware_bindings.messages.message_definitions import (
BindSensorOutputRequest,
SendAccumulatedPressureDataRequest,
)
from opentrons_hardware.firmware_bindings.messages.fields import (
SensorOutputBindingField,
SensorTypeField,
SensorIdField,
)
from opentrons_hardware.sensors.sensor_types import SensorInformation, PressureSensor
from opentrons_hardware.sensors.scheduler import SensorScheduler
from opentrons_hardware.sensors.utils import SensorThresholdInformation
Expand All @@ -48,7 +71,11 @@ def _build_pass_step(
speed: Dict[NodeId, float],
stop_condition: MoveStopCondition = MoveStopCondition.sync_line,
) -> MoveGroupStep:
return create_step(
pipette_nodes = [
i for i in movers if i in [NodeId.pipette_left, NodeId.pipette_right]
]

move_group = create_step(
distance={ax: float64(abs(distance[ax])) for ax in movers},
velocity={
ax: float64(speed[ax] * copysign(1.0, distance[ax])) for ax in movers
Expand All @@ -60,6 +87,65 @@ def _build_pass_step(
present_nodes=movers,
stop_condition=stop_condition,
)
pipette_move = create_step(
distance={ax: float64(abs(distance[ax])) for ax in movers},
velocity={
ax: float64(speed[ax] * copysign(1.0, distance[ax])) for ax in movers
},
acceleration={},
# use any node present to calculate duration of the move, assuming the durations
# will be the same
duration=float64(abs(distance[movers[0]] / speed[movers[0]])),
present_nodes=pipette_nodes,
stop_condition=MoveStopCondition.sensor_report,
)
print(f"Move group for probe {move_group}")
for node in pipette_nodes:
move_group[node] = pipette_move[node]
print(f"Move group for probe {move_group}")
return move_group


async def run_pass_output_to_csv(
messenger: CanMessenger,
sensor_driver: SensorDriver,
pressure_sensor: PressureSensor,
mount_speed: float,
plunger_speed: float,
threshold_pascals: float,
head_node: NodeId,
move_group: MoveGroupRunner,
) -> Dict[NodeId, MotorPositionStatus]:
file_heading = [
"time(s)",
"Pressure(pascals)",
"z_velocity(mm/s)",
"plunger_velocity(mm/s)",
"threshold(pascals)",
]
sensor_metadata = [0, 0, mount_speed, plunger_speed, threshold_pascals]
sensor_capturer = LogListener(
mount=head_node,
data_file="/var/pressure_sensor_data.csv",
file_heading=file_heading,
sensor_metadata=sensor_metadata,
)
binding = [SensorOutputBinding.sync, SensorOutputBinding.report]

async with sensor_driver.bind_output(messenger, pressure_sensor, binding):
messenger.add_listener(sensor_capturer, None)

async with sensor_capturer:
positions = await move_group.run(can_messenger=messenger)
messenger.remove_listener(sensor_capturer)

return positions


class OutputOptions(Enum):
write_to_csv = auto()
can_bus_only = auto()
none = auto()


async def liquid_probe(
Expand All @@ -70,12 +156,15 @@ async def liquid_probe(
plunger_speed: float,
mount_speed: float,
threshold_pascals: float,
log_pressure: bool = True,
output_format: OutputOptions = OutputOptions.can_bus_only,
data_file: Optional[str] = None,
auto_zero_sensor: bool = True,
num_baseline_reads: int = 10,
sensor_id: SensorId = SensorId.S0,
canbus_output: bool = True,
) -> Dict[NodeId, MotorPositionStatus]:
"""Move the mount and pipette simultaneously while reading from the pressure sensor."""
log_file: str = "/var/pressure_sensor_data.csv" if not data_file else data_file
sensor_driver = SensorDriver()
threshold_fixed_point = threshold_pascals * sensor_fixed_point_conversion
pressure_sensor = PressureSensor.build(
Expand All @@ -90,7 +179,6 @@ async def liquid_probe(
)
LOG.debug(f"found baseline pressure: {pressure_baseline} pascals")

binding = [SensorOutputBinding.sync]
await sensor_driver.send_stop_threshold(messenger, pressure_sensor)

sensor_group = _build_pass_step(
Expand All @@ -102,33 +190,37 @@ async def liquid_probe(

sensor_runner = MoveGroupRunner(move_groups=[[sensor_group]])

if log_pressure:
file_heading = [
"time(s)",
"Pressure(pascals)",
"z_velocity(mm/s)",
"plunger_velocity(mm/s)",
"threshold(pascals)",
]
sensor_metadata = [0, 0, mount_speed, plunger_speed, threshold_pascals]
sensor_capturer = LogListener(
mount=head_node,
data_file="/var/pressure_sensor_data.csv",
file_heading=file_heading,
sensor_metadata=sensor_metadata,
if output_format == OutputOptions.write_to_csv:
return await run_pass_output_to_csv(
messenger,
sensor_driver,
pressure_sensor,
mount_speed,
plunger_speed,
threshold_pascals,
head_node,
sensor_runner,
)
binding.append(SensorOutputBinding.report)

async with sensor_driver.bind_output(messenger, pressure_sensor, binding):
if log_pressure:
messenger.add_listener(sensor_capturer, None)

async with sensor_capturer:
positions = await sensor_runner.run(can_messenger=messenger)
messenger.remove_listener(sensor_capturer)

else:
positions = await sensor_runner.run(can_messenger=messenger)
elif output_format == OutputOptions.can_bus_only:
async with sensor_driver.bind_output(
messenger,
pressure_sensor,
[
SensorOutputBinding.sync,
SensorOutputBinding.report,
],
):
return await sensor_runner.run(can_messenger=messenger)
else: # none
async with sensor_driver.bind_output(
messenger,
pressure_sensor,
[
SensorOutputBinding.sync,
],
):
return await sensor_runner.run(can_messenger=messenger)

return positions

Expand Down

0 comments on commit d0c2b5c

Please sign in to comment.