Skip to content

Commit

Permalink
Merge branch 'chore_release-7.1.0' into correct_addressable_area_posi…
Browse files Browse the repository at this point in the history
…tions

Resolve a conflict in api/src/opentrons/protocol_api/core/engine/instrument.py.
  • Loading branch information
SyntaxColoring committed Dec 6, 2023
2 parents 79e1bc5 + 134f0b2 commit dfbd88c
Show file tree
Hide file tree
Showing 27 changed files with 1,115 additions and 239 deletions.
230 changes: 228 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
"""A Protocol-Engine-friendly wrapper for opentrons.motion_planning.deck_conflict."""

import itertools
from typing import Collection, Dict, Optional, Tuple, overload
import logging
from typing import Collection, Dict, Optional, Tuple, overload, Union

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning.adjacent_slots_getters import (
get_north_slot,
get_west_slot,
)
from opentrons.protocol_engine import (
StateView,
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
OFF_DECK_LOCATION,
WellLocation,
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName
from opentrons.types import DeckSlotName, Point


class PartialTipMovementNotAllowedError(MotionPlanningFailureError):
"""Error raised when trying to perform a partial tip movement to an illegal location."""

def __init__(self, message: str) -> None:
super().__init__(
message=message,
)


_log = logging.getLogger(__name__)

# TODO (spp, 2023-12-06): move this to a location like motion planning where we can
# derive these values from geometry definitions
# Bounding box measurements
A12_column_front_left_bound = Point(x=-1.8, y=2)
A12_column_back_right_bound = Point(x=592, y=506.2)

# Arbitrary safety margin in z-direction
Z_SAFETY_MARGIN = 10


@overload
Expand Down Expand Up @@ -106,6 +137,201 @@ def check(
)


def check_safe_for_pipette_movement(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if the labware is safe to move to with a pipette in partial tip configuration.
Args:
engine_state: engine state view
pipette_id: ID of the pipette to be moved
labware_id: ID of the labware we are moving to
well_name: Name of the well to move to
well_location: exact location within the well to move to
"""
# TODO: either hide unsupported configurations behind an advance setting
# or log a warning that deck conflicts cannot be checked for tip config other than
# column config with A12 primary nozzle for the 96 channel
# or single tip config for 8-channel.
if engine_state.pipettes.get_channels(pipette_id) == 96:
_check_deck_conflict_for_96_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
elif engine_state.pipettes.get_channels(pipette_id) == 8:
_check_deck_conflict_for_8_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)


def _check_deck_conflict_for_96_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 96-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.COLUMN
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "A12"
):
# Checking deck conflicts only for 12th column config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
"Requested motion with A12 nozzle column configuration"
" is outside of robot bounds for the 96-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
west_slot_number = get_west_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if west_slot_number is None:
return

west_slot = DeckSlotName.from_primitive(
west_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

west_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=west_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if (
west_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length
): # a safe margin magic number
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Column nozzle configuration will result in collision with"
f" items in deck slot {west_slot}."
)


def _check_deck_conflict_for_8_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 8-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.SINGLE
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "H1"
):
# Checking deck conflicts only for H1 single tip config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
# WARNING: (spp, 2023-11-30: this needs to be wired up to check for
# 8-channel pipette extents on both OT2 & Flex!!)
raise PartialTipMovementNotAllowedError(
"Requested motion with single H1 nozzle configuration"
" is outside of robot bounds for the 8-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
north_slot_number = get_north_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if north_slot_number is None:
return

north_slot = DeckSlotName.from_primitive(
north_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

north_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=north_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if north_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Single nozzle configuration will result in collision with"
f" items in deck slot {north_slot}."
)


def _is_within_pipette_extents(
engine_state: StateView,
pipette_id: str,
location: Point,
) -> bool:
"""Whether a given point is within the extents of a configured pipette on the specified robot."""
robot_type = engine_state.config.robot_type
pipette_channels = engine_state.pipettes.get_channels(pipette_id)
nozzle_config = engine_state.pipettes.get_nozzle_layout_type(pipette_id)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)
if robot_type == "OT-3 Standard":
if (
pipette_channels == 96
and nozzle_config == NozzleConfigurationType.COLUMN
and primary_nozzle == "A12"
):
return (
A12_column_front_left_bound.x
< location.x
< A12_column_back_right_bound.x
and A12_column_front_left_bound.y
< location.y
< A12_column_back_right_bound.y
)
# TODO (spp, 2023-11-07): check for 8-channel nozzle H1 extents on Flex & OT2
return True


def _map_labware(
engine_state: StateView,
labware_id: str,
Expand Down
Loading

0 comments on commit dfbd88c

Please sign in to comment.