Skip to content

Commit

Permalink
feat(api): Rotate the module calibration offset if the module was phy…
Browse files Browse the repository at this point in the history
…sically moved and rotated. (#13441)
  • Loading branch information
vegano1 authored Oct 4, 2023
1 parent 419d225 commit af7c4d4
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 96 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ opentrons-robot-app.tar.gz

# local VERSION.json file when pushing to Flex
*new_version_file.json

# swap files
*.swp
10 changes: 5 additions & 5 deletions api/src/opentrons/hardware_control/module_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ def get_module_by_module_id(
return found_module

def load_module_offset(
self, module_type: ModuleType, module_id: str, slot: Optional[str] = None
) -> ModuleCalibrationOffset:
self, module_type: ModuleType, module_id: str
) -> Optional[ModuleCalibrationOffset]:
log.info(f"Loading module offset for {module_type} {module_id}")
return load_module_calibration_offset(module_type, module_id, slot)
return load_module_calibration_offset(module_type, module_id)

def save_module_offset(
self,
Expand All @@ -244,9 +244,9 @@ def save_module_offset(
slot: str,
offset: Point,
instrument_id: Optional[str] = None,
) -> ModuleCalibrationOffset:
) -> Optional[ModuleCalibrationOffset]:
log.info(f"Saving module {module} {module_id} offset: {offset} for slot {slot}")
save_module_calibration_offset(
offset, mount, slot, module, module_id, instrument_id
)
return load_module_calibration_offset(module, module_id, slot)
return load_module_calibration_offset(module, module_id)
49 changes: 18 additions & 31 deletions api/src/opentrons/hardware_control/modules/module_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
save_module_calibration,
)
from opentrons.calibration_storage.types import SourceType
from opentrons.config.robot_configs import default_module_calibration_offset
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.hardware_control.types import OT3Mount

