Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): only restore changed currents #13683

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,14 +932,32 @@ async def set_hold_current(self, axis_currents: OT3AxisMap[float]) -> None:
self._current_settings[axis].hold_current = current

@asynccontextmanager
async def restore_current(self) -> AsyncIterator[None]:
"""Save the current."""
old_current_settings = deepcopy(self._current_settings)
async def restore_current(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kind of a nit i guess but really this isn't restore_current anymore, can we call it like motor_current or something

self,
run_currents: OT3AxisMap[float] = {},
hold_currents: OT3AxisMap[float] = {},
) -> AsyncIterator[None]:
"""Update and restore current."""
assert self._current_settings
old_settings = deepcopy(self._current_settings)
if run_currents:
await self.set_active_current(run_currents)
if hold_currents:
await self.set_hold_current(hold_currents)
try:
yield
finally:
self._current_settings = old_current_settings
await self.set_default_currents()
if run_currents:
await self.set_active_current(
{ax: old_settings[ax].run_current for ax in run_currents.keys()}
)
if hold_currents:
await self.set_hold_current(
{ax: old_settings[ax].hold_current for ax in hold_currents.keys()}
)
if not run_currents and not hold_currents:
self._current_settings = old_settings
await self.set_default_currents()

@asynccontextmanager
async def restore_z_r_run_current(self) -> AsyncIterator[None]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,11 @@ async def set_active_current(self, axis_currents: OT3AxisMap[float]) -> None:
return None

@asynccontextmanager
async def restore_current(self) -> AsyncIterator[None]:
async def restore_current(
self,
run_currents: OT3AxisMap[float] = {},
hold_currents: OT3AxisMap[float] = {},
) -> AsyncIterator[None]:
"""Save the current."""
yield

Expand Down
41 changes: 21 additions & 20 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,21 +1329,21 @@ async def _set_plunger_current_and_home(
moves = self._build_moves(
origin, target_pos, instr.config.plunger_homing_configurations.speed
)
async with self._backend.restore_current():
await self._backend.set_active_current(
{axis: instr.config.plunger_homing_configurations.current}
)
async with self._backend.restore_current(
run_currents={
axis: instr.config.plunger_homing_configurations.current
}
):
await self._backend.move(
origin,
moves[0],
MoveStopCondition.none,
)
await self._backend.home([axis], self.gantry_load)
else:
async with self._backend.restore_current():
await self._backend.set_active_current(
{axis: instr.config.plunger_homing_configurations.current}
)
async with self._backend.restore_current(
run_currents={axis: instr.config.plunger_homing_configurations.current}
):
await self._backend.home([axis], self.gantry_load)

async def _retrieve_home_position(
Expand Down Expand Up @@ -1695,10 +1695,11 @@ async def _move_to_plunger_bottom(
# NOTE: plunger position (mm) decreases up towards homing switch
# NOTE: if already at BOTTOM, we still need to run backlash-compensation movement,
# because we do not know if we arrived at BOTTOM from above or below.
async with self._backend.restore_current():
await self._backend.set_active_current(
{pip_ax: instrument.config.plunger_homing_configurations.current}
)
async with self._backend.restore_current(
run_currents={
pip_ax: instrument.config.plunger_homing_configurations.current
}
):
if self._current_position[pip_ax] < backlash_pos[pip_ax]:
await self._move(
backlash_pos,
Expand Down Expand Up @@ -1880,10 +1881,9 @@ async def _force_pick_up_tip(
self, mount: OT3Mount, pipette_spec: PickUpTipSpec
) -> None:
for press in pipette_spec.presses:
async with self._backend.restore_current():
await self._backend.set_active_current(
{axis: current for axis, current in press.current.items()}
)
async with self._backend.restore_current(
run_currents={axis: current for axis, current in press.current.items()}
):
target_down = target_position_from_relative(
mount, press.relative_down, self._current_position
)
Expand All @@ -1899,10 +1899,11 @@ async def _force_pick_up_tip(
async def _motor_pick_up_tip(
self, mount: OT3Mount, pipette_spec: TipMotorPickUpTipSpec
) -> None:
async with self._backend.restore_current():
await self._backend.set_active_current(
{axis: current for axis, current in pipette_spec.currents.items()}
)
async with self._backend.restore_current(
run_currents={
axis: current for axis, current in pipette_spec.currents.items()
}
):
# Move to pick up position
target_down = target_position_from_relative(
mount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
TipStateType,
FailedTipStateCheck,
EstopState,
CurrentConfig,
)
from opentrons.hardware_control.errors import (
FirmwareUpdateRequired,
Expand Down Expand Up @@ -1110,3 +1111,47 @@ async def test_requires_estop(

with expectation:
await controller.home([Axis.X, Axis.Y], gantry_load=GantryLoad.LOW_THROUGHPUT)


@pytest.mark.parametrize(
"run_currents, hold_currents",
[
[{Axis.X: 1.0}, {}],
[{}, {Axis.X: 1.0}],
[{Axis.X: 1.0}, {Axis.X: 1.0}],
[{}, {}],
],
)
async def test_restore_current(
controller: OT3Controller,
run_currents: OT3AxisMap[float],
hold_currents: OT3AxisMap[float],
) -> None:
"""Test that restore current actually works."""
controller._current_settings = {Axis.X: CurrentConfig(0.0, 0.0)}

with mock.patch.object(controller, "set_active_current") as mock_run_currents:
with mock.patch.object(controller, "set_hold_current") as mock_hold_currents:
with mock.patch.object(controller, "set_default_currents") as mock_default:

async with controller.restore_current(run_currents, hold_currents):
await controller.update_position()

if not run_currents and not hold_currents:
mock_default.assert_called_once()
mock_run_currents.assert_not_called()
mock_hold_currents.assert_not_called()
elif run_currents:
mock_run_currents.assert_has_calls(
[
mock.call({Axis.X: 1.0}),
mock.call({Axis.X: 0.0}),
],
)
elif hold_currents:
mock_hold_currents.assert_has_calls(
[
mock.call({Axis.X: 1.0}),
mock.call({Axis.X: 0.0}),
],
)
Loading