Skip to content

Commit

Permalink
refactor(shared-data, robot-server, api): Pipette configuration archi…
Browse files Browse the repository at this point in the history
…tecture refactor to organize by nozzle map and tip type (#15250)

Covers PLAT-334, with effects on PLAT-270, PLAT-269
Addresses capability for nozzle layout and tip type to effect current, speed, distance and tip overlap
  • Loading branch information
CaseyBatten authored and aaron-kulkarni committed Jun 13, 2024
1 parent 831195c commit 01de4d6
Show file tree
Hide file tree
Showing 101 changed files with 9,893 additions and 1,753 deletions.
76 changes: 69 additions & 7 deletions api/src/opentrons/hardware_control/instruments/ot2/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
PipetteModelVersionType,
PipetteNameType,
PipetteLiquidPropertiesDefinition,
PressFitPickUpTipConfiguration,
)
from opentrons_shared_data.pipette import (
load_data as load_pipette_data,
Expand Down Expand Up @@ -58,6 +59,12 @@
)
from opentrons.hardware_control.dev_types import InstrumentHardwareConfigs

from opentrons.hardware_control.util import (
pick_up_speed_by_configuration,
pick_up_distance_by_configuration,
pick_up_current_by_configuration,
nominal_tip_overlap_dictionary_by_configuration,
)

RECONFIG_KEYS = {"quirks"}

Expand Down Expand Up @@ -112,9 +119,16 @@ def __init__(
pipette_channels=config.channels,
pipette_version=config.version,
)
self._valid_nozzle_maps = load_pipette_data.load_valid_nozzle_maps(
self._pipette_model.pipette_type,
self._pipette_model.pipette_channels,
self._pipette_model.pipette_version,
)
self._nozzle_offset = self._config.nozzle_offset
self._nozzle_manager = (
nozzle_manager.NozzleConfigurationManager.build_from_config(self._config)
nozzle_manager.NozzleConfigurationManager.build_from_config(
self._config, self._valid_nozzle_maps
)
)
self._current_volume = 0.0
self._working_volume = float(self._liquid_class.max_volume)
Expand Down Expand Up @@ -148,7 +162,9 @@ def __init__(
self._active_tip_settings.default_blowout_flowrate.default
)

self._tip_overlap_lookup = self._liquid_class.tip_overlap_dictionary
self._versioned_tip_overlap_dictionary = (
self.get_nominal_tip_overlap_dictionary_by_configuration()
)

if use_old_aspiration_functions:
self._pipetting_function_version = PIPETTING_FUNCTION_FALLBACK_VERSION
Expand Down Expand Up @@ -216,8 +232,8 @@ def pipette_offset(self) -> PipetteOffsetByPipetteMount:
return self._pipette_offset

@property
def tip_overlap(self) -> Dict[str, float]:
return self._tip_overlap_lookup
def tip_overlap(self) -> Dict[str, Dict[str, float]]:
return self._versioned_tip_overlap_dictionary

@property
def channels(self) -> pip_types.PipetteChannelType:
Expand Down Expand Up @@ -290,9 +306,14 @@ def reset_state(self) -> None:
self.active_tip_settings.default_blowout_flowrate.default
)

self._tip_overlap_lookup = self.liquid_class.tip_overlap_dictionary
self._versioned_tip_overlap_dictionary = (
self.get_nominal_tip_overlap_dictionary_by_configuration()
)

self._nozzle_manager = (
nozzle_manager.NozzleConfigurationManager.build_from_config(self._config)
nozzle_manager.NozzleConfigurationManager.build_from_config(
self._config, self._valid_nozzle_maps
)
)

def reset_pipette_offset(self, mount: Mount, to_default: bool) -> None:
Expand Down Expand Up @@ -520,6 +541,45 @@ def remove_tip(self) -> None:
def has_tip(self) -> bool:
return self._has_tip

def get_pick_up_speed_by_configuration(
self,
config: PressFitPickUpTipConfiguration,
) -> float:
return pick_up_speed_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
pip_types.PipetteTipType(self._liquid_class.max_volume),
)

def get_pick_up_distance_by_configuration(
self,
config: PressFitPickUpTipConfiguration,
) -> float:
return pick_up_distance_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
pip_types.PipetteTipType(self._liquid_class.max_volume),
)

def get_pick_up_current_by_configuration(
self,
config: PressFitPickUpTipConfiguration,
) -> float:
return pick_up_current_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
pip_types.PipetteTipType(self._liquid_class.max_volume),
)

def get_nominal_tip_overlap_dictionary_by_configuration(
self,
) -> Dict[str, Dict[str, float]]:
return nominal_tip_overlap_dictionary_by_configuration(
self._config,
self._nozzle_manager.current_configuration.valid_map_key,
pip_types.PipetteTipType(self._liquid_class.max_volume),
)

