Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): Move opentrons.initialize() to its own file #15191

Merged
merged 9 commits into from
May 16, 2024
118 changes: 9 additions & 109 deletions api/src/opentrons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,19 @@
import os

from pathlib import Path
import logging
import re
from typing import Any, List, Tuple

from opentrons.drivers.serial_communication import get_ports_by_name
from opentrons.hardware_control import (
API as HardwareAPI,
ThreadManager,
ThreadManagedHardware,
types as hw_types,
)

from opentrons.config import (
feature_flags as ff,
name,
robot_configs,
IS_ROBOT,
ROBOT_FIRMWARE_DIR,
)
from opentrons.util import logging_config
from typing import List

from opentrons.config import feature_flags as ff
from opentrons.protocols.types import ApiDeprecationError
from opentrons.protocols.api_support.types import APIVersion
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved

from ._version import version

HERE = os.path.abspath(os.path.dirname(__file__))
__version__ = version


LEGACY_MODULES = ["robot", "reset", "instruments", "containers", "labware", "modules"]


__all__ = ["version", "__version__", "HERE", "config"]
__all__ = ["version", "__version__", "config"]


def __getattr__(attrname: str) -> None:
Expand All @@ -53,98 +33,18 @@ def __dir__() -> List[str]:
log = logging.getLogger(__name__)


SMOOTHIE_HEX_RE = re.compile("smoothie-(.*).hex")


def _find_smoothie_file() -> Tuple[Path, str]:
resources: List[Path] = []

# Search for smoothie files in /usr/lib/firmware first then fall back to
# value packed in wheel
if IS_ROBOT:
resources.extend(ROBOT_FIRMWARE_DIR.iterdir()) # type: ignore

resources_path = Path(HERE) / "resources"
resources.extend(resources_path.iterdir())

for path in resources:
matches = SMOOTHIE_HEX_RE.search(path.name)
if matches:
branch_plus_ref = matches.group(1)
return path, branch_plus_ref
raise OSError(f"Could not find smoothie firmware file in {resources_path}")


def _get_motor_control_serial_port() -> Any:
port = os.environ.get("OT_SMOOTHIE_EMULATOR_URI")

if port is None:
smoothie_id = os.environ.get("OT_SMOOTHIE_ID", "AMA")
# TODO(mc, 2021-08-01): raise a more informative exception than
# IndexError if a valid serial port is not found
port = get_ports_by_name(device_name=smoothie_id)[0]

log.info(f"Connecting to motor controller at port {port}")
return port


# todo(mm, 2024-05-15): Having functions in the package's top-level __init__.py
# can cause problems with import performance and circular dependencies. Can this
# be moved elsewhere?
def should_use_ot3() -> bool:
"""Return true if ot3 hardware controller should be used."""
if ff.enable_ot3_hardware_controller():
try:
# Try this OT-3-specific import as an extra check in case the feature
# flag is mistakenly enabled on an OT-2 for some reason.
from opentrons_hardware.drivers.can_bus import CanDriver # noqa: F401

return True
except ModuleNotFoundError:
log.exception("Cannot use OT3 Hardware controller.")
return False


async def _create_thread_manager() -> ThreadManagedHardware:
"""Build the hardware controller wrapped in a ThreadManager.

.. deprecated:: 4.6
ThreadManager is on its way out.
"""
if os.environ.get("ENABLE_VIRTUAL_SMOOTHIE"):
log.info("Initialized robot using virtual Smoothie")
thread_manager: ThreadManagedHardware = ThreadManager(
HardwareAPI.build_hardware_simulator
)
elif should_use_ot3():
from opentrons.hardware_control.ot3api import OT3API

thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(OT3API.build_hardware_controller),
use_usb_bus=ff.rear_panel_integration(),
status_bar_enabled=ff.status_bar_enabled(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)
else:
thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(HardwareAPI.build_hardware_controller),
port=_get_motor_control_serial_port(),
firmware=_find_smoothie_file(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)

try:
await thread_manager.managed_thread_ready_async()
except RuntimeError:
log.exception("Could not build hardware controller, forcing virtual")
thread_manager = ThreadManager(HardwareAPI.build_hardware_simulator)

return thread_manager


async def initialize() -> ThreadManagedHardware:
"""
Initialize the Opentrons hardware returning a hardware instance.
"""
robot_conf = robot_configs.load()
logging_config.log_init(robot_conf.log_level)

log.info(f"API server version: {version}")
log.info(f"Robot Name: {name()}")

return await _create_thread_manager()
6 changes: 6 additions & 0 deletions api/src/opentrons/_resources_path.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put this in its own file to keep it close to the resources/ directory that it's referencing. Otherwise we'd be trying to access it from inside hardware_control/ with some fragile relative path.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""The path to the package's `resources/` directory."""

from pathlib import Path

_THIS = Path(__file__)
RESOURCES_PATH = (_THIS.parent / "resources").absolute()
2 changes: 1 addition & 1 deletion api/src/opentrons/hardware_control/emulation/smoothie.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import re
from typing import Optional, Dict

from opentrons import _find_smoothie_file
from opentrons.drivers import utils
from opentrons.drivers.smoothie_drivers.constants import GCODE, HOMED_POSITION
from opentrons.hardware_control.initialization import _find_smoothie_file
from opentrons.hardware_control.emulation.parser import Command, Parser

