From cfefcbc024d6e8902e19f5b614abb875ffff239b Mon Sep 17 00:00:00 2001 From: Caila Marashaj <98041399+caila-marashaj@users.noreply.github.com> Date: Tue, 23 Apr 2024 11:34:55 -0400 Subject: [PATCH] feat(api): add option to ignore different tip presence states (#14980) ## Overview This code adds an argument called `ht_operational_sensor` to `get_tip_presence_status`, that when used tells the api to only return the tip presence state of the instrument probe type specified. This allows calibration and partial tip flows to execute and check against their expected tip status without failing. ## TODO A follow-up pr will go up using this parameter for the `get_tip_presence` call in the calibration flow. ## Review Requests I'll most likely address any non-blocking change requests in a follow-up pr so we can cut the internal release as fast as possible, but let me know if: - `ht_operational_sensor` makes sense or if we can think of a better name - we should otherwise go about anything differently here. --- .../backends/flex_protocol.py | 4 +- .../backends/ot3controller.py | 10 +++- .../hardware_control/backends/ot3simulator.py | 6 ++- .../backends/tip_presence_manager.py | 34 ++++++++++++-- api/src/opentrons/hardware_control/ot3api.py | 12 +++-- .../backends/test_ot3_tip_presence_manager.py | 47 ++++++++++++++++++- 6 files changed, 101 insertions(+), 12 deletions(-) diff --git a/api/src/opentrons/hardware_control/backends/flex_protocol.py b/api/src/opentrons/hardware_control/backends/flex_protocol.py index 53efde79a23..7bd2969de6b 100644 --- a/api/src/opentrons/hardware_control/backends/flex_protocol.py +++ b/api/src/opentrons/hardware_control/backends/flex_protocol.py @@ -383,7 +383,9 @@ async def capacitive_pass( def subsystems(self) -> Dict[SubSystem, SubSystemState]: ... - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, mount: OT3Mount, ht_operation_sensor: Optional[InstrumentProbeType] = None + ) -> TipStateType: ... def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 9316fb67e90..ea0b610f8b4 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -1521,8 +1521,14 @@ async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None: async def teardown_tip_detector(self, mount: OT3Mount) -> None: await self._tip_presence_manager.clear_detector(mount) - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: - return await self.tip_presence_manager.get_tip_status(mount) + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: + return await self.tip_presence_manager.get_tip_status( + mount, ht_operational_sensor + ) def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: return self.tip_presence_manager.current_tip_state(mount) diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index b96be54026e..26d6237e9a3 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -780,7 +780,11 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]: for axis in self._present_axes } - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: return TipStateType(self._sim_tip_state[mount]) def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: diff --git a/api/src/opentrons/hardware_control/backends/tip_presence_manager.py b/api/src/opentrons/hardware_control/backends/tip_presence_manager.py index 9d2be3901da..0e46d713955 100644 --- a/api/src/opentrons/hardware_control/backends/tip_presence_manager.py +++ b/api/src/opentrons/hardware_control/backends/tip_presence_manager.py @@ -3,7 +3,7 @@ from typing import cast, Callable, Optional, List, Set from typing_extensions import TypedDict, Literal -from opentrons.hardware_control.types import TipStateType, OT3Mount +from opentrons.hardware_control.types import TipStateType, OT3Mount, InstrumentProbeType from opentrons_hardware.drivers.can_bus import CanMessenger from opentrons_hardware.firmware_bindings.constants import NodeId @@ -14,8 +14,11 @@ from opentrons_shared_data.errors.exceptions import ( TipDetectorNotFound, UnmatchedTipPresenceStates, + GeneralError, ) +from .ot3utils import sensor_id_for_instrument + log = logging.getLogger(__name__) TipListener = Callable[[OT3Mount, bool], None] @@ -111,7 +114,24 @@ def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: return state @staticmethod - def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType: + def _get_tip_presence( + results: List[tip_types.TipNotification], + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: + """ + We can use ht_operational_sensor used to specify that we only care + about the status of one tip presence sensor on a high throughput + pipette, and the other is allowed to be different. + """ + if ht_operational_sensor: + target_sensor_id = sensor_id_for_instrument(ht_operational_sensor) + for r in results: + if r.sensor == target_sensor_id: + return TipStateType(r.presence) + # raise an error if requested sensor response isn't found + raise GeneralError( + message=f"Requested status for sensor {ht_operational_sensor} not found." + ) # more than one sensor reported, we have to check if their states match if len(set(r.presence for r in results)) > 1: raise UnmatchedTipPresenceStates( @@ -119,9 +139,15 @@ def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType: ) return TipStateType(results[0].presence) - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: detector = self.get_detector(mount) - return self._get_tip_presence(await detector.request_tip_status()) + return self._get_tip_presence( + await detector.request_tip_status(), ht_operational_sensor + ) def get_detector(self, mount: OT3Mount) -> TipDetector: detector = self._detectors[self._get_key(mount)] diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 37f1f43e75c..dbc76181f24 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -2072,6 +2072,7 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]: async def get_tip_presence_status( self, mount: Union[top_types.Mount, OT3Mount], + ht_operational_sensor: Optional[InstrumentProbeType] = None, ) -> TipStateType: """ Check tip presence status. If a high throughput pipette is present, @@ -2085,14 +2086,19 @@ async def get_tip_presence_status( and self._gantry_load == GantryLoad.HIGH_THROUGHPUT ): await stack.enter_async_context(self._high_throughput_check_tip()) - result = await self._backend.get_tip_status(real_mount) + result = await self._backend.get_tip_status( + real_mount, ht_operational_sensor + ) return result async def verify_tip_presence( - self, mount: Union[top_types.Mount, OT3Mount], expected: TipStateType + self, + mount: Union[top_types.Mount, OT3Mount], + expected: TipStateType, + ht_operational_sensor: Optional[InstrumentProbeType] = None, ) -> None: real_mount = OT3Mount.from_mount(mount) - status = await self.get_tip_presence_status(real_mount) + status = await self.get_tip_presence_status(real_mount, ht_operational_sensor) if status != expected: raise FailedTipStateCheck(expected, status.value) diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py b/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py index 543f7b3b400..6ea39738fc2 100644 --- a/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py @@ -2,7 +2,7 @@ from typing import AsyncIterator, Dict from decoy import Decoy -from opentrons.hardware_control.types import OT3Mount, TipStateType +from opentrons.hardware_control.types import OT3Mount, TipStateType, InstrumentProbeType from opentrons.hardware_control.backends.tip_presence_manager import TipPresenceManager from opentrons_hardware.hardware_control.tip_presence import ( TipDetector, @@ -110,6 +110,51 @@ async def test_get_tip_status_for_high_throughput( result == expected_type +@pytest.mark.parametrize( + "tip_presence,expected_type,sensor_to_look_at", + [ + ( + {SensorId.S0: False, SensorId.S1: False}, + TipStateType.ABSENT, + InstrumentProbeType.PRIMARY, + ), + ( + {SensorId.S0: True, SensorId.S1: True}, + TipStateType.PRESENT, + InstrumentProbeType.SECONDARY, + ), + ( + {SensorId.S0: False, SensorId.S1: True}, + TipStateType.ABSENT, + InstrumentProbeType.PRIMARY, + ), + ( + {SensorId.S0: False, SensorId.S1: True}, + TipStateType.PRESENT, + InstrumentProbeType.SECONDARY, + ), + ], +) +async def test_allow_different_tip_states_ht( + subject: TipPresenceManager, + tip_detector_controller: TipDetectorController, + tip_presence: Dict[SensorId, bool], + expected_type: TipStateType, + sensor_to_look_at: InstrumentProbeType, +) -> None: + mount = OT3Mount.LEFT + await tip_detector_controller.retrieve_tip_status_highthroughput(tip_presence) + + result = await subject.get_tip_status(mount, sensor_to_look_at) + result == expected_type + + # if sensor_to_look_at is not used, different tip states + # should result in an UnmatchedTipStates error + if len(set(tip_presence[t] for t in tip_presence)) > 1: + with pytest.raises(UnmatchedTipPresenceStates): + result = await subject.get_tip_status(mount) + + @pytest.mark.parametrize( "tip_presence", [