# Cache max is chosen somewhat arbitrarily. With a float is input we don't
# want this to unbounded.
@functools.lru_cache(maxsize=100)
Expand Down Expand Up @@ -571,7 +631,9 @@ def as_dict(self) -> "Pipette.DictType":
"default_dispense_flow_rates": self.dispense_flow_rates_lookup,
"tip_length": self.current_tip_length,
"return_tip_height": self.active_tip_settings.default_return_tip_height,
"tip_overlap": self.tip_overlap,
"tip_overlap": self.tip_overlap[
"v0"
], # TODO(cb, 2024-06-11): hard coded to "v0" - when versioned tip overlaps are fully integrated this must change
"back_compat_names": self._config.pipette_backcompat_names,
"supported_tips": self.liquid_class.supported_tips,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ def plan_check_pick_up_tip( # type: ignore[no-untyped-def]
if instrument.has_tip:
raise UnexpectedTipAttachError("pick_up_tip", instrument.name, mount.name)
self._ihp_log.debug(f"Picking up tip on {mount.name}")
tip_count = instrument.nozzle_manager.current_configuration.tip_count

if presses is None or presses < 0:
checked_presses = instrument.pick_up_configurations.press_fit.presses
else:
Expand All @@ -783,11 +783,12 @@ def plan_check_pick_up_tip( # type: ignore[no-untyped-def]
else:
check_incr = increment

pick_up_speed = instrument.pick_up_configurations.press_fit.speed_by_tip_count[
tip_count
]
pick_up_distance = (
instrument.pick_up_configurations.press_fit.distance_by_tip_count[tip_count]
pick_up_speed = instrument.get_pick_up_speed_by_configuration(
instrument.pick_up_configurations.press_fit
)

pick_up_distance = instrument.get_pick_up_distance_by_configuration(
instrument.pick_up_configurations.press_fit
)

def build_presses() -> Iterator[Tuple[float, float]]:
Expand Down Expand Up @@ -817,9 +818,9 @@ def add_tip_to_instr() -> None:
current={
Axis.by_mount(
mount
): instrument.pick_up_configurations.press_fit.current_by_tip_count[
tip_count
]
): instrument.get_pick_up_current_by_configuration(
instrument.pick_up_configurations.press_fit
)
},
speed=pick_up_speed,
relative_down=top_types.Point(0, 0, press_dist),
Expand All @@ -846,9 +847,9 @@ def add_tip_to_instr() -> None:
current={
Axis.by_mount(
mount
): instrument.pick_up_configurations.press_fit.current_by_tip_count[
instrument.nozzle_manager.current_configuration.tip_count
]
): instrument.get_pick_up_current_by_configuration(
instrument.pick_up_configurations.press_fit
)
},
speed=pick_up_speed,
relative_down=top_types.Point(0, 0, press_dist),
Expand Down
122 changes: 100 additions & 22 deletions api/src/opentrons/hardware_control/instruments/ot3/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
from opentrons.hardware_control.errors import InvalidCriticalPoint
from opentrons.hardware_control import nozzle_manager

from opentrons.hardware_control.util import (
pick_up_speed_by_configuration,
pick_up_distance_by_configuration,
pick_up_current_by_configuration,
nominal_tip_overlap_dictionary_by_configuration,
)

mod_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -96,9 +103,16 @@ def __init__(
pipette_channels=config.channels,
pipette_version=config.version,
)
self._valid_nozzle_maps = load_pipette_data.load_valid_nozzle_maps(
self._pipette_model.pipette_type,
self._pipette_model.pipette_channels,
self._pipette_model.pipette_version,
)
self._nozzle_offset = self._config.nozzle_offset
self._nozzle_manager = (
nozzle_manager.NozzleConfigurationManager.build_from_config(self._config)
nozzle_manager.NozzleConfigurationManager.build_from_config(
self._config, self._valid_nozzle_maps
)
)
self._current_volume = 0.0
self._working_volume = float(self._liquid_class.max_volume)
Expand Down Expand Up @@ -133,7 +147,9 @@ def __init__(
)
self._flow_acceleration = self._active_tip_settings.default_flow_acceleration

self._tip_overlap_lookup = self._liquid_class.tip_overlap_dictionary
self._versioned_tip_overlap_dictionary = (
self.get_nominal_tip_overlap_dictionary_by_configuration()
)

if use_old_aspiration_functions:
self._pipetting_function_version = PIPETTING_FUNCTION_FALLBACK_VERSION
Expand Down Expand Up @@ -161,8 +177,8 @@ def backlash_distance(self) -> float:
return self._backlash_distance

@property
def tip_overlap(self) -> Dict[str, float]:
return self._tip_overlap_lookup
def tip_overlap(self) -> Dict[str, Dict[str, float]]:
return self._versioned_tip_overlap_dictionary

@property
def nozzle_offset(self) -> Point:
Expand Down Expand Up @@ -254,9 +270,13 @@ def reset_state(self) -> None:
)
self._flow_acceleration = self._active_tip_settings.default_flow_acceleration

