Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): add option to ignore different tip presence states #14980

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,9 @@ async def capacitive_pass(
def subsystems(self) -> Dict[SubSystem, SubSystemState]:
...

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self, mount: OT3Mount, ht_operation_sensor: Optional[InstrumentProbeType] = None
) -> TipStateType:
...

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
Expand Down
10 changes: 8 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,14 @@ async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None:
async def teardown_tip_detector(self, mount: OT3Mount) -> None:
await self._tip_presence_manager.clear_detector(mount)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
return await self.tip_presence_manager.get_tip_status(mount)
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
return await self.tip_presence_manager.get_tip_status(
mount, ht_operational_sensor
)

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return self.tip_presence_manager.current_tip_state(mount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,11 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]:
for axis in self._present_axes
}

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
return TipStateType(self._sim_tip_state[mount])

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import cast, Callable, Optional, List, Set
from typing_extensions import TypedDict, Literal

from opentrons.hardware_control.types import TipStateType, OT3Mount
from opentrons.hardware_control.types import TipStateType, OT3Mount, InstrumentProbeType

from opentrons_hardware.drivers.can_bus import CanMessenger
from opentrons_hardware.firmware_bindings.constants import NodeId
Expand All @@ -14,8 +14,11 @@
from opentrons_shared_data.errors.exceptions import (
TipDetectorNotFound,
UnmatchedTipPresenceStates,
GeneralError,
)

from .ot3utils import sensor_id_for_instrument

log = logging.getLogger(__name__)

TipListener = Callable[[OT3Mount, bool], None]
Expand Down Expand Up @@ -111,17 +114,40 @@ def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return state

@staticmethod
def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType:
def _get_tip_presence(
results: List[tip_types.TipNotification],
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
"""
We can use ht_operational_sensor used to specify that we only care
about the status of one tip presence sensor on a high throughput
pipette, and the other is allowed to be different.
"""
if ht_operational_sensor:
target_sensor_id = sensor_id_for_instrument(ht_operational_sensor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case that an OT2 has somehow reached this stage, can it be expected that any sensors it has will have properly mapped IDs (primary/secondary), or do we assume an OT2 in this state would deflect to the raise case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not happen, the OT2 does not have any tip sensing capabilities and does not use the tip presence manager.

for r in results:
if r.sensor == target_sensor_id:
return TipStateType(r.presence)
# raise an error if requested sensor response isn't found
raise GeneralError(
message=f"Requested status for sensor {ht_operational_sensor} not found."
)
# more than one sensor reported, we have to check if their states match
if len(set(r.presence for r in results)) > 1:
raise UnmatchedTipPresenceStates(
{int(r.sensor): int(r.presence) for r in results}
)
return TipStateType(results[0].presence)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
detector = self.get_detector(mount)
return self._get_tip_presence(await detector.request_tip_status())
return self._get_tip_presence(
await detector.request_tip_status(), ht_operational_sensor
)

def get_detector(self, mount: OT3Mount) -> TipDetector:
detector = self._detectors[self._get_key(mount)]
Expand Down
12 changes: 9 additions & 3 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2078,6 +2078,7 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]:
async def get_tip_presence_status(
self,
mount: Union[top_types.Mount, OT3Mount],
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
"""
Check tip presence status. If a high throughput pipette is present,
Expand All @@ -2091,14 +2092,19 @@ async def get_tip_presence_status(
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
):
await stack.enter_async_context(self._high_throughput_check_tip())
result = await self._backend.get_tip_status(real_mount)
result = await self._backend.get_tip_status(
real_mount, ht_operational_sensor
)
return result

async def verify_tip_presence(
self, mount: Union[top_types.Mount, OT3Mount], expected: TipStateType
self,
mount: Union[top_types.Mount, OT3Mount],
expected: TipStateType,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> None:
real_mount = OT3Mount.from_mount(mount)
status = await self.get_tip_presence_status(real_mount)
status = await self.get_tip_presence_status(real_mount, ht_operational_sensor)
if status != expected:
raise FailedTipStateCheck(expected, status.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import AsyncIterator, Dict
from decoy import Decoy

from opentrons.hardware_control.types import OT3Mount, TipStateType
from opentrons.hardware_control.types import OT3Mount, TipStateType, InstrumentProbeType
from opentrons.hardware_control.backends.tip_presence_manager import TipPresenceManager
from opentrons_hardware.hardware_control.tip_presence import (
TipDetector,
Expand Down Expand Up @@ -110,6 +110,51 @@ async def test_get_tip_status_for_high_throughput(
result == expected_type


@pytest.mark.parametrize(
"tip_presence,expected_type,sensor_to_look_at",
[
(
{SensorId.S0: False, SensorId.S1: False},
TipStateType.ABSENT,
InstrumentProbeType.PRIMARY,
),
(
{SensorId.S0: True, SensorId.S1: True},
TipStateType.PRESENT,
InstrumentProbeType.SECONDARY,
),
(
{SensorId.S0: False, SensorId.S1: True},
TipStateType.ABSENT,
InstrumentProbeType.PRIMARY,
),
(
{SensorId.S0: False, SensorId.S1: True},
TipStateType.PRESENT,
InstrumentProbeType.SECONDARY,
),
],
)
async def test_allow_different_tip_states_ht(
subject: TipPresenceManager,
tip_detector_controller: TipDetectorController,
tip_presence: Dict[SensorId, bool],
expected_type: TipStateType,
sensor_to_look_at: InstrumentProbeType,
) -> None:
mount = OT3Mount.LEFT
await tip_detector_controller.retrieve_tip_status_highthroughput(tip_presence)

result = await subject.get_tip_status(mount, sensor_to_look_at)
result == expected_type

# if sensor_to_look_at is not used, different tip states
# should result in an UnmatchedTipStates error
if len(set(tip_presence[t] for t in tip_presence)) > 1:
with pytest.raises(UnmatchedTipPresenceStates):
result = await subject.get_tip_status(mount)


@pytest.mark.parametrize(
"tip_presence",
[
Expand Down
Loading