-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(hardware-testing): add life-time testing script for the Hepa/UV …
…Module. (#14538)
- Loading branch information
Showing
1 changed file
with
324 additions
and
0 deletions.
There are no files selected for viewing
324 changes: 324 additions & 0 deletions
324
hardware-testing/hardware_testing/scripts/hepa_uv_lifetime_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,324 @@ | ||
"""This is the life-time testing script for the Hepa/UV module.""" | ||
|
||
import asyncio | ||
import argparse | ||
import datetime | ||
import logging | ||
import logging.config | ||
|
||
from typing import Optional, cast, Dict | ||
|
||
from hardware_testing.opentrons_api import helpers_ot3 | ||
from opentrons.hardware_control.backends.ot3controller import OT3Controller | ||
from opentrons.hardware_control.ot3api import OT3API | ||
from opentrons.hardware_control.types import ( | ||
SubSystem, | ||
HepaFanState, | ||
HepaUVState, | ||
DoorState, | ||
) | ||
|
||
|
||
# Default constants | ||
DEFAULT_DUTY_CYCLE: int = 75 | ||
MAX_DUTY_CYCLE: int = 100 | ||
DEFAULT_UV_DOSAGE_DURATION: int = 900 # 15m | ||
MAX_UV_DOSAGE: int = 60 * 60 # 1hr max dosage | ||
DEFAULT_CYCLES: int = 1 | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
async def _turn_off_hepa_uv(api: OT3API) -> None: | ||
"""Set and Make sure that the Hepa fan and UV light are off.""" | ||
log.info("Turning off Hepa Fan and UV Light.") | ||
await api.set_hepa_uv_state(turn_on=False) | ||
await api.set_hepa_fan_state(turn_on=False) | ||
|
||
# Confirm that they are off | ||
hepa_uv_state: Optional[HepaUVState] = await api.get_hepa_uv_state() | ||
if hepa_uv_state: | ||
assert not hepa_uv_state.light_on, "Hepa UV did not turn OFF!" | ||
|
||
hepa_fan_state: Optional[HepaFanState] = await api.get_hepa_fan_state() | ||
if hepa_fan_state: | ||
assert not hepa_fan_state.fan_on, "Hepa Fan did not turn OFF!" | ||
|
||
|
||
async def run_hepa_fan( | ||
api: OT3API, | ||
duty_cycle: int, | ||
on_time: int, | ||
off_time: int, | ||
cycles: int, | ||
) -> None: | ||
"""Coroutine that will run the hepa fan.""" | ||
fan_duty_cycle = max(0, min(duty_cycle, MAX_DUTY_CYCLE)) | ||
fan_on_time = on_time if on_time > 0 else 0 | ||
fan_off_time = off_time if off_time > 0 else 0 | ||
run_forever = on_time == -1 | ||
start_time = datetime.datetime.now() | ||
|
||
# Dont run task if there are no valid parameters | ||
if not fan_on_time and not fan_off_time: | ||
return | ||
|
||
log.info( | ||
f"Hepa Task: Starting - duty_cycle={fan_duty_cycle}, " | ||
f"on_time={fan_on_time}s, off_time={fan_off_time}s, cycles={cycles}, " | ||
f"run_forever: {run_forever}" | ||
) | ||
|
||
fan_on: bool = False | ||
cycle: int = 1 | ||
while True: | ||
try: | ||
if not run_forever and cycle > cycles: | ||
log.info(f"Hepa Task: Reached target cycles={cycles}") | ||
break | ||
|
||
# on time | ||
if not fan_on: | ||
fan_on = True | ||
log.info(f"Hepa Task: cycle {cycle}") | ||
msg = "forever" if run_forever else f"for {fan_on_time} seconds" | ||
log.info(f"Hepa Task: Turning on fan {msg}") | ||
await api.set_hepa_fan_state(turn_on=True, duty_cycle=fan_duty_cycle) | ||
await asyncio.sleep(fan_on_time) | ||
|
||
# off time | ||
if fan_off_time: | ||
log.info(f"Hepa Task: Turning off fan for {fan_off_time} seconds") | ||
await api.set_hepa_fan_state(turn_on=False, duty_cycle=0) | ||
fan_on = False | ||
|
||
# sleep and increment the cycle | ||
await asyncio.sleep(fan_off_time or 1) | ||
if not run_forever: | ||
cycle += 1 | ||
except asyncio.CancelledError: | ||
break | ||
|
||
log.info("Hepa Task: Finished - Turning off Fan") | ||
await api.set_hepa_fan_state(turn_on=False, duty_cycle=DEFAULT_DUTY_CYCLE) | ||
|
||
elapsed_time = datetime.datetime.now() - start_time | ||
log.info(f"Hepa Task: Elapsed time={elapsed_time}") | ||
|
||
|
||
async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> None: | ||
"""Coroutine that will run the hepa uv light.""" | ||
light_on_time = max(0, min(on_time, MAX_UV_DOSAGE)) | ||
light_off_time = off_time if off_time > 0 else 0 | ||
start_time = datetime.datetime.now() | ||
|
||
# Dont run task if there are no valid parameters | ||
if not light_on_time and not light_off_time: | ||
return | ||
|
||
if api.door_state == DoorState.OPEN: | ||
log.warning("UV Task: Flex Door must be closed to operate the UV light") | ||
return | ||
|
||
log.info( | ||
f"Hepa UV Task: Starting - on_time={light_on_time}s, " | ||
f"off_time={light_off_time}s, cycles={cycles}" | ||
) | ||
log.info("===========================================") | ||
|
||
uv_light_on: bool = False | ||
cycle: int = 1 | ||
while True: | ||
try: | ||
if cycle > cycles: | ||
log.info(f"UV Task: Reached target cycles={cycles}") | ||
break | ||
|
||
# on time | ||
if not uv_light_on: | ||
uv_light_on = True | ||
log.info(f"UV Task: cycle number={cycle}") | ||
log.info( | ||
f"UV Task: Turning on the UV Light for {light_on_time} seconds" | ||
) | ||
await api.set_hepa_uv_state(turn_on=True, uv_duration_s=light_on_time) | ||
await asyncio.sleep(light_on_time) | ||
|
||
# off time | ||
if light_off_time: | ||
log.info( | ||
f"UV Task: Turning off the UV Light for {light_off_time} seconds" | ||
) | ||
await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0) | ||
uv_light_on = False | ||
|
||
# Sleep and increment the cycle | ||
await asyncio.sleep(light_off_time or 1) | ||
cycle += 1 | ||
except asyncio.CancelledError: | ||
break | ||
|
||
log.info("UV Task: Finished - Turning off UV Light ") | ||
await api.set_hepa_uv_state(turn_on=False, uv_duration_s=DEFAULT_UV_DOSAGE_DURATION) | ||
|
||
elapsed_time = datetime.datetime.now() - start_time | ||
log.info(f"UV Task: Elapsed time={elapsed_time}") | ||
|
||
|
||
async def _control_task( | ||
api: OT3API, hepa_task: asyncio.Task, uv_task: asyncio.Task | ||
) -> None: | ||
"""Checks robot status and cancels tasks.""" | ||
while True: | ||
# Make sure the door is closed | ||
if api.door_state == DoorState.OPEN: | ||
if not uv_task.done(): | ||
log.warning("Control Task: Flex Door Opened, stopping UV task") | ||
uv_task.cancel() | ||
|
||
if uv_task.done() and hepa_task.done(): | ||
break | ||
|
||
await asyncio.sleep(1) | ||
|
||
|
||
async def _main(args: argparse.Namespace) -> None: | ||
api = await helpers_ot3.build_async_ot3_hardware_api( | ||
is_simulating=args.is_simulating | ||
) | ||
|
||
# Scan for subsystems and make sure we have a hepa/uv module if not simulating | ||
if not args.is_simulating: | ||
await cast(OT3Controller, api._backend).probe_network() | ||
assert ( | ||
SubSystem.hepa_uv in api.attached_subsystems | ||
), "No Hepa/UV module detected!" | ||
|
||
# Make sure everything is off before we start testing | ||
await _turn_off_hepa_uv(api) | ||
|
||
# create tasks | ||
hepa_fan_task = asyncio.create_task( | ||
run_hepa_fan( | ||
api, args.fan_duty_cycle, args.fan_on_time, args.fan_off_time, args.cycles | ||
) | ||
) | ||
hepa_uv_task = asyncio.create_task( | ||
run_hepa_uv(api, args.uv_on_time, args.uv_off_time, args.cycles) | ||
) | ||
control_task = asyncio.create_task(_control_task(api, hepa_fan_task, hepa_uv_task)) | ||
|
||
# start the tasks | ||
try: | ||
await asyncio.gather(control_task, hepa_fan_task, hepa_uv_task) | ||
finally: | ||
# Make sure we always turn OFF everything! | ||
await _turn_off_hepa_uv(api) | ||
|
||
|
||
def log_config(log_level: int) -> Dict: | ||
"""Configure logging.""" | ||
return { | ||
"version": 1, | ||
"disable_existing_loggers": False, | ||
"formatters": { | ||
"basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}, | ||
"production_trace": {"format": "%(asctime)s %(message)s"}, | ||
}, | ||
"handlers": { | ||
"main_log_handler": { | ||
"class": "logging.handlers.RotatingFileHandler", | ||
"formatter": "basic", | ||
"filename": "/var/log/hepauv_lifetime.log", | ||
"maxBytes": 5000000, | ||
"level": log_level, | ||
"backupCount": 3, | ||
}, | ||
"stream_handler": { | ||
"class": "logging.StreamHandler", | ||
"formatter": "basic", | ||
"level": log_level, | ||
}, | ||
}, | ||
"loggers": { | ||
"": { | ||
"handlers": ( | ||
["main_log_handler"] | ||
if log_level > logging.INFO | ||
else ["main_log_handler", "stream_handler"] | ||
), | ||
"level": log_level, | ||
}, | ||
}, | ||
} | ||
|
||
|
||
if __name__ == "__main__": | ||
parser = argparse.ArgumentParser( | ||
prog="Hepa/UV Life-Time Test", | ||
description="Program to test the lifetime of the Hepa/UV module.", | ||
) | ||
parser.add_argument( | ||
"--log-level", | ||
help=( | ||
"Developer logging level. At DEBUG or below, logs are written " | ||
"to console; at INFO or above, logs are only written to " | ||
"/var/log/hepauv_lifetime.log" | ||
), | ||
type=str, | ||
choices=["DEBUG", "INFO", "WARNING", "ERROR"], | ||
default="INFO", | ||
) | ||
parser.add_argument( | ||
"--is_simulating", | ||
action="store_true", | ||
help="Whether this is a simulation or not.", | ||
) | ||
parser.add_argument( | ||
"--fan-duty-cycle", | ||
type=int, | ||
default=DEFAULT_DUTY_CYCLE, | ||
help="The duty cycle of the hepa fan 0-100%.", | ||
) | ||
parser.add_argument( | ||
"--fan-on-time", | ||
type=int, | ||
default=0, | ||
help="The time in seconds the fan should stay on for. " | ||
"0 turns off fan (default), -1 stays on until program is stopped", | ||
) | ||
parser.add_argument( | ||
"--fan-off-time", | ||
type=int, | ||
default=0, | ||
help="The time in seconds the fan should stay on for. ignored if not set", | ||
) | ||
parser.add_argument( | ||
"--uv-on-time", | ||
type=int, | ||
default=0, | ||
help="The time in seconds the UV light will be turned on for. " | ||
"0 turns off uv light (default), " | ||
f"The max value is {MAX_UV_DOSAGE} seconds.", | ||
) | ||
parser.add_argument( | ||
"--uv-off-time", | ||
type=int, | ||
default=0, | ||
help="The time in seconds the UV light will be turned off for. " | ||
"if 0 DONT turn off the uv light explictly but wait for " | ||
"the hepa/uv to turn off on its on based on --uv-on-time.", | ||
) | ||
parser.add_argument( | ||
"--cycles", | ||
type=int, | ||
default=DEFAULT_CYCLES, | ||
help="The number of cycles to run.", | ||
) | ||
args = parser.parse_args() | ||
logging.config.dictConfig(log_config(getattr(logging, args.log_level))) | ||
|
||
try: | ||
asyncio.run(_main(args)) | ||
except KeyboardInterrupt: | ||
log.warning("KeyBoard Interrupt") |