Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api,hardware): add hepa/uv config/control commands to api. #14489

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
EstopState,
HardwareEventHandler,
HardwareEventUnsubscriber,
HepaFanState,
HepaUVState,
StatusBarState,
)
from opentrons.hardware_control.module_control import AttachedModulesControl
from ..dev_types import OT3AttachedInstruments
from ..types import StatusBarState
from .types import HWStopCondition

Cls = TypeVar("Cls")
Expand Down Expand Up @@ -417,3 +419,17 @@ def check_gripper_position_within_bounds(
hard_limit_upper: float,
) -> None:
...

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
...

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
...

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
"""Sets the state and timeout in seconds of the Hepa/UV module."""
...

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
...
37 changes: 36 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
from opentrons_hardware.hardware_control.gripper_settings import (
get_gripper_jaw_state,
)
from opentrons_hardware.hardware_control.hepa_uv_settings import (
set_hepa_fan_state as set_hepa_fan_state_fw,
get_hepa_fan_state as get_hepa_fan_state_fw,
set_hepa_uv_state as set_hepa_uv_state_fw,
get_hepa_uv_state as get_hepa_uv_state_fw,
)

from opentrons_hardware.drivers.gpio import OT3GPIO, RemoteOT3GPIO
from opentrons_shared_data.pipette.dev_types import PipetteName
Expand All @@ -193,7 +199,7 @@
AttachedGripper,
OT3AttachedInstruments,
)
from ..types import StatusBarState
from ..types import HepaFanState, HepaUVState, StatusBarState