Expand All @@ -26,7 +25,7 @@ class ModuleCalibrationOffset:
module: ModuleType
source: SourceType
status: CalibrationStatus
slot: Optional[str] = None
slot: str
mount: Optional[OT3Mount] = None
instrument_id: Optional[str] = None
last_modified: Optional[datetime] = None
Expand All @@ -35,37 +34,26 @@ class ModuleCalibrationOffset:
def load_module_calibration_offset(
module_type: ModuleType,
module_id: str,
slot: Optional[str] = None,
) -> ModuleCalibrationOffset:
) -> Optional[ModuleCalibrationOffset]:
"""Loads the calibration offset for a module."""
# load default if module offset data do not exist
module_cal_obj = ModuleCalibrationOffset(
slot=slot,
offset=Point(*default_module_calibration_offset()),
module_offset_data = get_module_offset(module_type, module_id)
if not module_offset_data:
return None
return ModuleCalibrationOffset(
module=module_type,
module_id=module_id,
source=SourceType.default,
status=CalibrationStatus(),
slot=module_offset_data.slot,
mount=module_offset_data.mount,
offset=module_offset_data.offset,
last_modified=module_offset_data.lastModified,
instrument_id=module_offset_data.instrument_id,
source=module_offset_data.source,
status=CalibrationStatus(
markedAt=module_offset_data.status.markedAt,
markedBad=module_offset_data.status.markedBad,
source=module_offset_data.status.source,
),
)
if module_id:
module_offset_data = get_module_offset(module_type, module_id)
if module_offset_data:
return ModuleCalibrationOffset(
module=module_type,
module_id=module_id,
slot=module_offset_data.slot,
mount=module_offset_data.mount,
offset=module_offset_data.offset,
last_modified=module_offset_data.lastModified,
instrument_id=module_offset_data.instrument_id,
source=module_offset_data.source,
status=CalibrationStatus(
markedAt=module_offset_data.status.markedAt,
markedBad=module_offset_data.status.markedBad,
source=module_offset_data.status.source,
),
)
return module_cal_obj


def save_module_calibration_offset(
Expand All @@ -77,8 +65,7 @@ def save_module_calibration_offset(
instrument_id: Optional[str] = None,
) -> None:
"""Save the calibration offset for a given module."""
if module_id:
save_module_calibration(offset, mount, slot, module, module_id, instrument_id)
save_module_calibration(offset, mount, slot, module, module_id, instrument_id)


def load_all_module_calibrations() -> List[ModuleCalibrationOffset]:
Expand Down
12 changes: 8 additions & 4 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2084,11 +2084,15 @@ async def save_module_offset(
self._log.warning(f"Could not save calibration: unknown module {module_id}")
return None
# TODO (ba, 2023-03-22): gripper_id and pipette_id should probably be combined to instrument_id
instrument_id = None
if self._gripper_handler.has_gripper():
instrument_id = self._gripper_handler.get_gripper().gripper_id
elif self._pipette_handler.has_pipette(mount):
if self._pipette_handler.has_pipette(mount):
instrument_id = self._pipette_handler.get_pipette(mount).pipette_id
elif mount == OT3Mount.GRIPPER and self._gripper_handler.has_gripper():
instrument_id = self._gripper_handler.get_gripper().gripper_id
else:
self._log.warning(
f"Could not save calibration: no instrument found for {mount}"
)
return None
module_type = module.MODULE_TYPE
self._log.info(
f"Saving module offset: {offset} for module {module_type.name} {module_id}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
if TYPE_CHECKING:
from ...state import StateView

from ...types import ModuleOffsetVector
from ...types import ModuleOffsetVector, DeckSlotLocation

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import OT3Mount
Expand Down Expand Up @@ -46,6 +46,10 @@ class CalibrateModuleResult(BaseModel):
..., description="Offset of calibrated module."
)

location: DeckSlotLocation = Field(
..., description="The deck slot this module was calibrated in."
)


class CalibrateModuleImplementation(
AbstractCommandImpl[CalibrateModuleParams, CalibrateModuleResult]
Expand All @@ -67,7 +71,7 @@ async def execute(self, params: CalibrateModuleParams) -> CalibrateModuleResult:
self._hardware_api,
)
ot3_mount = OT3Mount.from_mount(params.mount)
slot = self._state_view.modules.get_location(params.moduleId).slotName.id
slot = self._state_view.modules.get_location(params.moduleId)
module_serial = self._state_view.modules.get_serial_number(params.moduleId)
# NOTE (ba, 2023-03-31): There are two wells for calibration labware definitions
# well A1 represents the location calibration square center relative to the adapters bottom-left corner
Expand All @@ -78,13 +82,14 @@ async def execute(self, params: CalibrateModuleParams) -> CalibrateModuleResult:

# start the calibration
module_offset = await calibration.calibrate_module(
ot3_api, ot3_mount, slot, module_serial, nominal_position
ot3_api, ot3_mount, slot.slotName.id, module_serial, nominal_position
)

return CalibrateModuleResult(
moduleOffset=ModuleOffsetVector(
x=module_offset.x, y=module_offset.y, z=module_offset.z
)
),
location=slot,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
)
from opentrons_shared_data.module import load_definition

from ..types import ModuleModel, ModuleDefinition, ModuleOffsetVector
from opentrons.types import DeckSlotName
from ..types import (
ModuleModel,
ModuleDefinition,
ModuleOffsetVector,
ModuleOffsetData,
DeckSlotLocation,
)


class ModuleDataProvider:
Expand All @@ -18,15 +25,20 @@ def get_definition(model: ModuleModel) -> ModuleDefinition:
return ModuleDefinition.parse_obj(data)

@staticmethod
def load_module_calibrations() -> Dict[str, ModuleOffsetVector]:
def load_module_calibrations() -> Dict[str, ModuleOffsetData]:
"""Load the module calibration offsets."""
module_calibrations: Dict[str, ModuleOffsetVector] = dict()
module_calibrations: Dict[str, ModuleOffsetData] = dict()
calibration_data = load_all_module_calibrations()
for calibration in calibration_data:
# NOTE module_id is really the module serial number, change this
module_calibrations[calibration.module_id] = ModuleOffsetVector(
x=calibration.offset.x,
y=calibration.offset.y,
z=calibration.offset.z,
module_calibrations[calibration.module_id] = ModuleOffsetData(
moduleOffsetVector=ModuleOffsetVector(
x=calibration.offset.x,
y=calibration.offset.y,
z=calibration.offset.z,
),
location=DeckSlotLocation(
slotName=DeckSlotName.from_primitive(calibration.slot),
),
)
return module_calibrations
38 changes: 36 additions & 2 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Geometry state getters."""
import enum
from numpy import array, dot
from typing import Optional, List, Set, Tuple, Union, cast

from opentrons.types import Point, DeckSlotName, MountType
Expand All @@ -18,9 +19,10 @@
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
ModuleOffsetVector,
LabwareLocation,
LabwareOffsetVector,
ModuleOffsetVector,
ModuleOffsetData,
DeckType,
CurrentWell,
TipGeometry,
Expand Down Expand Up @@ -186,13 +188,45 @@ def _get_labware_position_offset(
f"Either it has been loaded off-deck or its been moved off-deck."
)

def _normalize_module_calibration_offset(
self,
module_location: DeckSlotLocation,
offset_data: Optional[ModuleOffsetData],
) -> ModuleOffsetVector:
"""Normalize the module calibration offset depending on the module location."""
if not offset_data:
return ModuleOffsetVector(x=0, y=0, z=0)
offset = offset_data.moduleOffsetVector
calibrated_slot = offset_data.location.slotName
calibrated_slot_column = self.get_slot_column(calibrated_slot)
current_slot_column = self.get_slot_column(module_location.slotName)
# make sure that we have valid colums since we cant have modules in the middle of the deck
assert set([calibrated_slot_column, current_slot_column]).issubset(
{1, 3}
), f"Module calibration offset is an invalid slot {calibrated_slot}"

# Check if the module has moved from one side of the deck to the other
if calibrated_slot_column != current_slot_column:
# Since the module was rotated, the calibration offset vector needs to be rotated by 180 degrees along the z axis
saved_offset = array([offset.x, offset.y, offset.z])
rotation_matrix = array([[-1, 0, 0], [0, -1, 0], [0, 0, 1]])
new_offset = dot(saved_offset, rotation_matrix) # type: ignore[no-untyped-call]
offset = ModuleOffsetVector(
x=new_offset[0], y=new_offset[1], z=new_offset[2]
)
return offset

def _get_calibrated_module_offset(
self, location: LabwareLocation
) -> ModuleOffsetVector:
"""Get a labware location's underlying calibrated module offset, if it is on a module."""
if isinstance(location, ModuleLocation):
module_id = location.moduleId
return self._modules.get_module_calibration_offset(module_id)
module_location = self._modules.get_location(module_id)
offset_data = self._modules.get_module_calibration_offset(module_id)
return self._normalize_module_calibration_offset(
module_location, offset_data
)
elif isinstance(location, DeckSlotLocation):
return ModuleOffsetVector(x=0, y=0, z=0)
elif isinstance(location, OnLabwareLocation):
Expand Down
48 changes: 21 additions & 27 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
LoadedModule,
ModuleModel,
ModuleOffsetVector,
ModuleOffsetData,
ModuleType,
ModuleDefinition,
DeckSlotLocation,
Expand Down Expand Up @@ -144,7 +145,7 @@ class ModuleState:
substate_by_module_id: Dict[str, ModuleSubStateType]
"""Information about each module that's specific to the module type."""

module_offset_by_serial: Dict[str, ModuleOffsetVector]
module_offset_by_serial: Dict[str, ModuleOffsetData]
"""Information about each modules offsets."""


Expand All @@ -154,7 +155,7 @@ class ModuleStore(HasState[ModuleState], HandlesActions):
_state: ModuleState

def __init__(
self, module_calibration_offsets: Optional[Dict[str, ModuleOffsetVector]] = None
self, module_calibration_offsets: Optional[Dict[str, ModuleOffsetData]] = None
) -> None:
"""Initialize a ModuleStore and its state."""
self._state = ModuleState(
Expand Down Expand Up @@ -195,6 +196,7 @@ def _handle_command(self, command: Command) -> None:
self._update_module_calibration(
module_id=command.params.moduleId,
module_offset=command.result.moduleOffset,
location=command.result.location,
)

if isinstance(
Expand Down Expand Up @@ -289,15 +291,21 @@ def _add_module_substate(
)

def _update_module_calibration(
self, module_id: str, module_offset: ModuleOffsetVector
self,
module_id: str,
module_offset: ModuleOffsetVector,
location: DeckSlotLocation,
) -> None:
module = self._state.hardware_by_module_id.get(module_id)
if module:
module_serial = module.serial_number
assert (
module_serial is not None
), "Expected a module SN and got None instead."
self._state.module_offset_by_serial[module_serial] = module_offset
self._state.module_offset_by_serial[module_serial] = ModuleOffsetData(
moduleOffsetVector=module_offset,
location=location,
)

def _handle_heater_shaker_commands(
self,
Expand Down Expand Up @@ -650,19 +658,10 @@ def get_dimensions(self, module_id: str) -> ModuleDimensions:
"""Get the specified module's dimensions."""
return self.get_definition(module_id).dimensions

def get_module_calibration_offset(self, module_id: str) -> ModuleOffsetVector:
"""Get the stored module calibration offset."""
module_serial = self.get(module_id).serialNumber
if module_serial is not None:
offset = self._state.module_offset_by_serial.get(module_serial)
if offset:
return offset
return ModuleOffsetVector(x=0, y=0, z=0)

def get_nominal_module_offset(
self, module_id: str, deck_type: DeckType
) -> LabwareOffsetVector:
"""Get the module's offset vector computed with slot transform."""
"""Get the module's nominal offset vector computed with slot transform."""
definition = self.get_definition(module_id)
slot = self.get_location(module_id).slotName.id

Expand All @@ -689,19 +688,14 @@ def get_nominal_module_offset(
z=xformed[2],
)

def get_module_offset(
self, module_id: str, deck_type: DeckType
) -> LabwareOffsetVector:
"""Get the module's offset vector computed with slot transform and calibrated module offsets."""
offset_vector = self.get_nominal_module_offset(module_id, deck_type)

# add the calibrated module offset if there is one
cal_offset = self.get_module_calibration_offset(module_id)
return LabwareOffsetVector(
x=offset_vector.x + cal_offset.x,
y=offset_vector.y + cal_offset.y,
z=offset_vector.z + cal_offset.z,
)
def get_module_calibration_offset(
self, module_id: str
) -> Optional[ModuleOffsetData]:
"""Get the calibration module offset."""
module_serial = self.get(module_id).serialNumber
if module_serial:
return self._state.module_offset_by_serial.get(module_serial)
return None

def get_overall_height(self, module_id: str) -> float:
"""Get the height of the module, excluding any labware loaded atop it."""
Expand Down
Loading

0 comments on commit af7c4d4

Please sign in to comment.