From 61681a245f27406d99a600a0a3645999794e0028 Mon Sep 17 00:00:00 2001 From: vegano1 Date: Sat, 2 Mar 2024 19:09:28 -0500 Subject: [PATCH] fix(api, hardware-testing): Changes to allow hepa lifetime on the Flex devkit - dont assert when there are no motor nodes - parameterize ot3 reset for build ot3 async api --- .../backends/ot3controller.py | 6 +- .../opentrons_api/helpers_ot3.py | 4 +- .../scripts/hepa_uv_lifetime_test.py | 92 +++++++++++++++---- 3 files changed, 79 insertions(+), 23 deletions(-) diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 31d6642fcfb..82cf81c206b 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -488,9 +488,9 @@ def update_feature_flags(self, feature_flags: HardwareFeatureFlags) -> None: async def update_motor_status(self) -> None: """Retreieve motor and encoder status and position from all present nodes""" motor_nodes = self._motor_nodes() - assert len(motor_nodes) - response = await get_motor_position(self._messenger, motor_nodes) - self._handle_motor_status_response(response) + if motor_nodes: + response = await get_motor_position(self._messenger, motor_nodes) + self._handle_motor_status_response(response) async def update_motor_estimation(self, axes: Sequence[Axis]) -> None: """Update motor position estimation for commanded nodes, and update cache of data.""" diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index 4beae74bdd9..c3dedda0607 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -224,6 +224,7 @@ async def build_async_ot3_hardware_api( gripper: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None, stall_detection_enable: Optional[bool] = None, + reset_ot3_api: Optional[bool] = True, ) -> OT3API: """Built an OT3 Hardware API instance.""" if stall_detection_enable is not None: @@ -258,7 +259,8 @@ async def build_async_ot3_hardware_api( print(e) kwargs["use_usb_bus"] = False # type: ignore[assignment] api = await builder(loop=loop, **kwargs) # type: ignore[arg-type] - await reset_api(api) + if reset_ot3_api: + await reset_api(api) return api diff --git a/hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py b/hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py index eb829223d05..ab18ca2b948 100644 --- a/hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py +++ b/hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py @@ -5,11 +5,11 @@ import datetime import logging import logging.config +import subprocess -from typing import Optional, cast, Dict +from typing import Any, Optional, Dict from hardware_testing.opentrons_api import helpers_ot3 -from opentrons.hardware_control.backends.ot3controller import OT3Controller from opentrons.hardware_control.ot3api import OT3API from opentrons.hardware_control.types import ( SubSystem, @@ -26,7 +26,15 @@ MAX_UV_DOSAGE: int = 60 * 60 # 1hr max dosage DEFAULT_CYCLES: int = 1 -log = logging.getLogger(__name__) + +# Get the name of the robot +try: + ROBOT_NAME = subprocess.check_output(["hostnamectl", "--pretty"]).decode().strip() +except Exception: + ROBOT_NAME = "HepaLifetime" + + +log = logging.getLogger(ROBOT_NAME) async def _turn_off_hepa_uv(api: OT3API) -> None: @@ -51,7 +59,7 @@ async def run_hepa_fan( on_time: int, off_time: int, cycles: int, -) -> None: +) -> Optional[Dict[str, Any]]: """Coroutine that will run the hepa fan.""" fan_duty_cycle = max(0, min(duty_cycle, MAX_DUTY_CYCLE)) fan_on_time = on_time if on_time > 0 else 0 @@ -61,7 +69,7 @@ async def run_hepa_fan( # Dont run task if there are no valid parameters if not fan_on_time and not fan_off_time: - return + return None log.info( f"Hepa Task: Starting - duty_cycle={fan_duty_cycle}, " @@ -69,6 +77,10 @@ async def run_hepa_fan( f"run_forever: {run_forever}" ) + # record the result + RESULT = { + "HEPA": {cycle + 1: None for cycle in range(cycles)} + } fan_on: bool = False cycle: int = 1 while True: @@ -78,23 +90,34 @@ async def run_hepa_fan( break # on time - if not fan_on: + if not fan_on or fan_on_time: fan_on = True log.info(f"Hepa Task: cycle {cycle}") msg = "forever" if run_forever else f"for {fan_on_time} seconds" log.info(f"Hepa Task: Turning on fan {msg}") - await api.set_hepa_fan_state(turn_on=True, duty_cycle=fan_duty_cycle) + success = await api.set_hepa_fan_state( + turn_on=True, duty_cycle=fan_duty_cycle + ) + RESULT["HEPA"][cycle] = success # type: ignore + if not success: + log.error("Hepa Task: FAILED to turn ON fan.") + break await asyncio.sleep(fan_on_time) # off time if fan_off_time: log.info(f"Hepa Task: Turning off fan for {fan_off_time} seconds") - await api.set_hepa_fan_state(turn_on=False, duty_cycle=0) + success = await api.set_hepa_fan_state(turn_on=False, duty_cycle=0) + RESULT["HEPA"][cycle] = success # type: ignore + if not success: + log.error("Hepa Task: FAILED to turn OFF fan.") + break fan_on = False # sleep and increment the cycle await asyncio.sleep(fan_off_time or 1) if not run_forever: + # record result cycle += 1 except asyncio.CancelledError: break @@ -104,9 +127,12 @@ async def run_hepa_fan( elapsed_time = datetime.datetime.now() - start_time log.info(f"Hepa Task: Elapsed time={elapsed_time}") + return RESULT -async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> None: +async def run_hepa_uv( + api: OT3API, on_time: int, off_time: int, cycles: int +) -> Optional[Dict[str, Any]]: """Coroutine that will run the hepa uv light.""" light_on_time = max(0, min(on_time, MAX_UV_DOSAGE)) light_off_time = off_time if off_time > 0 else 0 @@ -114,18 +140,21 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> # Dont run task if there are no valid parameters if not light_on_time and not light_off_time: - return + return None if api.door_state == DoorState.OPEN: log.warning("UV Task: Flex Door must be closed to operate the UV light") - return + return None log.info( f"Hepa UV Task: Starting - on_time={light_on_time}s, " f"off_time={light_off_time}s, cycles={cycles}" ) - log.info("===========================================") + # record the result + RESULT = { + "UV": {cycle + 1: None for cycle in range(cycles)} + } uv_light_on: bool = False cycle: int = 1 while True: @@ -135,13 +164,19 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> break # on time - if not uv_light_on: + if not uv_light_on or light_on_time: uv_light_on = True log.info(f"UV Task: cycle number={cycle}") log.info( f"UV Task: Turning on the UV Light for {light_on_time} seconds" ) - await api.set_hepa_uv_state(turn_on=True, uv_duration_s=light_on_time) + success = await api.set_hepa_uv_state( + turn_on=True, uv_duration_s=light_on_time + ) + RESULT["UV"][cycle] = success # type: ignore + if not success: + log.error("UV Task: FAILED to turned ON uv light.") + break await asyncio.sleep(light_on_time) # off time @@ -149,7 +184,11 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> log.info( f"UV Task: Turning off the UV Light for {light_off_time} seconds" ) - await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0) + success = await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0) + RESULT["UV"][cycle] = success # type: ignore + if not success: + log.error("UV Task: FAILED to turned OFF uv light.") + break uv_light_on = False # Sleep and increment the cycle @@ -163,6 +202,7 @@ async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> elapsed_time = datetime.datetime.now() - start_time log.info(f"UV Task: Elapsed time={elapsed_time}") + return RESULT async def _control_task( @@ -177,6 +217,7 @@ async def _control_task( uv_task.cancel() if uv_task.done() and hepa_task.done(): + log.info("Control Task: No more running tasks") break await asyncio.sleep(1) @@ -184,16 +225,17 @@ async def _control_task( async def _main(args: argparse.Namespace) -> None: api = await helpers_ot3.build_async_ot3_hardware_api( - is_simulating=args.is_simulating + is_simulating=args.is_simulating, + reset_ot3_api=False, ) - # Scan for subsystems and make sure we have a hepa/uv module if not simulating + # Make sure we have a hepa/uv module if not simulating if not args.is_simulating: - await cast(OT3Controller, api._backend).probe_network() assert ( SubSystem.hepa_uv in api.attached_subsystems ), "No Hepa/UV module detected!" + log.info(f"=============== {ROBOT_NAME} ==========================") # Make sure everything is off before we start testing await _turn_off_hepa_uv(api) @@ -209,11 +251,21 @@ async def _main(args: argparse.Namespace) -> None: control_task = asyncio.create_task(_control_task(api, hepa_fan_task, hepa_uv_task)) # start the tasks + results = [] try: - await asyncio.gather(control_task, hepa_fan_task, hepa_uv_task) + results = await asyncio.gather(control_task, hepa_fan_task, hepa_uv_task) finally: # Make sure we always turn OFF everything! await _turn_off_hepa_uv(api) + log.info("===================== RESULT ======================") + for result in results: + if result is None: + continue + for task, data in result.items(): + for cycle, success in data.items(): + msg = "PASSED" if success else "FAILED" + log.info(f"{task}: cycle={cycle} result={msg}") + log.info("===================== RESULT =======================") def log_config(log_level: int) -> Dict: @@ -322,3 +374,5 @@ def log_config(log_level: int) -> Dict: asyncio.run(_main(args)) except KeyboardInterrupt: log.warning("KeyBoard Interrupt") + finally: + log.info("Exiting...")