Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): segment 96-channel tip pickup and dropoff moves #13669

Merged
merged 19 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,14 @@ def has_tip(self) -> bool:
def tip_presence_check_dist_mm(self) -> float:
return self._config.tip_presence_check_distance_mm

@property
def connect_tiprack_distance_mm(self) -> float:
return self._config.connect_tiprack_distance_mm

@property
def end_tip_action_retract_distance_mm(self) -> float:
return self._config.end_tip_action_retract_distance_mm

# Cache max is chosen somewhat arbitrarily. With a float is input we don't
# want this to unbounded.
@functools.lru_cache(maxsize=100)
Expand Down
302 changes: 121 additions & 181 deletions api/src/opentrons/hardware_control/instruments/ot3/pipette_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
Any,
cast,
List,
Sequence,
Iterator,
TypeVar,
)
from typing_extensions import Final
Expand Down Expand Up @@ -72,59 +70,21 @@ class LiquidActionSpec:
current: float


# TODO during refactor we should figure out if
# we still need these dataclasses


@dataclass(frozen=True)
class PickUpTipPressSpec:
relative_down: top_types.Point
relative_up: top_types.Point
current: Dict[Axis, float]
speed: float


@dataclass(frozen=True)
class TipMotorPickUpTipSpec:
tiprack_down: top_types.Point
tiprack_up: top_types.Point
pick_up_distance: float
speed: float
currents: Dict[Axis, float]
# FIXME we should throw this in a config
# file of some sort
home_buffer: float = 0


@dataclass(frozen=True)
class PickUpTipSpec:
plunger_prep_pos: float
plunger_currents: Dict[Axis, float]
presses: List[PickUpTipPressSpec]
shake_off_list: List[Tuple[top_types.Point, Optional[float]]]
retract_target: float
pick_up_motor_actions: Optional[TipMotorPickUpTipSpec]


@dataclass(frozen=True)
class DropTipMove:
target_position: float
current: Dict[Axis, float]
class TipActionMoveSpec:
distance: float
currents: Optional[Dict[Axis, float]]
speed: Optional[float]
home_after: bool = False
home_after_safety_margin: float = 0
home_axes: Sequence[Axis] = tuple()
is_ht_tip_action: bool = False
# FIXME we should throw this in a config
# file of some sort
home_buffer: float = 0


@dataclass(frozen=True)
class DropTipSpec:
drop_moves: List[DropTipMove]
shake_moves: List[Tuple[top_types.Point, Optional[float]]]
ending_current: Dict[Axis, float]
class TipActionSpec:
tip_action_moves: List[
TipActionMoveSpec
]
shake_off_moves: List[Tuple[top_types.Point, Optional[float]]]
z_distance_to_tiprack: Optional[float] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these be moves?

ending_z_retract_distance: Optional[float] = None


class OT3PipetteHandler:
Expand Down Expand Up @@ -704,6 +664,34 @@ def plan_check_blow_out(
current=instrument.plunger_motor_current.run,
)

@staticmethod
def _build_tip_motor_moves(
prep_move_dist: float,
clamp_move_dist: float,
prep_move_speed: float,
clamp_move_speed: float,
tip_motor_current: float,
plunger_current: float,
) -> List[TipActionMoveSpec]:
return [
TipActionMoveSpec(
distance=prep_move_dist,
speed=prep_move_speed,
currents={
Axis.P_L: plunger_current,
Axis.Q: tip_motor_current,
},
),
TipActionMoveSpec(
distance=prep_move_dist + clamp_move_dist,
speed=clamp_move_speed,
currents={
Axis.P_L: plunger_current,
Axis.Q: tip_motor_current,
},
),
]