from .types import HWStopCondition
from .flex_protocol import FlexBackend
Expand Down Expand Up @@ -1570,3 +1576,32 @@ def check_gripper_position_within_bounds(
"actual-jaw-width": current_gripper_position,
},
)

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
return await set_hepa_fan_state_fw(self._messenger, fan_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
res = await get_hepa_fan_state_fw(self._messenger)
return (
HepaFanState(
fan_on=res.fan_on,
duty_cycle=res.duty_cycle,
)
if res
else None
)

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
return await set_hepa_uv_state_fw(self._messenger, light_on, timeout_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
res = await get_hepa_uv_state_fw(self._messenger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you not just returning res here because of linter complaining?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its to separate opentrons_hardware dataclasses from opentrons.hardware_control ones.

return (
HepaUVState(
light_on=res.uv_light_on,
config_timeout=res.timeout_s,
remaining_time_s=res.remaining_time_s,
)
if res
else None
)
14 changes: 14 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from opentrons.hardware_control.types import (
BoardRevision,
Axis,
HepaFanState,
HepaUVState,
OT3Mount,
OT3AxisMap,
CurrentConfig,
Expand Down Expand Up @@ -803,3 +805,15 @@ def check_gripper_position_within_bounds(
# This is a (pretty bad) simulation of the gripper actually gripping something,
# but it should work.
self._encoder_position[Axis.G] = (hard_limit_upper - jaw_width) / 2

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
return False

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
return None

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
return False

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
return None
20 changes: 20 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
HardwareEvent,
HardwareEventHandler,
HardwareAction,
HepaFanState,
HepaUVState,
MotionChecks,
SubSystem,
PauseType,
Expand Down Expand Up @@ -2685,3 +2687,21 @@ def estop_acknowledge_and_clear(self) -> EstopOverallStatus:

def get_estop_state(self) -> EstopState:
return self._backend.get_estop_state()

async def set_hepa_fan_state(
self, turn_on: bool = False, duty_cycle: int = 75
) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
return await self._backend.set_hepa_fan_state(turn_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
return await self._backend.get_hepa_fan_state()

async def set_hepa_uv_state(
self, turn_on: bool = False, timeout_s: int = 900
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this long of a timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah 900s (15m) is the default uv light timeout

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call it like dose_duration or something?

) -> bool:
"""Sets the state and timeout in seconds of the Hepa/UV module."""
return await self._backend.set_hepa_uv_state(turn_on, timeout_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
return await self._backend.get_hepa_uv_state()
13 changes: 13 additions & 0 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,19 @@ class EstopOverallStatus:
right_physical_state: EstopPhysicalStatus


@dataclass
class HepaFanState:
fan_on: bool
duty_cycle: int


@dataclass
class HepaUVState:
light_on: bool
config_timeout: int
remaining_time_s: int


@dataclass(frozen=True)
class DoorStateNotification:
event: Literal[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ class SetHepaFanStateRequestPayload(EmptyPayload):
"""A request to set the state and pwm of a the hepa fan."""

duty_cycle: utils.UInt32Field
fan_on: utils.Int8Field
fan_on: utils.UInt8Field


@dataclass(eq=False)
Expand Down
149 changes: 149 additions & 0 deletions hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Utilities for controlling the hepa/uv extension module."""
import logging
import asyncio
from typing import Optional
from dataclasses import dataclass
from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger
from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId

from opentrons_hardware.firmware_bindings.messages import payloads
from opentrons_hardware.firmware_bindings.messages.messages import MessageDefinition
from opentrons_hardware.firmware_bindings.messages.message_definitions import (
SetHepaFanStateRequest,
GetHepaFanStateRequest,
GetHepaFanStateResponse,
SetHepaUVStateRequest,
GetHepaUVStateRequest,
GetHepaUVStateResponse,
)
from opentrons_hardware.firmware_bindings.utils import (
UInt8Field,
UInt32Field,
)
from opentrons_hardware.firmware_bindings.constants import (
MessageId,
NodeId,
ErrorCode,
)

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class HepaFanState:
"""Hepa Fan Config."""

fan_on: bool
duty_cycle: int


@dataclass(frozen=True)
class HepaUVState:
"""Hepa UV Light Config."""

uv_light_on: bool
timeout_s: int
remaining_time_s: int


async def set_hepa_fan_state(
can_messenger: CanMessenger,
fan_on: bool,
duty_cycle: int,
) -> bool:
"""Set the Hepa fan state and duty cycle."""
error = await can_messenger.ensure_send(
node_id=NodeId.hepa_uv,
message=SetHepaFanStateRequest(
payload=payloads.SetHepaFanStateRequestPayload(
duty_cycle=UInt32Field(duty_cycle), fan_on=UInt8Field(fan_on)
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa fan state {str(error)}")
return error == ErrorCode.ok


async def get_hepa_fan_state(can_messenger: CanMessenger) -> Optional[HepaFanState]:
"""Gets the state of the Hepa fan."""
fan_state: Optional[HepaFanState] = None

event = asyncio.Event()

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:
nonlocal fan_state
if isinstance(message, GetHepaFanStateResponse):
event.set()
fan_state = HepaFanState(
fan_on=bool(message.payload.fan_on.value),
duty_cycle=int(message.payload.duty_cycle.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_fan_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaFanStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa fan state request timed out")

Check warning on line 94 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L93-L94

Added lines #L93 - L94 were not covered by tests
finally:
can_messenger.remove_listener(_listener)
return fan_state


async def set_hepa_uv_state(
can_messenger: CanMessenger,
uv_light_on: bool,
timeout_s: int,
) -> bool:
"""Set the Hepa UV state and timeout in seconds."""
error = await can_messenger.ensure_send(
node_id=NodeId.hepa_uv,
message=SetHepaUVStateRequest(
payload=payloads.SetHepaUVStateRequestPayload(
timeout_s=UInt32Field(timeout_s), uv_light_on=UInt8Field(uv_light_on)
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa uv light state {str(error)}")
return error == ErrorCode.ok


async def get_hepa_uv_state(can_messenger: CanMessenger) -> Optional[HepaUVState]:
"""Gets the state of the Hepa uv light."""
uv_state: Optional[HepaUVState] = None

event = asyncio.Event()

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:
nonlocal uv_state
if isinstance(message, GetHepaUVStateResponse):
event.set()
uv_state = HepaUVState(
uv_light_on=bool(message.payload.uv_light_on.value),
timeout_s=int(message.payload.timeout_s.value),
remaining_time_s=int(message.payload.remaining_time_s.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_uv_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaUVStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa uv light state request timed out")

Check warning on line 146 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L145-L146

Added lines #L145 - L146 were not covered by tests
finally:
can_messenger.remove_listener(_listener)
return uv_state
Loading
Loading