Skip to content

Commit

Permalink
fix(api, hardware-testing): Changes to allow hepa lifetime on the Fle…
Browse files Browse the repository at this point in the history
…x devkit

- dont assert when there are no motor nodes
- parameterize ot3 reset for build ot3 async api
  • Loading branch information
vegano1 committed Mar 3, 2024
1 parent 050cf4d commit 61681a2
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 23 deletions.
6 changes: 3 additions & 3 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,9 @@ def update_feature_flags(self, feature_flags: HardwareFeatureFlags) -> None:
async def update_motor_status(self) -> None:
"""Retreieve motor and encoder status and position from all present nodes"""
motor_nodes = self._motor_nodes()
assert len(motor_nodes)
response = await get_motor_position(self._messenger, motor_nodes)
self._handle_motor_status_response(response)
if motor_nodes:
response = await get_motor_position(self._messenger, motor_nodes)
self._handle_motor_status_response(response)

async def update_motor_estimation(self, axes: Sequence[Axis]) -> None:
"""Update motor position estimation for commanded nodes, and update cache of data."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ async def build_async_ot3_hardware_api(
gripper: Optional[str] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
stall_detection_enable: Optional[bool] = None,
reset_ot3_api: Optional[bool] = True,
) -> OT3API:
"""Built an OT3 Hardware API instance."""
if stall_detection_enable is not None:
Expand Down Expand Up @@ -258,7 +259,8 @@ async def build_async_ot3_hardware_api(
print(e)
kwargs["use_usb_bus"] = False # type: ignore[assignment]
api = await builder(loop=loop, **kwargs) # type: ignore[arg-type]
await reset_api(api)
if reset_ot3_api:
await reset_api(api)
return api


Expand Down
92 changes: 73 additions & 19 deletions hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import datetime
import logging
import logging.config
import subprocess

from typing import Optional, cast, Dict
from typing import Any, Optional, Dict

from hardware_testing.opentrons_api import helpers_ot3
from opentrons.hardware_control.backends.ot3controller import OT3Controller
from opentrons.hardware_control.ot3api import OT3API
from opentrons.hardware_control.types import (
SubSystem,
Expand All @@ -26,7 +26,15 @@
MAX_UV_DOSAGE: int = 60 * 60 # 1hr max dosage
DEFAULT_CYCLES: int = 1

log = logging.getLogger(__name__)

# Get the name of the robot
try:
ROBOT_NAME = subprocess.check_output(["hostnamectl", "--pretty"]).decode().strip()
except Exception:
ROBOT_NAME = "HepaLifetime"


log = logging.getLogger(ROBOT_NAME)


async def _turn_off_hepa_uv(api: OT3API) -> None:
Expand All @@ -51,7 +59,7 @@ async def run_hepa_fan(
on_time: int,
off_time: int,
cycles: int,
) -> None:
) -> Optional[Dict[str, Any]]:
"""Coroutine that will run the hepa fan."""
fan_duty_cycle = max(0, min(duty_cycle, MAX_DUTY_CYCLE))
fan_on_time = on_time if on_time > 0 else 0
Expand All @@ -61,14 +69,18 @@ async def run_hepa_fan(

# Dont run task if there are no valid parameters
if not fan_on_time and not fan_off_time:
return
return None

log.info(
f"Hepa Task: Starting - duty_cycle={fan_duty_cycle}, "
f"on_time={fan_on_time}s, off_time={fan_off_time}s, cycles={cycles}, "
f"run_forever: {run_forever}"
)

# record the result
RESULT = {
"HEPA": {cycle + 1: None for cycle in range(cycles)}
}
fan_on: bool = False
cycle: int = 1
while True:
Expand All @@ -78,23 +90,34 @@ async def run_hepa_fan(
break

# on time
if not fan_on:
if not fan_on or fan_on_time:
fan_on = True
log.info(f"Hepa Task: cycle {cycle}")
msg = "forever" if run_forever else f"for {fan_on_time} seconds"
log.info(f"Hepa Task: Turning on fan {msg}")
await api.set_hepa_fan_state(turn_on=True, duty_cycle=fan_duty_cycle)
success = await api.set_hepa_fan_state(
turn_on=True, duty_cycle=fan_duty_cycle
)
RESULT["HEPA"][cycle] = success # type: ignore
if not success:
log.error("Hepa Task: FAILED to turn ON fan.")
break
await asyncio.sleep(fan_on_time)

# off time
if fan_off_time:
log.info(f"Hepa Task: Turning off fan for {fan_off_time} seconds")
await api.set_hepa_fan_state(turn_on=False, duty_cycle=0)
success = await api.set_hepa_fan_state(turn_on=False, duty_cycle=0)
RESULT["HEPA"][cycle] = success # type: ignore
if not success:
log.error("Hepa Task: FAILED to turn OFF fan.")
break
fan_on = False

# sleep and increment the cycle
await asyncio.sleep(fan_off_time or 1)
if not run_forever:
# record result
cycle += 1
except asyncio.CancelledError:
break
Expand All @@ -104,28 +127,34 @@ async def run_hepa_fan(

elapsed_time = datetime.datetime.now() - start_time
log.info(f"Hepa Task: Elapsed time={elapsed_time}")
return RESULT


async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> None:
async def run_hepa_uv(
api: OT3API, on_time: int, off_time: int, cycles: int
) -> Optional[Dict[str, Any]]:
"""Coroutine that will run the hepa uv light."""
light_on_time = max(0, min(on_time, MAX_UV_DOSAGE))
light_off_time = off_time if off_time > 0 else 0
start_time = datetime.datetime.now()

# Dont run task if there are no valid parameters
if not light_on_time and not light_off_time:
return
return None

if api.door_state == DoorState.OPEN:
log.warning("UV Task: Flex Door must be closed to operate the UV light")
return
return None

log.info(
f"Hepa UV Task: Starting - on_time={light_on_time}s, "
f"off_time={light_off_time}s, cycles={cycles}"
)
log.info("===========================================")

# record the result
RESULT = {
"UV": {cycle + 1: None for cycle in range(cycles)}
}
uv_light_on: bool = False
cycle: int = 1
while True:
Expand All @@ -135,21 +164,31 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) ->
break

# on time
if not uv_light_on:
if not uv_light_on or light_on_time:
uv_light_on = True
log.info(f"UV Task: cycle number={cycle}")
log.info(
f"UV Task: Turning on the UV Light for {light_on_time} seconds"
)
await api.set_hepa_uv_state(turn_on=True, uv_duration_s=light_on_time)
success = await api.set_hepa_uv_state(
turn_on=True, uv_duration_s=light_on_time
)
RESULT["UV"][cycle] = success # type: ignore
if not success:
log.error("UV Task: FAILED to turned ON uv light.")
break
await asyncio.sleep(light_on_time)

# off time
if light_off_time:
log.info(
f"UV Task: Turning off the UV Light for {light_off_time} seconds"
)
await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0)
success = await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0)
RESULT["UV"][cycle] = success # type: ignore
if not success:
log.error("UV Task: FAILED to turned OFF uv light.")
break
uv_light_on = False

# Sleep and increment the cycle
Expand All @@ -163,6 +202,7 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) ->

elapsed_time = datetime.datetime.now() - start_time
log.info(f"UV Task: Elapsed time={elapsed_time}")
return RESULT


async def _control_task(
Expand All @@ -177,23 +217,25 @@ async def _control_task(
uv_task.cancel()

if uv_task.done() and hepa_task.done():
log.info("Control Task: No more running tasks")
break

await asyncio.sleep(1)


async def _main(args: argparse.Namespace) -> None:
api = await helpers_ot3.build_async_ot3_hardware_api(
is_simulating=args.is_simulating
is_simulating=args.is_simulating,
reset_ot3_api=False,
)

# Scan for subsystems and make sure we have a hepa/uv module if not simulating
# Make sure we have a hepa/uv module if not simulating
if not args.is_simulating:
await cast(OT3Controller, api._backend).probe_network()
assert (
SubSystem.hepa_uv in api.attached_subsystems
), "No Hepa/UV module detected!"

log.info(f"=============== {ROBOT_NAME} ==========================")
# Make sure everything is off before we start testing
await _turn_off_hepa_uv(api)

Expand All @@ -209,11 +251,21 @@ async def _main(args: argparse.Namespace) -> None:
control_task = asyncio.create_task(_control_task(api, hepa_fan_task, hepa_uv_task))

# start the tasks
results = []
try:
await asyncio.gather(control_task, hepa_fan_task, hepa_uv_task)
results = await asyncio.gather(control_task, hepa_fan_task, hepa_uv_task)
finally:
# Make sure we always turn OFF everything!
await _turn_off_hepa_uv(api)
log.info("===================== RESULT ======================")
for result in results:
if result is None:
continue
for task, data in result.items():
for cycle, success in data.items():
msg = "PASSED" if success else "FAILED"
log.info(f"{task}: cycle={cycle} result={msg}")
log.info("===================== RESULT =======================")


def log_config(log_level: int) -> Dict:
Expand Down Expand Up @@ -322,3 +374,5 @@ def log_config(log_level: int) -> Dict:
asyncio.run(_main(args))
except KeyboardInterrupt:
log.warning("KeyBoard Interrupt")
finally:
log.info("Exiting...")

0 comments on commit 61681a2

Please sign in to comment.