Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hardware): read tip update from firmware whenever a notification is received #13822

Merged
51 changes: 20 additions & 31 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
moving_axes_in_move_group,
gripper_jaw_state_from_fw,
)
from .tip_presence_manager import TipPresenceManager

try:
import aionotify # type: ignore[import]
Expand Down Expand Up @@ -86,7 +87,6 @@
update_motor_position_estimation,
)
from opentrons_hardware.hardware_control.limit_switches import get_limit_switches
from opentrons_hardware.hardware_control.tip_presence import get_tip_ejector_state
from opentrons_hardware.hardware_control.current_settings import (
set_run_current,
set_hold_current,
Expand Down Expand Up @@ -127,7 +127,6 @@
SubSystemState,
SubSystem,
TipStateType,
FailedTipStateCheck,
EstopState,
GripperJawState,
)
Expand Down Expand Up @@ -172,7 +171,6 @@
from opentrons_shared_data.errors.exceptions import (
EStopActivatedError,
EStopNotPresentError,
UnmatchedTipPresenceStates,
PipetteOverpressureError,
FirmwareUpdateRequiredError,
)
Expand Down Expand Up @@ -316,6 +314,7 @@ def __init__(
"or door, likely because not running on linux"
)
self._current_settings: Optional[OT3AxisMap[CurrentConfig]] = None
self._tip_presence_manager = TipPresenceManager(self._messenger)

async def get_serial_number(self) -> Optional[str]:
if not self.initialized:
Expand Down Expand Up @@ -867,34 +866,6 @@ async def get_limit_switches(self) -> OT3AxisMap[bool]:
res = await get_limit_switches(self._messenger, motor_nodes)
return {node_to_axis(node): bool(val) for node, val in res.items()}

async def check_for_tip_presence(
self,
mount: OT3Mount,
tip_state: TipStateType,
expect_multiple_responses: bool = False,
) -> None:
"""Raise an error if the expected tip state does not match the current state."""
res = await self.get_tip_present_state(mount, expect_multiple_responses)
if res != tip_state.value:
raise FailedTipStateCheck(tip_state, res)

async def get_tip_present_state(
self,
mount: OT3Mount,
expect_multiple_responses: bool = False,
) -> bool:
"""Get the state of the tip ejector flag for a given mount."""
expected_responses = 2 if expect_multiple_responses else 1
node = sensor_node_for_mount(OT3Mount(mount.value))
assert node != NodeId.gripper
res = await get_tip_ejector_state(self._messenger, node, expected_responses) # type: ignore[arg-type]
vals = list(res.values())
if not all([r == vals[0] for r in vals]):
states = {int(sensor): res[sensor] for sensor in res}
raise UnmatchedTipPresenceStates(states)
tip_present_state = bool(vals[0])
return tip_present_state

@staticmethod
def _tip_motor_nodes(axis_current_keys: KeysView[Axis]) -> List[NodeId]:
return [axis_to_node(Axis.Q)] if Axis.Q in axis_current_keys else []
Expand Down Expand Up @@ -1333,3 +1304,21 @@ async def build_estop_detector(self) -> bool:
def estop_state_machine(self) -> EstopStateMachine:
"""Accessor for the API to get the state machine, if it exists."""
return self._estop_state_machine

@property
def tip_presence_manager(self) -> TipPresenceManager:
return self._tip_presence_manager

async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want a way to remove one when a pipette is removed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it pipette is removed, we'll pass sensor_count as 0, which then removes the attached detector

"""Build indiviudal tip detector for a mount."""
await self.teardown_tip_detector(mount)
await self._tip_presence_manager.build_detector(mount, sensor_count)

