Skip to content

Commit

Permalink
feat(api): raise an error if pipette nozzles might collide with therm…
Browse files Browse the repository at this point in the history
…ocycler lid clips (#14522)

Addresses RQA-2311
  • Loading branch information
sanni-t authored Feb 20, 2024
1 parent 6d552e8 commit 7b456ed
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 38 deletions.
139 changes: 103 additions & 36 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
overload,
Union,
TYPE_CHECKING,
List,
)

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons_shared_data.module import FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
Expand All @@ -30,7 +32,10 @@
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.protocol_engine.types import StagingSlotLocation, Dimensions
from opentrons.protocol_engine.types import (
StagingSlotLocation,
Dimensions,
)
from opentrons.types import DeckSlotName, StagingSlotName, Point
from ..._trash_bin import TrashBin
from ..._waste_chute import WasteChute
Expand Down Expand Up @@ -195,7 +200,8 @@ def check(
)


def check_safe_for_pipette_movement( # noqa: C901
# TODO (spp, 2023-02-16): move pipette movement safety checks to its own separate file.
def check_safe_for_pipette_movement(
engine_state: StateView,
pipette_id: str,
labware_id: str,
Expand Down Expand Up @@ -249,44 +255,105 @@ def check_safe_for_pipette_movement( # noqa: C901
slot=labware_slot.as_int(), robot_type=engine_state.config.robot_type
)

def _check_conflict_with_slot_item(
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> None:
"""Raises error if the pipette is expected to collide with surrounding slot items."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
slot_bounds = engine_state.addressable_areas.get_addressable_area_bounding_box(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
if _will_collide_with_thermocycler_lid(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_regular_slots=surrounding_slots.regular_slots,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with thermocycler lid in deck slot A1."
)
for bound_vertex in pipette_bounds_at_well_location:
if not _point_overlaps_with_slot(
slot_pos, slot_bounds, nozzle_point=bound_vertex
):
continue
# Check z-height of items in overlapping slot
if isinstance(surrounding_slot, DeckSlotName):
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
else:
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
StagingSlotLocation(slotName=surrounding_slot)
)
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds_at_well_location[0].z:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in deck slot {surrounding_slot}."
)

for regular_slot in surrounding_slots.regular_slots:
_check_conflict_with_slot_item(regular_slot)
if _slot_has_potential_colliding_object(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_slot=regular_slot,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in deck slot {regular_slot}."
)
for staging_slot in surrounding_slots.staging_slots:
_check_conflict_with_slot_item(staging_slot)
if _slot_has_potential_colliding_object(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_slot=staging_slot,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in staging slot {staging_slot}."
)


def _slot_has_potential_colliding_object(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> bool:
"""Return the slot, if any, that has an item that the pipette might collide into."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
slot_bounds = engine_state.addressable_areas.get_addressable_area_bounding_box(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
for bound_vertex in pipette_bounds:
if not _point_overlaps_with_slot(
slot_pos, slot_bounds, nozzle_point=bound_vertex
):
continue
# Check z-height of items in overlapping slot
if isinstance(surrounding_slot, DeckSlotName):
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
else:
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
StagingSlotLocation(slotName=surrounding_slot)
)
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds[0].z:
return True
return False


def _will_collide_with_thermocycler_lid(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
surrounding_regular_slots: List[DeckSlotName],
) -> bool:
"""Return whether the pipette might collide with thermocycler's lid/clips on a Flex.
If any of the pipette's bounding vertices lie inside the no-go zone of the thermocycler-
which is the area that's to the left, back and below the thermocycler's lid's
protruding clips, then we will mark the movement for possible collision.
This could cause false raises for the case where an 8-channel is accessing the
thermocycler labware in a location such that the pipette is in the area between
the clips but not touching either clips. But that's a tradeoff we'll need to make
between a complicated check involving accurate positions of all entities involved
and a crude check that disallows all partial tip movements around the thermocycler.
"""
if (
DeckSlotName.SLOT_A1 in surrounding_regular_slots
and engine_state.modules.is_flex_deck_with_thermocycler()
):
tc_right_clip_pos = FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES["right_clip"]
for bound_vertex in pipette_bounds:
if (
bound_vertex.x <= tc_right_clip_pos["x"]
and bound_vertex.y >= tc_right_clip_pos["y"]
and bound_vertex.z <= tc_right_clip_pos["z"]
):
return True
return False


def _point_overlaps_with_slot(
Expand Down
14 changes: 14 additions & 0 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1049,3 +1049,17 @@ def get_overflowed_module_in_slot(
return self.get(module_id)

return None

def is_flex_deck_with_thermocycler(self) -> bool:
"""Return if this is a Flex deck with a thermocycler loaded in B1-A1 slots."""
maybe_module = self.get_by_slot(
DeckSlotName.SLOT_A1
) or self.get_overflowed_module_in_slot(DeckSlotName.SLOT_A1)
if (
self._state.deck_type == DeckType.OT3_STANDARD
and maybe_module
and maybe_module.model == ModuleModel.THERMOCYCLER_MODULE_V2
):
return True
else:
return False
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def test_maps_trash_bins(decoy: Decoy, mock_state_view: StateView) -> None:
),
pytest.raises(
deck_conflict.PartialTipMovementNotAllowedError,
match="collision with items in deck slot C4",
match="collision with items in staging slot C4",
),
),
],
Expand Down Expand Up @@ -536,6 +536,73 @@ def test_deck_conflict_raises_for_bad_pipette_move(
)


@pytest.mark.parametrize(
("robot_type", "deck_type"),
[("OT-3 Standard", DeckType.OT3_STANDARD)],
)
def test_deck_conflict_raises_for_collision_with_tc_lid(
decoy: Decoy,
mock_state_view: StateView,
) -> None:
"""It should raise an error if pipette might collide with thermocycler lid on the Flex."""
destination_well_point = Point(x=123, y=123, z=123)
nozzle_bounds_at_destination = (
Point(x=50, y=150, z=60),
Point(x=150, y=50, z=60),
Point(x=97, y=403, z=204.5),
Point(x=50, y=50, z=60),
)

decoy.when(
mock_state_view.pipettes.get_is_partially_configured("pipette-id")
).then_return(True)
decoy.when(mock_state_view.pipettes.get_primary_nozzle("pipette-id")).then_return(
"A12"
)
decoy.when(
mock_state_view.geometry.get_ancestor_slot_name("destination-labware-id")
).then_return(DeckSlotName.SLOT_C2)

decoy.when(
mock_state_view.geometry.get_well_position(
labware_id="destination-labware-id",
well_name="A2",
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)
decoy.when(
mock_state_view.pipettes.get_nozzle_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
)
).then_return(nozzle_bounds_at_destination)

decoy.when(
adjacent_slots_getters.get_surrounding_slots(5, robot_type="OT-3 Standard")
).then_return(
_MixedTypeSlots(
regular_slots=[
DeckSlotName.SLOT_A1,
DeckSlotName.SLOT_B1,
],
staging_slots=[StagingSlotName.SLOT_C4],
)
)
decoy.when(mock_state_view.modules.is_flex_deck_with_thermocycler()).then_return(
True
)
with pytest.raises(
deck_conflict.PartialTipMovementNotAllowedError,
match="collision with thermocycler lid in deck slot A1.",
):
deck_conflict.check_safe_for_pipette_movement(
engine_state=mock_state_view,
pipette_id="pipette-id",
labware_id="destination-labware-id",
well_name="A2",
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)


@pytest.mark.parametrize(
("robot_type", "deck_type"),
[("OT-3 Standard", DeckType.OT3_STANDARD)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def test_deck_conflicts_for_96_ch_a12_column_configuration() -> None:
)

thermocycler = protocol_context.load_module("thermocyclerModuleV2")
tc_adjacent_plate = protocol_context.load_labware(
"opentrons_96_wellplate_200ul_pcr_full_skirt", "A2"
)
accessible_plate = thermocycler.load_labware(
"opentrons_96_wellplate_200ul_pcr_full_skirt"
)
Expand Down Expand Up @@ -67,8 +70,13 @@ def test_deck_conflicts_for_96_ch_a12_column_configuration() -> None:
):
instrument.dispense(50, badly_placed_labware.wells()[0])

# Currently does not raise a 'collision with thermocycler lid' error`
# because it's the pipette outer cover that hits the lid, but we don't include
# the cover in pipette dimensions yet.
instrument.dispense(10, tc_adjacent_plate.wells_by_name()["A1"])

# No error cuz dispensing from high above plate, so it clears tuberack in west slot
instrument.dispense(25, badly_placed_labware.wells_by_name()["A1"].top(150))
instrument.dispense(15, badly_placed_labware.wells_by_name()["A1"].top(150))

thermocycler.open_lid() # type: ignore[union-attr]

Expand Down
52 changes: 52 additions & 0 deletions api/tests/opentrons/protocol_engine/state/test_module_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -1819,3 +1819,55 @@ def test_get_overflowed_module_in_slot(tempdeck_v1_def: ModuleDefinition) -> Non
location=DeckSlotLocation(slotName=DeckSlotName.SLOT_1),
serialNumber="serial-number",
)


@pytest.mark.parametrize(
argnames=["deck_type", "module_def", "module_slot", "expected_result"],
argvalues=[
(
DeckType.OT3_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_A1,
True,
),
(
DeckType.OT3_STANDARD,
lazy_fixture("tempdeck_v1_def"),
DeckSlotName.SLOT_A1,
False,
),
(
DeckType.OT3_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_1,
False,
),
(
DeckType.OT2_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_A1,
False,
),
],
)
def test_is_flex_deck_with_thermocycler(
deck_type: DeckType,
module_def: ModuleDefinition,
module_slot: DeckSlotName,
expected_result: bool,
) -> None:
"""It should return True if there is a thermocycler on Flex."""
subject = make_module_view(
slot_by_module_id={"module-id": DeckSlotName.SLOT_B1},
hardware_by_module_id={
"module-id": HardwareModule(
serial_number="serial-number",
definition=module_def,
)
},
additional_slots_occupied_by_module_id={
"module-id": [module_slot, DeckSlotName.SLOT_C1],
},
deck_type=deck_type,
)
assert subject.is_flex_deck_with_thermocycler() == expected_result
9 changes: 9 additions & 0 deletions shared-data/python/opentrons_shared_data/module/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@

OLD_TC_GEN2_LABWARE_OFFSET = {"x": 0, "y": 68.06, "z": 98.26}

# TODO (spp, 2023-02-14): these values are measured experimentally, and aren't from
# machine drawings. We should replace them with values from CAD files and
# possibly make them a part of thermocycler/ deck definitions
FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES = {
"left_clip": {"x": -3.25, "y": 402, "z": 205},
"right_clip": {"x": 97.75, "y": 402, "z": 205},
}

# TODO (spp, 2022-05-12): Python has a built-in error called `ModuleNotFoundError` so,
# maybe rename this one?


class ModuleNotFoundError(KeyError):
def __init__(self, version: str, model_or_loadname: str):
super().__init__(model_or_loadname)
Expand Down

0 comments on commit 7b456ed

Please sign in to comment.