@staticmethod
def _build_pickup_shakes(
instrument: Pipette,
Expand All @@ -723,23 +711,41 @@ def build_one_shake() -> List[Tuple[top_types.Point, Optional[float]]]:

return []

def plan_check_pick_up_tip(
def plan_ht_pick_up_tip(self, mount) -> TipActionSpec:
caila-marashaj marked this conversation as resolved.
Show resolved Hide resolved
# Prechecks: ready for pickup tip and press/increment are valid
instrument = self.get_pipette(mount)
if instrument.has_tip:
raise UnexpectedTipAttachError("pick_up_tip", instrument.name, mount.name)
self._ihp_log.debug(f"Picking up tip on {mount.name}")

tip_motor_moves = self._build_tip_motor_moves(
prep_move_dist=instrument.pick_up_configurations.prep_move_distance,
clamp_move_dist=instrument.pick_up_configurations.distance,
prep_move_speed=instrument.pick_up_configurations.prep_move_speed,
clamp_move_speed=instrument.pick_up_configurations.speed,
plunger_current=instrument.plunger_motor_current.run,
tip_motor_current=instrument.pick_up_configurations.current,
)

return TipActionSpec(
tip_action_moves=tip_motor_moves,
shake_off_moves=[],
z_distance_to_tiprack=(-1 * instrument.connect_tiprack_distance_mm),
ending_z_retract_distance=instrument.end_tip_action_retract_distance_mm,
)

def plan_lt_pick_up_tip(
self,
mount: OT3Mount,
tip_length: float,
presses: Optional[int],
increment: Optional[float],
) -> Tuple[PickUpTipSpec, Callable[[], None]]:
) -> TipActionSpec:
# Prechecks: ready for pickup tip and press/increment are valid
instrument = self.get_pipette(mount)
if instrument.has_tip:
raise UnexpectedTipAttachError("pick_up_tip", instrument.name, mount.name)
self._ihp_log.debug(f"Picking up tip on {mount.name}")

def add_tip_to_instr() -> None:
instrument.add_tip(tip_length=tip_length)
instrument.set_current_volume(0)

if presses is None or presses < 0:
checked_presses = instrument.pick_up_configurations.presses
else:
Expand All @@ -752,71 +758,41 @@ def add_tip_to_instr() -> None:

pick_up_speed = instrument.pick_up_configurations.speed

def build_presses() -> Iterator[Tuple[float, float]]:
def build_presses() -> List[TipActionMoveSpec]:
# Press the nozzle into the tip <presses> number of times,
# moving further by <increment> mm after each press
press_moves = []
for i in range(checked_presses):
# move nozzle down into the tip
press_dist = (
-1.0 * instrument.pick_up_configurations.distance
+ -1.0 * check_incr * i
)
# move nozzle back up
backup_dist = -press_dist
yield (press_dist, backup_dist)

if instrument.channels == 96:
return (
PickUpTipSpec(
plunger_prep_pos=instrument.plunger_positions.bottom,
plunger_currents={
Axis.of_main_tool_actuator(
mount
): instrument.plunger_motor_current.run,
},
presses=[],
shake_off_list=[],
retract_target=instrument.pick_up_configurations.distance,
pick_up_motor_actions=TipMotorPickUpTipSpec(
# Move onto the posts
tiprack_down=top_types.Point(0, 0, -7),
tiprack_up=top_types.Point(0, 0, 2),
pick_up_distance=instrument.pick_up_configurations.distance,
speed=instrument.pick_up_configurations.speed,
currents={Axis.Q: instrument.pick_up_configurations.current},
home_buffer=10,
),
),
add_tip_to_instr,
)
return (
PickUpTipSpec(
plunger_prep_pos=instrument.plunger_positions.bottom,
plunger_currents={
Axis.of_main_tool_actuator(
mount
): instrument.plunger_motor_current.run
},
presses=[
PickUpTipPressSpec(
current={
press_moves.append(
TipActionMoveSpec(
distance=press_dist,
speed=pick_up_speed,
currents={
Axis.by_mount(
mount
): instrument.pick_up_configurations.current
},
)
)
# move nozzle back up
backup_dist = -press_dist
press_moves.append(
TipActionMoveSpec(
distance=backup_dist,
speed=pick_up_speed,
relative_down=top_types.Point(0, 0, press_dist),
relative_up=top_types.Point(0, 0, backup_dist),
currents=None,
caila-marashaj marked this conversation as resolved.
Show resolved Hide resolved
)
for press_dist, backup_dist in build_presses()
],
shake_off_list=self._build_pickup_shakes(instrument),
retract_target=instrument.pick_up_configurations.distance
+ check_incr * checked_presses
+ 2,
pick_up_motor_actions=None,
),
add_tip_to_instr,
)
return press_moves

return TipActionSpec(
tip_action_moves=build_presses(),
shake_off_moves=self._build_pickup_shakes(instrument),
)

@staticmethod
Expand All @@ -839,89 +815,53 @@ def _shake_off_tips_drop(
(top_types.Point(0, 0, DROP_TIP_RELEASE_DISTANCE), None), # top
]

def _droptip_sequence_builder(
self,
bottom_pos: float,
droptip_pos: float,
plunger_currents: Dict[Axis, float],
drop_tip_currents: Dict[Axis, float],
speed: float,
home_after: bool,
home_axes: Sequence[Axis],
is_ht_pipette: bool = False,
) -> Callable[[], List[DropTipMove]]:
def build() -> List[DropTipMove]:
base = [
DropTipMove(
target_position=bottom_pos, current=plunger_currents, speed=None
),
DropTipMove(
target_position=droptip_pos,
current=drop_tip_currents,
speed=speed,
home_after=home_after,
home_after_safety_margin=abs(bottom_pos - droptip_pos),
home_axes=home_axes,
is_ht_tip_action=is_ht_pipette,
home_buffer=10 if is_ht_pipette else 0,
),
DropTipMove( # always finish drop-tip at a known safe plunger position
target_position=bottom_pos, current=plunger_currents, speed=None
),
]
return base

return build

def plan_check_drop_tip(
def plan_lt_drop_tip(
self,
mount: OT3Mount,
home_after: bool,
) -> Tuple[DropTipSpec, Callable[[], None]]:
) -> TipActionSpec:
instrument = self.get_pipette(mount)

is_96_chan = instrument.channels == 96
drop_seq = [
TipActionMoveSpec(
distance=instrument.plunger_positions.drop_tip,
speed=instrument.drop_configurations.speed,
currents={
Axis.of_main_tool_actuator(
mount
): instrument.drop_configurations.current,
},
),
TipActionMoveSpec(
distance=instrument.plunger_positions.bottom,
speed=None,
currents={
Axis.of_main_tool_actuator(
mount
): instrument.plunger_motor_current.run,
},
),
]

bottom = instrument.plunger_positions.bottom
droptip = (
instrument.drop_configurations.distance
if is_96_chan
else instrument.plunger_positions.drop_tip
return TipActionSpec(
tip_action_moves=drop_seq,
shake_off_moves=[],
)
speed = instrument.drop_configurations.speed
shakes: List[Tuple[top_types.Point, Optional[float]]] = []

def _remove_tips() -> None:
instrument.set_current_volume(0)
instrument.current_tiprack_diameter = 0.0
instrument.remove_tip()
def plan_ht_drop_tip(self, mount: OT3Mount) -> TipActionSpec:
instrument = self.get_pipette(mount)

drop_tip_current_axis = (
Axis.Q if is_96_chan else Axis.of_main_tool_actuator(mount)
)
seq_builder_ot3 = self._droptip_sequence_builder(
bottom,
droptip,
{Axis.of_main_tool_actuator(mount): instrument.plunger_motor_current.run},
{drop_tip_current_axis: instrument.drop_configurations.current},
speed,
home_after,
(Axis.of_main_tool_actuator(mount),),
is_96_chan,
drop_seq = self._build_tip_motor_moves(
prep_move_dist=instrument.drop_configurations.prep_move_distance,
clamp_move_dist=instrument.drop_configurations.distance,
prep_move_speed=instrument.drop_configurations.prep_move_speed,
clamp_move_speed=instrument.drop_configurations.speed,
plunger_current=instrument.plunger_motor_current.run,
tip_motor_current=instrument.drop_configurations.current,
)

seq_ot3 = seq_builder_ot3()
return (
DropTipSpec(
drop_moves=seq_ot3,
shake_moves=shakes,
ending_current={
Axis.of_main_tool_actuator(
mount
): instrument.plunger_motor_current.run
},
),
_remove_tips,
return TipActionSpec(
tip_action_moves=drop_seq,
shake_off_moves=[],
)

def has_pipette(self, mount: OT3Mount) -> bool:
Expand Down
Loading
Loading