self._tip_overlap_lookup = self.liquid_class.tip_overlap_dictionary
self._versioned_tip_overlap_dictionary = (
self.get_nominal_tip_overlap_dictionary_by_configuration()
)
self._nozzle_manager = (
nozzle_manager.NozzleConfigurationManager.build_from_config(self._config)
nozzle_manager.NozzleConfigurationManager.build_from_config(
self._config, self._valid_nozzle_maps
)
)

def reset_pipette_offset(self, mount: OT3Mount, to_default: bool) -> None:
Expand Down Expand Up @@ -560,7 +580,7 @@ def as_dict(self) -> "Pipette.DictType":
"default_flow_acceleration": self.active_tip_settings.default_flow_acceleration,
"tip_length": self.current_tip_length,
"return_tip_height": self.active_tip_settings.default_return_tip_height,
"tip_overlap": self.tip_overlap,
"tip_overlap": self.tip_overlap["v0"],
"back_compat_names": self._config.pipette_backcompat_names,
"supported_tips": self.liquid_class.supported_tips,
}
Expand Down Expand Up @@ -655,32 +675,90 @@ def set_tip_type(self, tip_type: pip_types.PipetteTipType) -> None:
self._flow_acceleration = self._active_tip_settings.default_flow_acceleration

self._fallback_tip_length = self._active_tip_settings.default_tip_length
self._tip_overlap_lookup = self.liquid_class.tip_overlap_dictionary
self._versioned_tip_overlap_dictionary = (
self.get_nominal_tip_overlap_dictionary_by_configuration()
)
self._working_volume = min(tip_type.value, self.liquid_class.max_volume)

def get_pick_up_configuration_for_tip_count(
self, count: int
def get_pick_up_configuration( # noqa: C901
self,
) -> Union[CamActionPickUpTipConfiguration, PressFitPickUpTipConfiguration]:
for config in (
self._config.pick_up_tip_configurations.press_fit,
self._config.pick_up_tip_configurations.cam_action,
):
if not config:
continue

if isinstance(config, PressFitPickUpTipConfiguration) and all(
[
config.speed_by_tip_count.get(count),
config.distance_by_tip_count.get(count),
config.current_by_tip_count.get(count),
]
):
return config
elif config.current_by_tip_count.get(count) is not None:
return config
config_values = None
try:
config_values = config.configuration_by_nozzle_map[
self._nozzle_manager.current_configuration.valid_map_key
][self._active_tip_setting_name.name]
except KeyError:
try:
config_values = config.configuration_by_nozzle_map[
self._nozzle_manager.current_configuration.valid_map_key
].get("default")
if config_values is None:
raise KeyError(
f"Default tip type configuration values do not exist for Nozzle Map {self._nozzle_manager.current_configuration.valid_map_key}."
)
except KeyError:
# No valid key found for the approved nozzle map under this configuration - try the next
continue
if config_values is not None:
if isinstance(config, PressFitPickUpTipConfiguration) and all(
[
config_values.speed,
config_values.distance,
config_values.current,
]
):
return config
elif config_values.current is not None:
return config

raise CommandPreconditionViolated(
message=f"No pick up tip configuration for {count} tips",
message="No valid pick up tip configuration values found in instrument definition.",
)

def get_pick_up_speed_by_configuration(
self,
config: Union[CamActionPickUpTipConfiguration, PressFitPickUpTipConfiguration],
) -> float:
return pick_up_speed_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
self._active_tip_setting_name,
)

def get_pick_up_distance_by_configuration(
self,
config: Union[CamActionPickUpTipConfiguration, PressFitPickUpTipConfiguration],
) -> float:
return pick_up_distance_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
self._active_tip_setting_name,
)

def get_pick_up_current_by_configuration(
self,
config: Union[CamActionPickUpTipConfiguration, PressFitPickUpTipConfiguration],
) -> float:
return pick_up_current_by_configuration(
config,
self._nozzle_manager.current_configuration.valid_map_key,
self._active_tip_setting_name,
)

def get_nominal_tip_overlap_dictionary_by_configuration(
self,
) -> Dict[str, Dict[str, float]]:
return nominal_tip_overlap_dictionary_by_configuration(
self._config,
self._nozzle_manager.current_configuration.valid_map_key,
self._active_tip_setting_name,
)


Expand Down
Loading

0 comments on commit 01de4d6

Please sign in to comment.