Skip to content

Commit

Permalink
feat(api): Prepare PipetteStore to handle overpressure errors from as…
Browse files Browse the repository at this point in the history
…pirate commands (#15321)
  • Loading branch information
SyntaxColoring authored Jun 6, 2024
1 parent 13b3c6d commit 9097225
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 164 deletions.
7 changes: 4 additions & 3 deletions api/src/opentrons/protocol_engine/commands/command_unions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pydantic import Field

from .command import DefinedErrorData
from .pipetting_common import OverpressureError, OverpressureErrorInternalData

from . import heater_shaker
from . import magnetic_module
Expand Down Expand Up @@ -629,7 +630,7 @@
]

# All `DefinedErrorData`s that implementations will actually return in practice.
# There's just one right now, but this will eventually be a Union.
CommandDefinedErrorData = DefinedErrorData[
TipPhysicallyMissingError, TipPhysicallyMissingErrorInternalData
CommandDefinedErrorData = Union[
DefinedErrorData[TipPhysicallyMissingError, TipPhysicallyMissingErrorInternalData],
DefinedErrorData[OverpressureError, OverpressureErrorInternalData],
]
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_engine/commands/pick_up_tip.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TipPhysicallyMissingError(ErrorOccurrence):
detail: str = "No tip detected."


@dataclass
@dataclass(frozen=True)
class TipPhysicallyMissingErrorInternalData:
"""Internal-to-ProtocolEngine data about a TipPhysicallyMissingError."""

Expand Down
30 changes: 29 additions & 1 deletion api/src/opentrons/protocol_engine/commands/pipetting_common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Common pipetting command base models."""
from dataclasses import dataclass
from opentrons_shared_data.errors import ErrorCodes
from pydantic import BaseModel, Field
from typing import Optional
from typing import Literal, Optional

from opentrons.protocol_engine.errors.error_occurrence import ErrorOccurrence

from ..types import WellLocation, DeckPoint

Expand Down Expand Up @@ -117,3 +121,27 @@ class DestinationPositionResult(BaseModel):
" after the move was completed."
),
)


class OverpressureError(ErrorOccurrence):
"""Returned when sensors detect an overpressure error while moving liquid.
The pipette plunger motion is stopped at the point of the error. The next thing to
move the plunger must be a `home` or `blowout` command; commands like `aspirate`
will return an error.
"""

isDefined: bool = True

errorType: Literal["overpressure"] = "overpressure"

errorCode: str = ErrorCodes.PIPETTE_OVERPRESSURE.value.code
detail: str = ErrorCodes.PIPETTE_OVERPRESSURE.value.detail


@dataclass(frozen=True)
class OverpressureErrorInternalData:
"""Internal-to-ProtocolEngine data about an OverpressureError."""

position: DeckPoint
"""Same meaning as DestinationPositionResult.position."""
197 changes: 125 additions & 72 deletions api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Basic pipette data state and store."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional, Tuple
from typing import Dict, List, Mapping, Optional, Tuple, Union
from typing_extensions import assert_type

from opentrons_shared_data.pipette import pipette_definition
from opentrons.config.defaults_ot2 import Z_RETRACT_DISTANCE
Expand All @@ -10,6 +11,13 @@
NozzleConfigurationType,
NozzleMap,
)
from opentrons.protocol_engine.actions.actions import FailCommandAction
from opentrons.protocol_engine.commands.aspirate import Aspirate
from opentrons.protocol_engine.commands.command import DefinedErrorData
from opentrons.protocol_engine.commands.pipetting_common import (
OverpressureError,
OverpressureErrorInternalData,
)
from opentrons.types import MountType, Mount as HwMount, Point

from .. import errors
Expand All @@ -24,7 +32,6 @@
TipGeometry,
)
from ..commands import (
Command,
LoadPipetteResult,
AspirateResult,
AspirateInPlaceResult,
Expand All @@ -46,7 +53,6 @@
TouchTipResult,
thermocycler,
heater_shaker,
CommandPrivateResult,
PrepareToAspirateResult,
)
from ..commands.configuring_common import (
Expand Down Expand Up @@ -150,16 +156,22 @@ def __init__(self) -> None:

def handle_action(self, action: Action) -> None:
"""Modify state in reaction to an action."""
if isinstance(action, SucceedCommandAction):
self._handle_command(action.command, action.private_result)
if isinstance(action, (SucceedCommandAction, FailCommandAction)):
self._handle_command(action)
elif isinstance(action, SetPipetteMovementSpeedAction):
self._state.movement_speed_by_id[action.pipette_id] = action.speed

def _handle_command( # noqa: C901
self, command: Command, private_result: CommandPrivateResult
self, action: Union[SucceedCommandAction, FailCommandAction]
) -> None:
self._update_current_location(command)
self._update_deck_point(command)
self._update_current_location(action)
self._update_deck_point(action)
self._update_volumes(action)

if not isinstance(action, SucceedCommandAction):
return

command, private_result = action.command, action.private_result

if isinstance(private_result, PipetteConfigUpdateResultMixin):
config = private_result.config
Expand Down Expand Up @@ -212,23 +224,6 @@ def _handle_command( # noqa: C901
pipette_id
] = static_config.default_nozzle_map

elif isinstance(command.result, (AspirateResult, AspirateInPlaceResult)):
pipette_id = command.params.pipetteId
previous_volume = self._state.aspirated_volume_by_id[pipette_id] or 0
# PipetteHandler will have clamped command.result.volume for us, so
# next_volume should always be in bounds.
next_volume = previous_volume + command.result.volume

self._state.aspirated_volume_by_id[pipette_id] = next_volume

elif isinstance(command.result, (DispenseResult, DispenseInPlaceResult)):
pipette_id = command.params.pipetteId
previous_volume = self._state.aspirated_volume_by_id[pipette_id] or 0
# PipetteHandler will have clamped command.result.volume for us, so
# next_volume should always be in bounds.
next_volume = previous_volume - command.result.volume
self._state.aspirated_volume_by_id[pipette_id] = next_volume

elif isinstance(command.result, PickUpTipResult):
pipette_id = command.params.pipetteId
attached_tip = TipGeometry(
Expand Down Expand Up @@ -277,19 +272,14 @@ def _handle_command( # noqa: C901
default_aspirate=tip_configuration.default_aspirate_flowrate.values_by_api_level,
default_dispense=tip_configuration.default_dispense_flowrate.values_by_api_level,
)
elif isinstance(command.result, (BlowOutResult, BlowOutInPlaceResult)):
pipette_id = command.params.pipetteId
self._state.aspirated_volume_by_id[pipette_id] = None

elif isinstance(command.result, PrepareToAspirateResult):
pipette_id = command.params.pipetteId
self._state.aspirated_volume_by_id[pipette_id] = 0

def _update_current_location(self, command: Command) -> None:
def _update_current_location( # noqa: C901
self, action: Union[SucceedCommandAction, FailCommandAction]
) -> None:
# These commands leave the pipette in a new location.
# Update current_location to reflect that.
if isinstance(
command.result,
if isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
MoveToWellResult,
PickUpTipResult,
Expand All @@ -301,27 +291,37 @@ def _update_current_location(self, command: Command) -> None:
),
):
self._state.current_location = CurrentWell(
pipette_id=command.params.pipetteId,
labware_id=command.params.labwareId,
well_name=command.params.wellName,
pipette_id=action.command.params.pipetteId,
labware_id=action.command.params.labwareId,
well_name=action.command.params.wellName,
)

elif isinstance(
command.result,
elif (
isinstance(action, FailCommandAction)
and isinstance(action.running_command, Aspirate)
and isinstance(action.error, DefinedErrorData)
and isinstance(action.error.public, OverpressureError)
):
self._state.current_location = CurrentWell(
pipette_id=action.running_command.params.pipetteId,
labware_id=action.running_command.params.labwareId,
well_name=action.running_command.params.wellName,
)
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(MoveToAddressableAreaResult, MoveToAddressableAreaForDropTipResult),
):
self._state.current_location = CurrentAddressableArea(
pipette_id=command.params.pipetteId,
addressable_area_name=command.params.addressableAreaName,
pipette_id=action.command.params.pipetteId,
addressable_area_name=action.command.params.addressableAreaName,
)

# These commands leave the pipette in a place that we can't logically associate
# with a well. Clear current_location to reflect the fact that it's now unknown.
#
# TODO(mc, 2021-11-12): Wipe out current_location on movement failures, too.
# TODO(jbl 2023-02-14): Need to investigate whether move relative should clear current location
elif isinstance(
command.result,
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
HomeResult,
RetractAxisResult,
Expand All @@ -334,14 +334,14 @@ def _update_current_location(self, command: Command) -> None:

# Heater-Shaker commands may have left the pipette in a place that we can't
# associate with a logical location, depending on their result.
elif isinstance(
command.result,
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
heater_shaker.SetAndWaitForShakeSpeedResult,
heater_shaker.OpenLabwareLatchResult,
),
):
if command.result.pipetteRetracted:
if action.command.result.pipetteRetracted:
self._state.current_location = None

# A moveLabware command may have moved the labware that contains the current
Expand All @@ -350,9 +350,11 @@ def _update_current_location(self, command: Command) -> None:
#
# This is necessary for safe motion planning in case the next movement
# goes to the same labware (now in a new place).
elif isinstance(command.result, MoveLabwareResult):
moved_labware_id = command.params.labwareId
if command.params.strategy == "usingGripper":
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, MoveLabwareResult
):
moved_labware_id = action.command.params.labwareId
if action.command.params.strategy == "usingGripper":
# All mounts will have been retracted.
self._state.current_location = None
elif (
Expand All @@ -361,9 +363,14 @@ def _update_current_location(self, command: Command) -> None:
):
self._state.current_location = None

def _update_deck_point(self, command: Command) -> None:
if isinstance(
command.result,
def _update_deck_point(
self, action: Union[SucceedCommandAction, FailCommandAction]
) -> None:
# This function mostly mirrors self._update_current_location().
# See there for explanations.

if isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
MoveToWellResult,
MoveToCoordinatesResult,
Expand All @@ -378,20 +385,28 @@ def _update_deck_point(self, command: Command) -> None:
TouchTipResult,
),
):
pipette_id = command.params.pipetteId
deck_point = command.result.position

try:
loaded_pipette = self._state.pipettes_by_id[pipette_id]
except KeyError:
self._clear_deck_point()
else:
self._state.current_deck_point = CurrentDeckPoint(
mount=loaded_pipette.mount, deck_point=deck_point
)
pipette_id = action.command.params.pipetteId
deck_point = action.command.result.position
loaded_pipette = self._state.pipettes_by_id[pipette_id]
self._state.current_deck_point = CurrentDeckPoint(
mount=loaded_pipette.mount, deck_point=deck_point
)
elif (
isinstance(action, FailCommandAction)
and isinstance(action.running_command, Aspirate)
and isinstance(action.error, DefinedErrorData)
and isinstance(action.error.public, OverpressureError)
):
assert_type(action.error.private, OverpressureErrorInternalData)
pipette_id = action.running_command.params.pipetteId
deck_point = action.error.private.position
loaded_pipette = self._state.pipettes_by_id[pipette_id]
self._state.current_deck_point = CurrentDeckPoint(
mount=loaded_pipette.mount, deck_point=deck_point
)

elif isinstance(
command.result,
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
HomeResult,
RetractAxisResult,
Expand All @@ -401,21 +416,59 @@ def _update_deck_point(self, command: Command) -> None:
):
self._clear_deck_point()

elif isinstance(
command.result,
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result,
(
heater_shaker.SetAndWaitForShakeSpeedResult,
heater_shaker.OpenLabwareLatchResult,
),
):
if command.result.pipetteRetracted:
if action.command.result.pipetteRetracted:
self._clear_deck_point()

elif isinstance(command.result, MoveLabwareResult):
if command.params.strategy == "usingGripper":
elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, MoveLabwareResult
):
if action.command.params.strategy == "usingGripper":
# All mounts will have been retracted.
self._clear_deck_point()

def _update_volumes(
self, action: Union[SucceedCommandAction, FailCommandAction]
) -> None:
if isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, (AspirateResult, AspirateInPlaceResult)
):
pipette_id = action.command.params.pipetteId
previous_volume = self._state.aspirated_volume_by_id[pipette_id] or 0
# PipetteHandler will have clamped action.command.result.volume for us, so
# next_volume should always be in bounds.
next_volume = previous_volume + action.command.result.volume

self._state.aspirated_volume_by_id[pipette_id] = next_volume

elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, (DispenseResult, DispenseInPlaceResult)
):
pipette_id = action.command.params.pipetteId
previous_volume = self._state.aspirated_volume_by_id[pipette_id] or 0
# PipetteHandler will have clamped action.command.result.volume for us, so
# next_volume should always be in bounds.
next_volume = previous_volume - action.command.result.volume
self._state.aspirated_volume_by_id[pipette_id] = next_volume

elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, (BlowOutResult, BlowOutInPlaceResult)
):
pipette_id = action.command.params.pipetteId
self._state.aspirated_volume_by_id[pipette_id] = None

elif isinstance(action, SucceedCommandAction) and isinstance(
action.command.result, PrepareToAspirateResult
):
pipette_id = action.command.params.pipetteId
self._state.aspirated_volume_by_id[pipette_id] = 0

def _clear_deck_point(self) -> None:
"""Reset last deck point to default None value for mount and point."""
self._state.current_deck_point = CurrentDeckPoint(mount=None, deck_point=None)
Expand Down
Loading

0 comments on commit 9097225

Please sign in to comment.