-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(hardware): read tip update from firmware whenever a notification is received #13822
Merged
ahiuchingau
merged 12 commits into
edge
from
RLIQ-366-support-caching-asynchronous-messages-from-firmware
Nov 1, 2023
Merged
Changes from 11 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
30920b5
add tip presence package
ahiuchingau c808e60
update tip presence test
ahiuchingau 2c952c8
add cleanup method
ahiuchingau b1ce7d2
add tip presence manager
ahiuchingau 205015b
add debounce check
ahiuchingau 034ede9
fix tip_presence_manager.py
ahiuchingau fda0376
add error code
ahiuchingau f84433e
update pipette & handler
ahiuchingau e23b980
remove verify_tip_presence
ahiuchingau da15f02
add tests
ahiuchingau 4222961
add unsub methods
ahiuchingau 037bf60
fix hardare-testing script
ahiuchingau File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
146 changes: 146 additions & 0 deletions
146
api/src/opentrons/hardware_control/backends/tip_presence_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import logging | ||
from functools import partial | ||
from typing import cast, Callable, Optional, List, Set | ||
from typing_extensions import TypedDict, Literal | ||
|
||
from opentrons.hardware_control.types import TipStateType, OT3Mount | ||
|
||
from opentrons_hardware.drivers.can_bus import CanMessenger | ||
from opentrons_hardware.firmware_bindings.constants import NodeId | ||
from opentrons_hardware.hardware_control.tip_presence import ( | ||
TipDetector, | ||
types as tip_types, | ||
) | ||
from opentrons_shared_data.errors.exceptions import ( | ||
TipDetectorNotFound, | ||
UnmatchedTipPresenceStates, | ||
) | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
TipListener = Callable[[OT3Mount, bool], None] | ||
PipetteMountKeys = Literal["left", "right"] | ||
|
||
|
||
class TipDetectorByMount(TypedDict): | ||
left: Optional[TipDetector] | ||
right: Optional[TipDetector] | ||
|
||
|
||
class UnsubMethodByMount(TypedDict): | ||
left: Optional[Callable[[], None]] | ||
right: Optional[Callable[[], None]] | ||
|
||
|
||
class TipUpdateByMount(TypedDict): | ||
left: Optional[bool] | ||
right: Optional[bool] | ||
|
||
|
||
def _mount_to_node(mount: OT3Mount) -> NodeId: | ||
return { | ||
OT3Mount.LEFT: NodeId.pipette_left, | ||
OT3Mount.RIGHT: NodeId.pipette_right, | ||
}[mount] | ||
|
||
|
||
class TipPresenceManager: | ||
"""Handle tip change notification coming from CAN.""" | ||
|
||
_listeners: Set[TipListener] | ||
_detectors: TipDetectorByMount | ||
_unsub_methods: UnsubMethodByMount | ||
_last_state: TipUpdateByMount | ||
|
||
def __init__( | ||
self, | ||
can_messenger: CanMessenger, | ||
listeners: Set[TipListener] = set(), | ||
) -> None: | ||
self._messenger = can_messenger | ||
self._listeners = listeners | ||
self._detectors = TipDetectorByMount(left=None, right=None) | ||
self._unsub_methods = UnsubMethodByMount(left=None, right=None) | ||
self._last_state = TipUpdateByMount(left=None, right=None) | ||
|
||
@staticmethod | ||
def _get_key(mount: OT3Mount) -> PipetteMountKeys: | ||
assert mount != OT3Mount.GRIPPER | ||
return cast(PipetteMountKeys, mount.name.lower()) | ||
|
||
async def clear_detector(self, mount: OT3Mount) -> None: | ||
"""Clean up and remove tip detector.""" | ||
|
||
def _unsubscribe() -> None: | ||
"""Unsubscribe from detector.""" | ||
unsub = self._unsub_methods[self._get_key(mount)] | ||
if unsub: | ||
unsub() | ||
self.set_unsub(mount, None) | ||
|
||
detector = self.get_detector(mount) | ||
if detector: | ||
_unsubscribe() | ||
detector.cleanup() | ||
self.set_detector(mount, None) | ||
|
||
async def build_detector(self, mount: OT3Mount, sensor_count: int) -> None: | ||
assert self.get_detector(mount) is None | ||
# set up and subscribe to the detector | ||
d = TipDetector(self._messenger, _mount_to_node(mount), sensor_count) | ||
# listens to the detector so we can immediately notify listeners | ||
# the most up-to-date tip state | ||
unsub = d.add_subscriber(partial(self._handle_tip_update, mount)) | ||
self.set_unsub(mount, unsub) | ||
self.set_detector(mount, d) | ||
|
||
def _handle_tip_update( | ||
self, mount: OT3Mount, update: tip_types.TipNotification | ||
) -> None: | ||
"""Callback for detector.""" | ||
self._last_state[self._get_key(mount)] = update.presence | ||
|
||
for listener in self._listeners: | ||
listener(mount, update.presence) | ||
|
||
def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: | ||
state = self._last_state[self._get_key(mount)] | ||
if state is None: | ||
log.warning("Tip state for {mount} is unknown") | ||
return state | ||
|
||
@staticmethod | ||
def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType: | ||
# more than one sensor reported, we have to check if their states match | ||
if len(set(r.presence for r in results)) > 1: | ||
raise UnmatchedTipPresenceStates( | ||
{int(r.sensor): int(r.presence) for r in results} | ||
) | ||
return TipStateType(results[0].presence) | ||
|
||
async def get_tip_status(self, mount: OT3Mount) -> TipStateType: | ||
detector = self.get_detector(mount) | ||
return self._get_tip_presence(await detector.request_tip_status()) | ||
|
||
def get_detector(self, mount: OT3Mount) -> TipDetector: | ||
detector = self._detectors[self._get_key(mount)] | ||
if not detector: | ||
raise TipDetectorNotFound( | ||
message=f"Tip detector not set up for {mount} mount", | ||
detail={"mount": str(mount)}, | ||
) | ||
return detector | ||
|
||
def set_detector(self, mount: OT3Mount, detector: Optional[TipDetector]) -> None: | ||
self._detectors[self._get_key(mount)] = detector | ||
|
||
def set_unsub(self, mount: OT3Mount, unsub: Optional[Callable[[], None]]) -> None: | ||
self._unsub_methods[self._get_key(mount)] = unsub | ||
|
||
def add_listener(self, listener: TipListener) -> Callable[[], None]: | ||
self._listeners.add(listener) | ||
|
||
def remove() -> None: | ||
self._listeners.discard(listener) | ||
|
||
return remove |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want a way to remove one when a pipette is removed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When it pipette is removed, we'll pass sensor_count as 0, which then removes the attached detector