Skip to content

Commit

Permalink
feat(api): Heater Shaker Firmware Level Emulator (#11121) (#11207)
Browse files Browse the repository at this point in the history
* feat: Heater-Shaker Firmware Emulator




remove files


Fix some parsing errors


Formatting


Change rpm_per_tick

* Tests

* Modify home functionality


Formatting and linting

* fix rpm emulator g-code and add deactivate test

* Fix tests

* Add deactivate_heater G-Code

* Add start_set_temperature

* Update home and deactivate_heater functionality

Based off of #11121 (comment)

* Linting

* fix: Fix deactivate_heater function

* Make home_delay_time customizable

* Linting

Co-authored-by: jbleon95 <[email protected]>

Co-authored-by: jbleon95 <[email protected]>
  • Loading branch information
DerekMaggio and jbleon95 authored Jul 25, 2022
1 parent 30ee961 commit 0df9319
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 2 deletions.
2 changes: 2 additions & 0 deletions api/src/opentrons/drivers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def parse_rpm_response(rpm_string: str) -> RPM:
"""Example input: T:1233 C:212"""
data = parse_key_values(rpm_string)
try:
# target is listed as Optional for below assignment,
# but None will be represented as 0 in G-code
target: Optional[int] = int(parse_number(data["T"], 0))
if target == 0:
target = None
Expand Down
146 changes: 146 additions & 0 deletions api/src/opentrons/hardware_control/emulation/heater_shaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""An emulation of the opentrons heater shaker module.
The purpose is to provide a fake backend that responds to GCODE commands.
"""
import logging
from time import sleep
from typing import (
Optional,
)

from opentrons.drivers.heater_shaker.driver import (
GCODE,
HS_ACK,
)
from opentrons.hardware_control.emulation.parser import Parser, Command
from opentrons.hardware_control.emulation.settings import HeaterShakerSettings
from . import util

from .abstract_emulator import AbstractEmulator
from .simulations import (
Temperature,
RPM,
)
from .util import TEMPERATURE_ROOM
from ...drivers.types import HeaterShakerLabwareLatchStatus

logger = logging.getLogger(__name__)


class HeaterShakerEmulator(AbstractEmulator):
"""Heater Shaker emulator"""

_temperature: Temperature
_rpm: RPM
_latch_status: HeaterShakerLabwareLatchStatus

def __init__(self, parser: Parser, settings: HeaterShakerSettings) -> None:
self._parser = parser
self._settings = settings
self._gcode_to_function_mapping = {
GCODE.SET_RPM.value: self._set_rpm,
GCODE.GET_RPM.value: self._get_rpm,
GCODE.SET_TEMPERATURE.value: self._set_temp,
GCODE.GET_TEMPERATURE.value: self._get_temp,
GCODE.HOME.value: self._home,
GCODE.ENTER_BOOTLOADER.value: self._enter_bootloader,
GCODE.GET_VERSION.value: self._get_version,
GCODE.OPEN_LABWARE_LATCH.value: self._open_labware_latch,
GCODE.CLOSE_LABWARE_LATCH.value: self._close_labware_latch,
GCODE.GET_LABWARE_LATCH_STATE.value: self._get_labware_latch_state,
GCODE.DEACTIVATE_HEATER.value: self._deactivate_heater,
}
self.reset()

def handle(self, line: str) -> Optional[str]:
"""Handle a line"""
results = (self._handle(c) for c in self._parser.parse(line))
joined = " ".join(r for r in results if r)
return None if not joined else joined

def reset(self) -> None:

self._temperature = Temperature(
per_tick=self._settings.temperature.degrees_per_tick,
current=self._settings.temperature.starting,
)
self._rpm = RPM(
per_tick=self._settings.rpm.rpm_per_tick,
current=self._settings.rpm.starting,
)
self._rpm.set_target(0.0)
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN

def _handle(self, command: Command) -> Optional[str]:
"""
Handle a command.
TODO: AL 20210218 create dispatch map and remove 'noqa(C901)'
"""
logger.info(f"Got command {command}")
func_to_run = self._gcode_to_function_mapping.get(command.gcode)
res = None if func_to_run is None else func_to_run(command)
return None if not isinstance(res, str) else f"{res} {HS_ACK}"

def _set_rpm(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._rpm.set_target(value)
return "M3"

def _get_rpm(self, command: Command) -> str:
res = (
f"M123 C:{self._rpm.current} "
f"T:{self._rpm.target if self._rpm.target is not None else 0}"
)
self._rpm.tick()
return res

def _set_temp(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._temperature.set_target(value)
return "M104"

def _get_temp(self, command: Command) -> str:
res = (
f"M105 C:{self._temperature.current} "
f"T:{util.OptionalValue(self._temperature.target)}"
)
self._temperature.tick()
return res

def _home(self, command: Command) -> str:
sleep(self._settings.home_delay_time)
self._rpm.deactivate(0.0)
self._rpm.set_target(0.0)
return "G28"

def _enter_bootloader(self, command: Command) -> None:
pass

def _get_version(self, command: Command) -> str:
return (
f"FW:{self._settings.version} "
f"HW:{self._settings.model} "
f"SerialNo:{self._settings.serial_number}"
)

def _open_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN
return "M242"

def _close_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_CLOSED
return "M243"

def _get_labware_latch_state(self, command: Command) -> str:
return f"M241 STATUS:{self._latch_status.value.upper()}"

def _deactivate_heater(self, command: Command) -> str:
self._temperature.deactivate(TEMPERATURE_ROOM)
return "M106"

@staticmethod
def get_terminator() -> bytes:
return b"\n"
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import Final

from opentrons.hardware_control.emulation.abstract_emulator import AbstractEmulator
from opentrons.hardware_control.emulation.heater_shaker import HeaterShakerEmulator
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
Expand All @@ -22,12 +23,16 @@
ModuleType.Thermocycler.value: lambda s: ThermocyclerEmulator(
Parser(), s.thermocycler
),
ModuleType.Heatershaker.value: lambda s: HeaterShakerEmulator(
Parser(), s.heatershaker
),
}

emulator_port: Final[Dict[str, Callable[[Settings], ProxySettings]]] = {
ModuleType.Magnetic.value: lambda s: s.magdeck_proxy,
ModuleType.Temperature.value: lambda s: s.temperature_proxy,
ModuleType.Thermocycler.value: lambda s: s.thermocycler_proxy,
ModuleType.Heatershaker.value: lambda s: s.heatershaker_proxy,
}


Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/hardware_control/emulation/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TemperatureModelSettings(BaseModel):
starting: float = float(TEMPERATURE_ROOM)


class RPMModelSettings(BaseModel):
rpm_per_tick: float = 100.0
starting: float = 0.0


class MagDeckSettings(BaseModuleSettings):
pass

Expand All @@ -42,6 +47,12 @@ class ThermocyclerSettings(BaseModuleSettings):
plate_temperature: TemperatureModelSettings


class HeaterShakerSettings(BaseModuleSettings):
temperature: TemperatureModelSettings
rpm: RPMModelSettings
home_delay_time: int = 0


class ProxySettings(BaseModel):
"""Settings for a proxy."""

Expand Down Expand Up @@ -75,6 +86,14 @@ class Settings(BaseSettings):
lid_temperature=TemperatureModelSettings(),
plate_temperature=TemperatureModelSettings(),
)
heatershaker: HeaterShakerSettings = HeaterShakerSettings(
serial_number="heater_shaker_emulator",
model="v01",
version="v0.0.1",
temperature=TemperatureModelSettings(),
rpm=RPMModelSettings(),
home_delay_time=0,
)

heatershaker_proxy: ProxySettings = ProxySettings(
emulator_port=9000, driver_port=9995
Expand Down
51 changes: 50 additions & 1 deletion api/src/opentrons/hardware_control/emulation/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,56 @@ def deactivate(self, temperature: float) -> None:
self._target = None
self._current = temperature

def set_target(self, target: float) -> None:
def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
def current(self) -> float:
return self._current

@property
def target(self) -> Optional[float]:
return self._target


class RPM(Simulation):
"""A model with a current and target rpm. The current rpm is
always moving towards the target.
"""

def __init__(self, per_tick: float, current: float) -> None:
"""Construct a rpm simulation.
Args:
per_tick: amount to move per tick,
current: the starting rpm
"""
self._per_tick = per_tick
self._current = current
self._target: Optional[float] = None

def tick(self) -> None:

if self._target is None:
target = 0.0
else:
target = self._target

diff = target - self._current

if abs(diff) < self._per_tick:
self._current = target
elif diff > 0:
self._current += self._per_tick
else:
self._current -= self._per_tick

def deactivate(self, rpm: float) -> None:
"""Deactivate and reset to rpm"""
self._target = None
self._current = rpm

def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
Expand Down
7 changes: 6 additions & 1 deletion api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ def emulator_settings() -> Settings:
@pytest.fixture(scope="session")
def emulation_app(emulator_settings: Settings) -> Iterator[None]:
"""Run the emulators"""
modules = [ModuleType.Magnetic, ModuleType.Temperature, ModuleType.Thermocycler]
modules = [
ModuleType.Magnetic,
ModuleType.Temperature,
ModuleType.Thermocycler,
ModuleType.Heatershaker,
]

def _run_app() -> None:
async def _async_run() -> None:
Expand Down
102 changes: 102 additions & 0 deletions api/tests/opentrons/hardware_control/integration/test_heatershaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import asyncio
from typing import AsyncIterator, Iterator

import pytest
from mock import AsyncMock
from opentrons.drivers.rpi_drivers.types import USBPort
from opentrons.hardware_control.emulation.settings import Settings
from opentrons.hardware_control.emulation.util import TEMPERATURE_ROOM

from opentrons.hardware_control.modules import HeaterShaker

TEMP_ROOM_LOW = TEMPERATURE_ROOM - 0.7
TEMP_ROOM_HIGH = TEMPERATURE_ROOM + 0.7


@pytest.fixture
async def heatershaker(
emulation_app: Iterator[None],
emulator_settings: Settings,
) -> AsyncIterator[HeaterShaker]:
module = await HeaterShaker.build(
port=f"socket://127.0.0.1:{emulator_settings.heatershaker_proxy.driver_port}",
execution_manager=AsyncMock(),
usb_port=USBPort(name="", port_number=1, device_path="", hub=1),
loop=asyncio.get_running_loop(),
polling_period=0.5,
)
yield module
await module.cleanup()


def test_device_info(heatershaker: HeaterShaker):
"""Confirm device_info returns correct values."""
assert heatershaker.device_info == {
"model": "v01",
"version": "v0.0.1",
"serial": "heater_shaker_emulator",
}


async def test_latch_status(heatershaker: HeaterShaker) -> None:
"""It should run open and close latch."""
await heatershaker.wait_next_poll()
assert heatershaker.labware_latch_status.value == "idle_open"

await heatershaker.close_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_closed"

await heatershaker.open_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_open"


async def test_speed(heatershaker: HeaterShaker) -> None:
"""It should speed up, then slow down."""

await heatershaker.wait_next_poll()
await heatershaker.set_speed(550)
assert heatershaker.target_speed == 550

# The acceptable delta for actual speed is 100
assert 450 <= heatershaker.speed <= 650


async def test_deactivate_shaker(heatershaker: HeaterShaker) -> None:
"""It should speed up, then slow down."""

await heatershaker.wait_next_poll()
await heatershaker.set_speed(150)
assert heatershaker.target_speed == 150

await heatershaker.deactivate_shaker()

assert heatershaker.speed == 0
assert heatershaker.target_speed is None


async def test_deactivate_heater(heatershaker: HeaterShaker) -> None:
await heatershaker.wait_next_poll()
await heatershaker.start_set_temperature(30.0)
await heatershaker.await_temperature(30.0)
assert heatershaker.target_temperature == 30.0
assert 29.3 <= heatershaker.temperature <= 30.7

await heatershaker.deactivate_heater()
assert heatershaker.target_temperature is None
assert TEMP_ROOM_LOW <= heatershaker.temperature <= TEMP_ROOM_HIGH


async def test_temp(heatershaker: HeaterShaker) -> None:
"""Test setting temp"""

# Have to wait for next poll because target temp will not update until then
await heatershaker.wait_next_poll()
await heatershaker.start_set_temperature(50.0)
assert heatershaker.target_temperature == 50.0
assert heatershaker.temperature != 50.0

await heatershaker.await_temperature(50.0)
assert heatershaker.target_temperature == 50.0

# Acceptable delta is 0.7 degrees
assert 49.3 <= heatershaker.temperature <= 50.7
Loading

0 comments on commit 0df9319

Please sign in to comment.