async def teardown_tip_detector(self, mount: OT3Mount) -> None:
await self._tip_presence_manager.clear_detector(mount)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
return await self.tip_presence_manager.get_tip_status(mount)

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return self.tip_presence_manager.current_tip_state(mount)
31 changes: 16 additions & 15 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ def _sanitize_attached_instrument(
self._present_nodes = nodes
self._current_settings: Optional[OT3AxisMap[CurrentConfig]] = None
self._sim_jaw_state = GripperJawState.HOMED_READY
self._sim_tip_state: Dict[OT3Mount, Optional[bool]] = {
mount: False if self._attached_instruments[mount] else None
for mount in [OT3Mount.LEFT, OT3Mount.RIGHT]
}

async def get_serial_number(self) -> Optional[str]:
return "simulator"
Expand Down Expand Up @@ -391,21 +395,6 @@ async def gripper_hold_jaw(
self._encoder_position[NodeId.gripper_g] = encoder_position_um / 1000.0
self._sim_jaw_state = GripperJawState.HOLDING

async def check_for_tip_presence(
self,
mount: OT3Mount,
tip_state: TipStateType,
expect_multiple_responses: bool = False,
) -> None:
"""Raise an error if the given state doesn't match the physical state."""
pass

async def get_tip_present_state(
self, mount: OT3Mount, expect_multiple_responses: bool = False
) -> bool:
"""Get the state of the tip ejector flag for a given mount."""
pass

async def get_jaw_state(self) -> GripperJawState:
"""Get the state of the gripper jaw."""
return self._sim_jaw_state
Expand Down Expand Up @@ -747,3 +736,15 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]:
def estop_state_machine(self) -> EstopStateMachine:
"""Return an estop state machine locked in the "disengaged" state."""
return self._estop_state_machine

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
return TipStateType(self._sim_tip_state[mount])

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return self._sim_tip_state[mount]

async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None:
pass

async def teardown_tip_detector(self, mount: OT3Mount) -> None:
pass
146 changes: 146 additions & 0 deletions api/src/opentrons/hardware_control/backends/tip_presence_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging
from functools import partial
from typing import cast, Callable, Optional, List, Set
from typing_extensions import TypedDict, Literal

from opentrons.hardware_control.types import TipStateType, OT3Mount

from opentrons_hardware.drivers.can_bus import CanMessenger
from opentrons_hardware.firmware_bindings.constants import NodeId
from opentrons_hardware.hardware_control.tip_presence import (
TipDetector,
types as tip_types,
)
from opentrons_shared_data.errors.exceptions import (
TipDetectorNotFound,
UnmatchedTipPresenceStates,
)

log = logging.getLogger(__name__)

TipListener = Callable[[OT3Mount, bool], None]
PipetteMountKeys = Literal["left", "right"]


class TipDetectorByMount(TypedDict):
left: Optional[TipDetector]
right: Optional[TipDetector]


class UnsubMethodByMount(TypedDict):
left: Optional[Callable[[], None]]
right: Optional[Callable[[], None]]


class TipUpdateByMount(TypedDict):
left: Optional[bool]
right: Optional[bool]


def _mount_to_node(mount: OT3Mount) -> NodeId:
return {
OT3Mount.LEFT: NodeId.pipette_left,
OT3Mount.RIGHT: NodeId.pipette_right,
}[mount]


class TipPresenceManager:
"""Handle tip change notification coming from CAN."""

_listeners: Set[TipListener]
_detectors: TipDetectorByMount
_unsub_methods: UnsubMethodByMount
_last_state: TipUpdateByMount

def __init__(
self,
can_messenger: CanMessenger,
listeners: Set[TipListener] = set(),
) -> None:
self._messenger = can_messenger
self._listeners = listeners
self._detectors = TipDetectorByMount(left=None, right=None)
self._unsub_methods = UnsubMethodByMount(left=None, right=None)
self._last_state = TipUpdateByMount(left=None, right=None)

@staticmethod
def _get_key(mount: OT3Mount) -> PipetteMountKeys:
assert mount != OT3Mount.GRIPPER
return cast(PipetteMountKeys, mount.name.lower())

async def clear_detector(self, mount: OT3Mount) -> None:
"""Clean up and remove tip detector."""

def _unsubscribe() -> None:
"""Unsubscribe from detector."""
unsub = self._unsub_methods[self._get_key(mount)]
if unsub:
unsub()
self.set_unsub(mount, None)

detector = self.get_detector(mount)
if detector:
_unsubscribe()
detector.cleanup()
self.set_detector(mount, None)

async def build_detector(self, mount: OT3Mount, sensor_count: int) -> None:
assert self.get_detector(mount) is None
# set up and subscribe to the detector
d = TipDetector(self._messenger, _mount_to_node(mount), sensor_count)
# listens to the detector so we can immediately notify listeners
# the most up-to-date tip state
unsub = d.add_subscriber(partial(self._handle_tip_update, mount))
self.set_unsub(mount, unsub)
self.set_detector(mount, d)

def _handle_tip_update(
self, mount: OT3Mount, update: tip_types.TipNotification
) -> None:
"""Callback for detector."""
self._last_state[self._get_key(mount)] = update.presence

for listener in self._listeners:
listener(mount, update.presence)

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
state = self._last_state[self._get_key(mount)]
if state is None:
log.warning("Tip state for {mount} is unknown")
return state

@staticmethod
def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType:
# more than one sensor reported, we have to check if their states match
if len(set(r.presence for r in results)) > 1:
raise UnmatchedTipPresenceStates(
{int(r.sensor): int(r.presence) for r in results}
)
return TipStateType(results[0].presence)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
detector = self.get_detector(mount)
return self._get_tip_presence(await detector.request_tip_status())

def get_detector(self, mount: OT3Mount) -> TipDetector:
detector = self._detectors[self._get_key(mount)]
if not detector:
raise TipDetectorNotFound(
message=f"Tip detector not set up for {mount} mount",
detail={"mount": str(mount)},
)
return detector

def set_detector(self, mount: OT3Mount, detector: Optional[TipDetector]) -> None:
self._detectors[self._get_key(mount)] = detector

def set_unsub(self, mount: OT3Mount, unsub: Optional[Callable[[], None]]) -> None:
self._unsub_methods[self._get_key(mount)] = unsub

def add_listener(self, listener: TipListener) -> Callable[[], None]:
self._listeners.add(listener)

def remove() -> None:
self._listeners.discard(listener)

return remove
Loading
Loading