from .abstract_emulator import AbstractEmulator
Expand Down
114 changes: 114 additions & 0 deletions api/src/opentrons/hardware_control/initialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os

from pathlib import Path
import logging
import re
from typing import Any, List, Tuple

from opentrons import should_use_ot3
from opentrons.drivers.serial_communication import get_ports_by_name
from opentrons.hardware_control import (
API as HardwareAPI,
ThreadManager,
ThreadManagedHardware,
types as hw_types,
)

from opentrons.config import (
feature_flags as ff,
name,
robot_configs,
IS_ROBOT,
ROBOT_FIRMWARE_DIR,
)
from opentrons.util import logging_config

from opentrons._resources_path import RESOURCES_PATH
from opentrons._version import version


SMOOTHIE_HEX_RE = re.compile("smoothie-(.*).hex")


log = logging.getLogger(__name__)


def _find_smoothie_file() -> Tuple[Path, str]:
resources: List[Path] = []

# Search for smoothie files in /usr/lib/firmware first then fall back to
# value packed in wheel
if IS_ROBOT:
resources.extend(ROBOT_FIRMWARE_DIR.iterdir()) # type: ignore

resources.extend(RESOURCES_PATH.iterdir())

for path in resources:
matches = SMOOTHIE_HEX_RE.search(path.name)
if matches:
branch_plus_ref = matches.group(1)
return path, branch_plus_ref
raise OSError(f"Could not find smoothie firmware file in {RESOURCES_PATH}")


def _get_motor_control_serial_port() -> Any:
port = os.environ.get("OT_SMOOTHIE_EMULATOR_URI")

if port is None:
smoothie_id = os.environ.get("OT_SMOOTHIE_ID", "AMA")
# TODO(mc, 2021-08-01): raise a more informative exception than
# IndexError if a valid serial port is not found
port = get_ports_by_name(device_name=smoothie_id)[0]

log.info(f"Connecting to motor controller at port {port}")
return port


async def _create_thread_manager() -> ThreadManagedHardware:
"""Build the hardware controller wrapped in a ThreadManager.

.. deprecated:: 4.6
ThreadManager is on its way out.
"""
if os.environ.get("ENABLE_VIRTUAL_SMOOTHIE"):
log.info("Initialized robot using virtual Smoothie")
thread_manager: ThreadManagedHardware = ThreadManager(
HardwareAPI.build_hardware_simulator
)
elif should_use_ot3():
from opentrons.hardware_control.ot3api import OT3API

thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(OT3API.build_hardware_controller),
use_usb_bus=ff.rear_panel_integration(),
status_bar_enabled=ff.status_bar_enabled(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)
else:
thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(HardwareAPI.build_hardware_controller),
port=_get_motor_control_serial_port(),
firmware=_find_smoothie_file(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)

try:
await thread_manager.managed_thread_ready_async()
except RuntimeError:
log.exception("Could not build hardware controller, forcing virtual")
thread_manager = ThreadManager(HardwareAPI.build_hardware_simulator)

return thread_manager


async def initialize() -> ThreadManagedHardware:
"""
Initialize the Opentrons hardware returning a hardware instance.
"""
robot_conf = robot_configs.load()
logging_config.log_init(robot_conf.log_level)

log.info(f"API server version: {version}")
log.info(f"Robot Name: {name()}")

return await _create_thread_manager()
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Iterator

import pytest
from opentrons import _find_smoothie_file
from opentrons.config.robot_configs import build_config
from opentrons.hardware_control import Controller
from opentrons.hardware_control.initialization import _find_smoothie_file
from opentrons.hardware_control.emulation.settings import Settings
from opentrons.types import Mount

Expand Down
13 changes: 13 additions & 0 deletions api/tests/opentrons/hardware_control/test_initialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from pathlib import Path

from opentrons.hardware_control import initialization


def test_find_smoothie_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
dummy_file = tmp_path / "smoothie-edge-2cac98asda.hex"
dummy_file.write_text("hello")
monkeypatch.setattr(initialization, "ROBOT_FIRMWARE_DIR", tmp_path)

monkeypatch.setattr(initialization, "IS_ROBOT", True)
assert initialization._find_smoothie_file() == (dummy_file, "edge-2cac98asda")
13 changes: 0 additions & 13 deletions api/tests/opentrons/test_init.py

This file was deleted.

10 changes: 10 additions & 0 deletions api/tests/opentrons/test_resources_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Unit tests for opentrons._resources_path."""


from opentrons._resources_path import RESOURCES_PATH


def test_resources_path() -> None:
"""Make sure the resource path is basically accessible."""
matches = list(RESOURCES_PATH.glob("smoothie-*"))
assert matches
3 changes: 2 additions & 1 deletion robot-server/robot_server/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from opentrons_shared_data import deck
from opentrons_shared_data.robot.dev_types import RobotType, RobotTypeEnum

from opentrons import initialize as initialize_api, should_use_ot3
from opentrons import should_use_ot3
from opentrons.hardware_control.initialization import initialize as initialize_api
from opentrons.config import (
IS_ROBOT,
ARCHITECTURE,
Expand Down
Loading