Skip to content

Commit

Permalink
refactor(api): Move opentrons.initialize() to its own file (#15191)
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring authored and Carlos-fernandez committed May 20, 2024
1 parent dd48193 commit bc32d40
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 143 deletions.
2 changes: 0 additions & 2 deletions api/.flake8
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ noqa-require-code = true
# string lints in these modules; remove entries as they are fixed
per-file-ignores =
setup.py:ANN,D
src/opentrons/__init__.py:ANN,D
src/opentrons/execute.py:ANN,D
src/opentrons/simulate.py:ANN,D
src/opentrons/types.py:ANN,D
Expand All @@ -47,7 +46,6 @@ per-file-ignores =
src/opentrons/util/linal.py:ANN,D
src/opentrons/util/entrypoint_util.py:ANN,D
src/opentrons/util/helpers.py:ANN,D
tests/opentrons/test_init.py:ANN,D
tests/opentrons/test_types.py:ANN,D
tests/opentrons/conftest.py:ANN,D
tests/opentrons/calibration_storage/*:ANN,D
Expand Down
132 changes: 19 additions & 113 deletions api/src/opentrons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,33 @@
import os
# noqa: D104

from pathlib import Path
import logging
import re
from typing import Any, List, Tuple

from opentrons.drivers.serial_communication import get_ports_by_name
from opentrons.hardware_control import (
API as HardwareAPI,
ThreadManager,
ThreadManagedHardware,
types as hw_types,
)

from opentrons.config import (
feature_flags as ff,
name,
robot_configs,
IS_ROBOT,
ROBOT_FIRMWARE_DIR,
)
from opentrons.util import logging_config
from opentrons.protocols.types import ApiDeprecationError
from opentrons.protocols.api_support.types import APIVersion
from typing import List

from opentrons.config import feature_flags as ff

from ._version import version

HERE = os.path.abspath(os.path.dirname(__file__))
__version__ = version


LEGACY_MODULES = ["robot", "reset", "instruments", "containers", "labware", "modules"]


__all__ = ["version", "__version__", "HERE", "config"]
__all__ = ["version", "__version__", "config"]


def __getattr__(attrname: str) -> None:
"""
Prevent import of legacy modules from global to officially
deprecate Python API Version 1.0.
"""Prevent import of legacy modules from global.
This is to officially deprecate Python API Version 1.0.
"""
if attrname in LEGACY_MODULES:
# Local imports for performance. This case is not hit frequently, and we
# don't want to drag these imports in any time anything is imported from
# anywhere in the `opentrons` package.
from opentrons.protocols.types import ApiDeprecationError
from opentrons.protocols.api_support.types import APIVersion

raise ApiDeprecationError(APIVersion(1, 0))
raise AttributeError(attrname)

Expand All @@ -53,98 +39,18 @@ def __dir__() -> List[str]:
log = logging.getLogger(__name__)


SMOOTHIE_HEX_RE = re.compile("smoothie-(.*).hex")


def _find_smoothie_file() -> Tuple[Path, str]:
resources: List[Path] = []

# Search for smoothie files in /usr/lib/firmware first then fall back to
# value packed in wheel
if IS_ROBOT:
resources.extend(ROBOT_FIRMWARE_DIR.iterdir()) # type: ignore

resources_path = Path(HERE) / "resources"
resources.extend(resources_path.iterdir())

for path in resources:
matches = SMOOTHIE_HEX_RE.search(path.name)
if matches:
branch_plus_ref = matches.group(1)
return path, branch_plus_ref
raise OSError(f"Could not find smoothie firmware file in {resources_path}")


def _get_motor_control_serial_port() -> Any:
port = os.environ.get("OT_SMOOTHIE_EMULATOR_URI")

if port is None:
smoothie_id = os.environ.get("OT_SMOOTHIE_ID", "AMA")
# TODO(mc, 2021-08-01): raise a more informative exception than
# IndexError if a valid serial port is not found
port = get_ports_by_name(device_name=smoothie_id)[0]

log.info(f"Connecting to motor controller at port {port}")
return port


# todo(mm, 2024-05-15): Having functions in the package's top-level __init__.py
# can cause problems with import performance and circular dependencies. Can this
# be moved elsewhere?
def should_use_ot3() -> bool:
"""Return true if ot3 hardware controller should be used."""
if ff.enable_ot3_hardware_controller():
try:
# Try this OT-3-specific import as an extra check in case the feature
# flag is mistakenly enabled on an OT-2 for some reason.
from opentrons_hardware.drivers.can_bus import CanDriver # noqa: F401

return True
except ModuleNotFoundError:
log.exception("Cannot use OT3 Hardware controller.")
return False


async def _create_thread_manager() -> ThreadManagedHardware:
"""Build the hardware controller wrapped in a ThreadManager.
.. deprecated:: 4.6
ThreadManager is on its way out.
"""
if os.environ.get("ENABLE_VIRTUAL_SMOOTHIE"):
log.info("Initialized robot using virtual Smoothie")
thread_manager: ThreadManagedHardware = ThreadManager(
HardwareAPI.build_hardware_simulator
)
elif should_use_ot3():
from opentrons.hardware_control.ot3api import OT3API

thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(OT3API.build_hardware_controller),
use_usb_bus=ff.rear_panel_integration(),
status_bar_enabled=ff.status_bar_enabled(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)
else:
thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(HardwareAPI.build_hardware_controller),
port=_get_motor_control_serial_port(),
firmware=_find_smoothie_file(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)

try:
await thread_manager.managed_thread_ready_async()
except RuntimeError:
log.exception("Could not build hardware controller, forcing virtual")
thread_manager = ThreadManager(HardwareAPI.build_hardware_simulator)

return thread_manager


async def initialize() -> ThreadManagedHardware:
"""
Initialize the Opentrons hardware returning a hardware instance.
"""
robot_conf = robot_configs.load()
logging_config.log_init(robot_conf.log_level)

log.info(f"API server version: {version}")
log.info(f"Robot Name: {name()}")

return await _create_thread_manager()
6 changes: 6 additions & 0 deletions api/src/opentrons/_resources_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""The path to the package's `resources/` directory."""

from pathlib import Path

_THIS = Path(__file__)
RESOURCES_PATH = (_THIS.parent / "resources").absolute()
15 changes: 9 additions & 6 deletions api/src/opentrons/config/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from enum import Enum
from dataclasses import dataclass, asdict, fields
from typing import Dict, Tuple, TypeVar, Generic, List, cast, Optional
from typing import Dict, Tuple, TypeVar, Generic, List, cast, Optional, TYPE_CHECKING
from typing_extensions import TypedDict, Literal
from opentrons.hardware_control.types import OT3AxisKind, InstrumentProbeType

if TYPE_CHECKING:
# Work around circular import.
from opentrons.hardware_control.types import OT3AxisKind, InstrumentProbeType


class AxisDict(TypedDict):
Expand Down Expand Up @@ -31,7 +34,7 @@ def __getitem__(self, key: GantryLoad) -> Vt:
return cast(Vt, asdict(self)[key.value])


PerPipetteAxisSettings = ByGantryLoad[Dict[OT3AxisKind, float]]
PerPipetteAxisSettings = ByGantryLoad[Dict["OT3AxisKind", float]]


class CurrentDictDefault(TypedDict):
Expand Down Expand Up @@ -82,7 +85,7 @@ class OT3MotionSettings:

def by_gantry_load(
self, gantry_load: GantryLoad
) -> Dict[str, Dict[OT3AxisKind, float]]:
) -> Dict[str, Dict["OT3AxisKind", float]]:
return dict(
(field.name, getattr(self, field.name)[gantry_load])
for field in fields(self)
Expand All @@ -96,7 +99,7 @@ class OT3CurrentSettings:

def by_gantry_load(
self, gantry_load: GantryLoad
) -> Dict[str, Dict[OT3AxisKind, float]]:
) -> Dict[str, Dict["OT3AxisKind", float]]:
return dict(
(field.name, getattr(self, field.name)[gantry_load])
for field in fields(self)
Expand Down Expand Up @@ -138,7 +141,7 @@ class LiquidProbeSettings:
aspirate_while_sensing: bool
auto_zero_sensor: bool
num_baseline_reads: int
data_files: Optional[Dict[InstrumentProbeType, str]]
data_files: Optional[Dict["InstrumentProbeType", str]]


@dataclass(frozen=True)
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/hardware_control/emulation/smoothie.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import re
from typing import Optional, Dict

from opentrons import _find_smoothie_file
from opentrons.drivers import utils
from opentrons.drivers.smoothie_drivers.constants import GCODE, HOMED_POSITION
from opentrons.hardware_control.initialization import _find_smoothie_file
from opentrons.hardware_control.emulation.parser import Command, Parser

from .abstract_emulator import AbstractEmulator
Expand Down
114 changes: 114 additions & 0 deletions api/src/opentrons/hardware_control/initialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os

from pathlib import Path
import logging
import re
from typing import Any, List, Tuple

from opentrons import should_use_ot3
from opentrons.drivers.serial_communication import get_ports_by_name
from opentrons.hardware_control import (
API as HardwareAPI,
ThreadManager,
ThreadManagedHardware,
types as hw_types,
)

from opentrons.config import (
feature_flags as ff,
name,
robot_configs,
IS_ROBOT,
ROBOT_FIRMWARE_DIR,
)
from opentrons.util import logging_config

from opentrons._resources_path import RESOURCES_PATH
from opentrons._version import version


SMOOTHIE_HEX_RE = re.compile("smoothie-(.*).hex")


log = logging.getLogger(__name__)


def _find_smoothie_file() -> Tuple[Path, str]:
resources: List[Path] = []

# Search for smoothie files in /usr/lib/firmware first then fall back to
# value packed in wheel
if IS_ROBOT:
resources.extend(ROBOT_FIRMWARE_DIR.iterdir()) # type: ignore

resources.extend(RESOURCES_PATH.iterdir())

for path in resources:
matches = SMOOTHIE_HEX_RE.search(path.name)
if matches:
branch_plus_ref = matches.group(1)
return path, branch_plus_ref
raise OSError(f"Could not find smoothie firmware file in {RESOURCES_PATH}")


def _get_motor_control_serial_port() -> Any:
port = os.environ.get("OT_SMOOTHIE_EMULATOR_URI")

if port is None:
smoothie_id = os.environ.get("OT_SMOOTHIE_ID", "AMA")
# TODO(mc, 2021-08-01): raise a more informative exception than
# IndexError if a valid serial port is not found
port = get_ports_by_name(device_name=smoothie_id)[0]

log.info(f"Connecting to motor controller at port {port}")
return port


async def _create_thread_manager() -> ThreadManagedHardware:
"""Build the hardware controller wrapped in a ThreadManager.
.. deprecated:: 4.6
ThreadManager is on its way out.
"""
if os.environ.get("ENABLE_VIRTUAL_SMOOTHIE"):
log.info("Initialized robot using virtual Smoothie")
thread_manager: ThreadManagedHardware = ThreadManager(
HardwareAPI.build_hardware_simulator
)
elif should_use_ot3():
from opentrons.hardware_control.ot3api import OT3API

thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(OT3API.build_hardware_controller),
use_usb_bus=ff.rear_panel_integration(),
status_bar_enabled=ff.status_bar_enabled(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)
else:
thread_manager = ThreadManager(
ThreadManager.nonblocking_builder(HardwareAPI.build_hardware_controller),
port=_get_motor_control_serial_port(),
firmware=_find_smoothie_file(),
feature_flags=hw_types.HardwareFeatureFlags.build_from_ff(),
)

try:
await thread_manager.managed_thread_ready_async()
except RuntimeError:
log.exception("Could not build hardware controller, forcing virtual")
thread_manager = ThreadManager(HardwareAPI.build_hardware_simulator)

return thread_manager


async def initialize() -> ThreadManagedHardware:
"""
Initialize the Opentrons hardware returning a hardware instance.
"""
robot_conf = robot_configs.load()
logging_config.log_init(robot_conf.log_level)

log.info(f"API server version: {version}")
log.info(f"Robot Name: {name()}")

return await _create_thread_manager()
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Iterator

import pytest
from opentrons import _find_smoothie_file
from opentrons.config.robot_configs import build_config
from opentrons.hardware_control import Controller
from opentrons.hardware_control.initialization import _find_smoothie_file
from opentrons.hardware_control.emulation.settings import Settings
from opentrons.types import Mount

Expand Down
13 changes: 13 additions & 0 deletions api/tests/opentrons/hardware_control/test_initialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from pathlib import Path

from opentrons.hardware_control import initialization


def test_find_smoothie_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
dummy_file = tmp_path / "smoothie-edge-2cac98asda.hex"
dummy_file.write_text("hello")
monkeypatch.setattr(initialization, "ROBOT_FIRMWARE_DIR", tmp_path)

monkeypatch.setattr(initialization, "IS_ROBOT", True)
assert initialization._find_smoothie_file() == (dummy_file, "edge-2cac98asda")
Loading

0 comments on commit bc32d40

Please sign in to comment.