diff --git a/api/src/opentrons/protocol_runner/protocol_runner.py b/api/src/opentrons/protocol_runner/protocol_runner.py index 69851dea0d0c..2287ca737075 100644 --- a/api/src/opentrons/protocol_runner/protocol_runner.py +++ b/api/src/opentrons/protocol_runner/protocol_runner.py @@ -401,6 +401,7 @@ def __init__( self._hardware_api.should_taskify_movement_execution(taskify=False) + # TODO(tz, 6-10-2024): explore moving this method into the constructor. def prepare(self) -> None: """Set the task queue to wait until all commands are executed.""" self._task_queue.set_run_func(func=self._protocol_engine.wait_until_complete) diff --git a/api/src/opentrons/protocol_runner/run_orchestrator.py b/api/src/opentrons/protocol_runner/run_orchestrator.py index bbd6088b411c..4b046f95d0e8 100644 --- a/api/src/opentrons/protocol_runner/run_orchestrator.py +++ b/api/src/opentrons/protocol_runner/run_orchestrator.py @@ -1,12 +1,44 @@ """Engine/Runner provider.""" from __future__ import annotations -from typing import Optional, Union +from typing import Optional, Union, List, Dict -from . import protocol_runner, AnyRunner +from anyio import move_on_after + +from opentrons_shared_data.labware.dev_types import LabwareUri +from opentrons_shared_data.labware.labware_definition import LabwareDefinition +from opentrons_shared_data.errors import GeneralError +from opentrons_shared_data.robot.dev_types import RobotType + +from . import protocol_runner, RunResult, JsonRunner, PythonAndLegacyRunner from ..hardware_control import HardwareControlAPI -from ..protocol_engine import ProtocolEngine -from ..protocol_engine.types import PostRunHardwareState -from ..protocol_reader import JsonProtocolConfig, PythonProtocolConfig +from ..hardware_control.modules import AbstractModule as HardwareModuleAPI +from ..protocol_engine import ( + ProtocolEngine, + CommandCreate, + Command, + StateSummary, + CommandPointer, + CommandSlice, +) +from ..protocol_engine.types import ( + PostRunHardwareState, + EngineStatus, + LabwareOffsetCreate, + LabwareOffset, + DeckConfigurationType, + RunTimeParameter, + RunTimeParamValuesType, +) +from ..protocol_reader import JsonProtocolConfig, PythonProtocolConfig, ProtocolSource +from ..protocols.parse import PythonParseMode + + +class NoProtocolRunAvailable(RuntimeError): + """An error raised if there is no protocol run available.""" + + +class RunNotFound(GeneralError): + """An error raised if there is no run associated.""" class RunOrchestrator: @@ -20,6 +52,7 @@ class RunOrchestrator: ] _setup_runner: protocol_runner.LiveRunner _fixit_runner: protocol_runner.LiveRunner + _protocol_live_runner: protocol_runner.LiveRunner _hardware_api: HardwareControlAPI _protocol_engine: ProtocolEngine @@ -29,6 +62,7 @@ def __init__( hardware_api: HardwareControlAPI, fixit_runner: protocol_runner.LiveRunner, setup_runner: protocol_runner.LiveRunner, + protocol_live_runner: protocol_runner.LiveRunner, json_or_python_protocol_runner: Optional[ Union[protocol_runner.PythonAndLegacyRunner, protocol_runner.JsonRunner] ] = None, @@ -41,25 +75,27 @@ def __init__( hardware_api: Hardware control API instance. fixit_runner: LiveRunner for fixit commands. setup_runner: LiveRunner for setup commands. + protocol_live_runner: LiveRunner for protocol commands. json_or_python_protocol_runner: JsonRunner/PythonAndLegacyRunner for protocol commands. run_id: run id if any, associated to the runner/engine. """ - self.run_id = run_id + self._run_id = run_id self._protocol_engine = protocol_engine self._hardware_api = hardware_api self._protocol_runner = json_or_python_protocol_runner self._setup_runner = setup_runner self._fixit_runner = fixit_runner + self._protocol_live_runner = protocol_live_runner - @property - def engine(self) -> ProtocolEngine: - """Get the "current" persisted ProtocolEngine.""" - return self._protocol_engine + self._fixit_runner.prepare() + self._setup_runner.prepare() @property - def runner(self) -> AnyRunner: - """Get the "current" persisted ProtocolRunner.""" - return self._protocol_runner or self._setup_runner + def run_id(self) -> str: + """Get the "current" persisted run_id.""" + if not self._run_id: + raise RunNotFound() + return self._run_id @classmethod def build_orchestrator( @@ -82,6 +118,10 @@ def build_orchestrator( protocol_engine=protocol_engine, hardware_api=hardware_api, ) + protocol_live_runner = protocol_runner.LiveRunner( + protocol_engine=protocol_engine, + hardware_api=hardware_api, + ) json_or_python_runner = None if protocol_config: json_or_python_runner = protocol_runner.create_protocol_runner( @@ -98,4 +138,194 @@ def build_orchestrator( fixit_runner=fixit_runner, hardware_api=hardware_api, protocol_engine=protocol_engine, + protocol_live_runner=protocol_live_runner, + ) + + def play(self, deck_configuration: Optional[DeckConfigurationType] = None) -> None: + """Start or resume the run.""" + self._protocol_engine.play(deck_configuration=deck_configuration) + + async def run(self, deck_configuration: DeckConfigurationType) -> RunResult: + """Start the run.""" + if self._protocol_runner: + return await self._protocol_runner.run( + deck_configuration=deck_configuration + ) + elif self._protocol_live_runner: + return await self._protocol_live_runner.run( + deck_configuration=deck_configuration + ) + else: + return await self._setup_runner.run(deck_configuration=deck_configuration) + + def pause(self) -> None: + """Pause the run.""" + self._protocol_engine.request_pause() + + async def stop(self) -> None: + """Stop the run.""" + if self.run_has_started(): + await self._protocol_engine.request_stop() + else: + await self.finish( + drop_tips_after_run=False, + set_run_status=False, + post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE, + ) + + def resume_from_recovery(self) -> None: + """Resume the run from recovery.""" + self._protocol_engine.resume_from_recovery() + + async def finish( + self, + error: Optional[Exception] = None, + drop_tips_after_run: bool = True, + set_run_status: bool = True, + post_run_hardware_state: PostRunHardwareState = PostRunHardwareState.HOME_AND_STAY_ENGAGED, + ) -> None: + """Finish the run.""" + await self._protocol_engine.finish( + error=error, + drop_tips_after_run=drop_tips_after_run, + set_run_status=set_run_status, + post_run_hardware_state=post_run_hardware_state, + ) + + def get_state_summary(self) -> StateSummary: + """Get protocol run data.""" + return self._protocol_engine.state_view.get_summary() + + def get_loaded_labware_definitions(self) -> List[LabwareDefinition]: + """Get loaded labware definitions.""" + return self._protocol_engine.state_view.labware.get_loaded_labware_definitions() + + def get_run_time_parameters(self) -> List[RunTimeParameter]: + """Parameter definitions defined by protocol, if any. Will always be empty before execution.""" + return ( + [] + if self._protocol_runner is None + else self._protocol_runner.run_time_parameters + ) + + def get_current_command(self) -> Optional[CommandPointer]: + """Get the current running command.""" + return self._protocol_engine.state_view.commands.get_current() + + def get_command_slice( + self, + cursor: Optional[int], + length: int, + ) -> CommandSlice: + """Get a slice of run commands. + + Args: + cursor: Requested index of first command in the returned slice. + length: Length of slice to return. + """ + return self._protocol_engine.state_view.commands.get_slice( + cursor=cursor, length=length + ) + + def get_command_recovery_target(self) -> Optional[CommandPointer]: + """Get the current error recovery target.""" + return self._protocol_engine.state_view.commands.get_recovery_target() + + def get_command(self, command_id: str) -> Command: + """Get a run's command by ID.""" + return self._protocol_engine.state_view.commands.get(command_id=command_id) + + def get_all_commands(self) -> List[Command]: + """Get all run commands.""" + return self._protocol_engine.state_view.commands.get_all() + + def get_run_status(self) -> EngineStatus: + """Get the current execution status of the engine.""" + return self._protocol_engine.state_view.commands.get_status() + + def get_is_run_terminal(self) -> bool: + """Get whether engine is in a terminal state.""" + return self._protocol_engine.state_view.commands.get_is_terminal() + + def run_has_started(self) -> bool: + """Get whether the run has started.""" + return self._protocol_engine.state_view.commands.has_been_played() + + def run_has_stopped(self) -> bool: + """Get whether the run has stopped.""" + return self._protocol_engine.state_view.commands.get_is_stopped() + + def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset: + """Add a new labware offset to state.""" + return self._protocol_engine.add_labware_offset(request) + + def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri: + """Add a new labware definition to state.""" + return self._protocol_engine.add_labware_definition(definition) + + async def add_command_and_wait_for_interval( + self, + command: CommandCreate, + wait_until_complete: bool = False, + timeout: Optional[int] = None, + failed_command_id: Optional[str] = None, + ) -> Command: + """Add a new command to execute and wait for it to complete if needed.""" + added_command = self._protocol_engine.add_command( + request=command, failed_command_id=failed_command_id ) + if wait_until_complete: + timeout_sec = None if timeout is None else timeout / 1000.0 + with move_on_after(timeout_sec): + await self._protocol_engine.wait_for_command(added_command.id) + return added_command + + def estop(self) -> None: + """Handle an E-stop event from the hardware API.""" + return self._protocol_engine.estop() + + async def use_attached_modules( + self, modules_by_id: Dict[str, HardwareModuleAPI] + ) -> None: + """Load attached modules directly into state, without locations.""" + await self._protocol_engine.use_attached_modules(modules_by_id=modules_by_id) + + def get_protocol_runner(self) -> Optional[Union[JsonRunner, PythonAndLegacyRunner]]: + """Get run's protocol runner if any, if not return None.""" + return self._protocol_runner + + async def load_json( + self, + protocol_source: ProtocolSource, + ) -> None: + """Load a json protocol.""" + assert self._protocol_runner is not None + assert isinstance(self._protocol_runner, JsonRunner) + await self._protocol_runner.load(protocol_source=protocol_source) + + async def load_python( + self, + protocol_source: ProtocolSource, + python_parse_mode: PythonParseMode, + run_time_param_values: Optional[RunTimeParamValuesType], + ) -> None: + """Load a python protocol.""" + assert self._protocol_runner is not None + assert isinstance(self._protocol_runner, PythonAndLegacyRunner) + await self._protocol_runner.load( + protocol_source=protocol_source, + python_parse_mode=python_parse_mode, + run_time_param_values=run_time_param_values, + ) + + def get_is_okay_to_clear(self) -> bool: + """Get whether the engine is stopped or sitting idly, so it could be removed.""" + return self._protocol_engine.state_view.commands.get_is_okay_to_clear() + + def prepare(self) -> None: + """Prepare live runner for a run.""" + self._protocol_live_runner.prepare() + + def get_robot_type(self) -> RobotType: + """Get engine robot type.""" + return self._protocol_engine.state_view.config.robot_type diff --git a/api/tests/opentrons/protocol_runner/test_run_orchestrator.py b/api/tests/opentrons/protocol_runner/test_run_orchestrator.py new file mode 100644 index 000000000000..bedc11a8f037 --- /dev/null +++ b/api/tests/opentrons/protocol_runner/test_run_orchestrator.py @@ -0,0 +1,496 @@ +"""Tests for the RunOrchestrator.""" +from pathlib import Path + +import pytest +from datetime import datetime + +from pytest_lazyfixture import lazy_fixture # type: ignore[import-untyped] +from decoy import Decoy +from typing import Union + +from opentrons.protocols.api_support.types import APIVersion +from opentrons.protocol_engine import ProtocolEngine +from opentrons.protocol_engine.types import PostRunHardwareState +from opentrons.protocol_engine import commands as pe_commands +from opentrons.hardware_control import API as HardwareAPI +from opentrons.protocol_reader import ( + JsonProtocolConfig, + PythonProtocolConfig, + ProtocolSource, +) +from opentrons.protocol_runner.run_orchestrator import RunOrchestrator, RunNotFound +from opentrons import protocol_runner +from opentrons.protocol_runner.protocol_runner import ( + JsonRunner, + PythonAndLegacyRunner, + LiveRunner, +) +from opentrons.protocols.parse import PythonParseMode + + +@pytest.fixture +def mock_protocol_python_runner(decoy: Decoy) -> PythonAndLegacyRunner: + """Get a mocked out PythonAndLegacyRunner dependency.""" + return decoy.mock(cls=PythonAndLegacyRunner) + + +@pytest.fixture +def mock_protocol_json_runner(decoy: Decoy) -> JsonRunner: + """Get a mocked out PythonAndLegacyRunner dependency.""" + return decoy.mock(cls=JsonRunner) + + +@pytest.fixture +def mock_setup_runner(decoy: Decoy) -> LiveRunner: + """Get a mocked out LiveRunner dependency.""" + return decoy.mock(cls=LiveRunner) + + +@pytest.fixture +def mock_fixit_runner(decoy: Decoy) -> LiveRunner: + """Get a mocked out LiveRunner dependency.""" + return decoy.mock(cls=LiveRunner) + + +@pytest.fixture +def mock_protocol_live_runner(decoy: Decoy) -> LiveRunner: + """Get a mock of a LiveRunner for protocol commands.""" + return decoy.mock(cls=LiveRunner) + + +@pytest.fixture +def mock_protocol_engine(decoy: Decoy) -> ProtocolEngine: + """Get a mocked out ProtocolEngine dependency.""" + return decoy.mock(cls=ProtocolEngine) + + +@pytest.fixture +def mock_hardware_api(decoy: Decoy) -> HardwareAPI: + """Get a mocked out HardwareAPI dependency.""" + return decoy.mock(cls=HardwareAPI) + + +@pytest.fixture +def json_protocol_subject( + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + mock_protocol_json_runner: JsonRunner, + mock_fixit_runner: LiveRunner, + mock_setup_runner: LiveRunner, + mock_protocol_live_runner: LiveRunner, +) -> RunOrchestrator: + """Get a RunOrchestrator subject with a json runner.""" + return RunOrchestrator( + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + fixit_runner=mock_fixit_runner, + setup_runner=mock_setup_runner, + json_or_python_protocol_runner=mock_protocol_json_runner, + protocol_live_runner=mock_protocol_live_runner, + ) + + +@pytest.fixture +def python_protocol_subject( + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + mock_protocol_python_runner: PythonAndLegacyRunner, + mock_fixit_runner: LiveRunner, + mock_setup_runner: LiveRunner, + mock_protocol_live_runner: LiveRunner, +) -> RunOrchestrator: + """Get a RunOrchestrator subject with a python runner.""" + return RunOrchestrator( + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + fixit_runner=mock_fixit_runner, + setup_runner=mock_setup_runner, + json_or_python_protocol_runner=mock_protocol_python_runner, + protocol_live_runner=mock_protocol_live_runner, + ) + + +@pytest.fixture +def live_protocol_subject( + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + mock_fixit_runner: LiveRunner, + mock_setup_runner: LiveRunner, + mock_protocol_live_runner: LiveRunner, +) -> RunOrchestrator: + """Get a RunOrchestrator subject with a live runner.""" + return RunOrchestrator( + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + fixit_runner=mock_fixit_runner, + setup_runner=mock_setup_runner, + protocol_live_runner=mock_protocol_live_runner, + ) + + +@pytest.mark.parametrize( + "input_protocol_config, mock_protocol_runner, subject", + [ + ( + JsonProtocolConfig(schema_version=7), + lazy_fixture("mock_protocol_json_runner"), + lazy_fixture("json_protocol_subject"), + ), + ( + PythonProtocolConfig(api_version=APIVersion(2, 14)), + lazy_fixture("mock_protocol_python_runner"), + lazy_fixture("python_protocol_subject"), + ), + ], +) +def test_build_run_orchestrator_provider( + decoy: Decoy, + monkeypatch: pytest.MonkeyPatch, + subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + input_protocol_config: Union[PythonProtocolConfig, JsonProtocolConfig], + mock_setup_runner: LiveRunner, + mock_fixit_runner: LiveRunner, + mock_protocol_runner: Union[PythonAndLegacyRunner, JsonRunner], +) -> None: + """Should get a RunOrchestrator instance.""" + mock_create_runner_func = decoy.mock(func=protocol_runner.create_protocol_runner) + monkeypatch.setattr( + protocol_runner, "create_protocol_runner", mock_create_runner_func + ) + + decoy.when( + mock_create_runner_func( + protocol_config=input_protocol_config, + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + post_run_hardware_state=PostRunHardwareState.HOME_AND_STAY_ENGAGED, + drop_tips_after_run=True, + ) + ).then_return(mock_protocol_runner) + + result = subject.build_orchestrator( + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + protocol_config=input_protocol_config, + ) + + assert isinstance(result, RunOrchestrator) + assert isinstance(result._setup_runner, LiveRunner) + assert isinstance(result._fixit_runner, LiveRunner) + assert isinstance(result._protocol_runner, (PythonAndLegacyRunner, JsonRunner)) + + +@pytest.mark.parametrize( + "subject, runner", + [ + ( + lazy_fixture("json_protocol_subject"), + lazy_fixture("mock_protocol_json_runner"), + ), + ( + lazy_fixture("python_protocol_subject"), + lazy_fixture("mock_protocol_python_runner"), + ), + ], +) +async def test_run_calls_protocol_runner( + subject: RunOrchestrator, + runner: Union[JsonRunner, PythonAndLegacyRunner], + decoy: Decoy, +) -> None: + """Should call protocol runner run method.""" + await subject.run(deck_configuration=[]) + decoy.verify(await runner.run(deck_configuration=[])) + + +async def test_run_calls_protocol_live_runner( + live_protocol_subject: RunOrchestrator, + mock_protocol_live_runner: LiveRunner, + decoy: Decoy, +) -> None: + """Should call protocol runner run method.""" + await live_protocol_subject.run(deck_configuration=[]) + decoy.verify(await mock_protocol_live_runner.run(deck_configuration=[])) + + +def test_get_run_time_parameters_returns_an_empty_list_no_protocol( + live_protocol_subject: RunOrchestrator, +) -> None: + """Should return an empty list in case the protocol runner is not initialized.""" + result = live_protocol_subject.get_run_time_parameters() + assert result == [] + + +def test_get_run_time_parameters_returns_an_empty_list_json_runner( + decoy: Decoy, + mock_protocol_json_runner: JsonRunner, + json_protocol_subject: RunOrchestrator, +) -> None: + """Should return an empty list in case the protocol runner is a json runner.""" + decoy.when(mock_protocol_json_runner.run_time_parameters).then_return([]) + result = json_protocol_subject.get_run_time_parameters() + assert result == [] + + +@pytest.mark.parametrize( + "wait_for_interval_input, verify_calls", [(True, 1), (False, 0)] +) +async def test_add_command_and_wait_for_interval( + decoy: Decoy, + json_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, + wait_for_interval_input: bool, + verify_calls: int, +) -> None: + """Should add a command a wait for it to complete.""" + load_command = pe_commands.HomeCreate.construct( + params=pe_commands.HomeParams.construct() + ) + added_command = pe_commands.Home( + params=pe_commands.HomeParams.construct(), + id="test-123", + createdAt=datetime(year=2024, month=1, day=1), + key="123", + status=pe_commands.CommandStatus.QUEUED, + ) + decoy.when( + mock_protocol_engine.add_command(request=load_command, failed_command_id=None) + ).then_return(added_command) + + result = await json_protocol_subject.add_command_and_wait_for_interval( + command=load_command, wait_until_complete=wait_for_interval_input, timeout=999 + ) + + assert result == added_command + + decoy.verify( + await mock_protocol_engine.wait_for_command(command_id="test-123"), + times=verify_calls, + ) + + +def test_estop( + decoy: Decoy, + live_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, +) -> None: + """Verify an estop call.""" + live_protocol_subject.estop() + decoy.verify(mock_protocol_engine.estop()) + + +async def test_use_attached_modules( + decoy: Decoy, + live_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, +) -> None: + """Verify a call to use_attached_modules.""" + await live_protocol_subject.use_attached_modules(modules_by_id={}) + decoy.verify(await mock_protocol_engine.use_attached_modules({})) + + +def test_get_protocol_runner( + json_protocol_subject: RunOrchestrator, + python_protocol_subject: RunOrchestrator, + live_protocol_subject: RunOrchestrator, +) -> None: + """Should return the equivalent runner.""" + json_runner = json_protocol_subject.get_protocol_runner() + assert isinstance(json_runner, JsonRunner) + + python_runner = python_protocol_subject.get_protocol_runner() + assert isinstance(python_runner, PythonAndLegacyRunner) + + live_runner = live_protocol_subject.get_protocol_runner() + assert live_runner is None + + +async def test_load_json( + decoy: Decoy, + json_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, + mock_protocol_json_runner: JsonRunner, +) -> None: + """Should load a json protocol runner.""" + protocol_source = ProtocolSource( + directory=Path("/dev/null"), + main_file=Path("/dev/null/abc.json"), + files=[], + metadata={}, + robot_type="OT-2 Standard", + config=JsonProtocolConfig(schema_version=6), + content_hash="abc123", + ) + await json_protocol_subject.load_json(protocol_source=protocol_source) + + decoy.verify(await mock_protocol_json_runner.load(protocol_source)) + + +async def test_load_python( + decoy: Decoy, + python_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, + mock_protocol_python_runner: PythonAndLegacyRunner, +) -> None: + """Should load a json protocol runner.""" + protocol_source = ProtocolSource( + directory=Path("/dev/null"), + main_file=Path("/dev/null/abc.json"), + files=[], + metadata={}, + robot_type="OT-2 Standard", + config=JsonProtocolConfig(schema_version=6), + content_hash="abc123", + ) + await python_protocol_subject.load_python( + protocol_source=protocol_source, + python_parse_mode=PythonParseMode.NORMAL, + run_time_param_values=None, + ) + + decoy.verify( + await mock_protocol_python_runner.load( + protocol_source=protocol_source, + python_parse_mode=PythonParseMode.NORMAL, + run_time_param_values=None, + ) + ) + + +async def test_load_json_raises_no_protocol( + decoy: Decoy, + live_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, +) -> None: + """Should raise that there is no protocol runner.""" + protocol_source = ProtocolSource( + directory=Path("/dev/null"), + main_file=Path("/dev/null/abc.json"), + files=[], + metadata={}, + robot_type="OT-2 Standard", + config=JsonProtocolConfig(schema_version=6), + content_hash="abc123", + ) + with pytest.raises(AssertionError): + await live_protocol_subject.load_json(protocol_source=protocol_source) + + +async def test_load_json_raises_no_runner_match( + decoy: Decoy, + json_protocol_subject: RunOrchestrator, + mock_protocol_engine: ProtocolEngine, +) -> None: + """Should raise that there is no protocol runner.""" + protocol_source = ProtocolSource( + directory=Path("/dev/null"), + main_file=Path("/dev/null/abc.json"), + files=[], + metadata={}, + robot_type="OT-2 Standard", + config=JsonProtocolConfig(schema_version=6), + content_hash="abc123", + ) + with pytest.raises(AssertionError): + await json_protocol_subject.load_python( + protocol_source=protocol_source, + python_parse_mode=PythonParseMode.NORMAL, + run_time_param_values=None, + ) + + +def test_get_run_id( + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + mock_fixit_runner: LiveRunner, + mock_setup_runner: LiveRunner, + mock_protocol_live_runner: LiveRunner, +) -> None: + """Should get run_id if builder was created with a run id.""" + orchestrator = RunOrchestrator( + run_id="test-123", + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + fixit_runner=mock_fixit_runner, + setup_runner=mock_setup_runner, + protocol_live_runner=mock_protocol_live_runner, + ) + assert orchestrator.run_id == "test-123" + + +def test_get_run_id_raises( + mock_protocol_engine: ProtocolEngine, + mock_hardware_api: HardwareAPI, + mock_fixit_runner: LiveRunner, + mock_setup_runner: LiveRunner, + mock_protocol_live_runner: LiveRunner, +) -> None: + """Should get run_id if builder was created with a run id.""" + orchestrator = RunOrchestrator( + protocol_engine=mock_protocol_engine, + hardware_api=mock_hardware_api, + fixit_runner=mock_fixit_runner, + setup_runner=mock_setup_runner, + protocol_live_runner=mock_protocol_live_runner, + ) + with pytest.raises(RunNotFound): + orchestrator.run_id + + +def test_get_is_okay_to_clear( + decoy: Decoy, + mock_protocol_engine: ProtocolEngine, + live_protocol_subject: RunOrchestrator, +) -> None: + """Should return if is ok to clear run or not.""" + decoy.when( + mock_protocol_engine.state_view.commands.get_is_okay_to_clear() + ).then_return(True) + result = live_protocol_subject.get_is_okay_to_clear() + + assert result is True + + decoy.when( + mock_protocol_engine.state_view.commands.get_is_okay_to_clear() + ).then_return(False) + result = live_protocol_subject.get_is_okay_to_clear() + + assert result is False + + +def test_prepare( + decoy: Decoy, + live_protocol_subject: RunOrchestrator, + mock_protocol_live_runner: LiveRunner, +) -> None: + """Verify prepare calls runner prepare.""" + live_protocol_subject.prepare() + decoy.verify(mock_protocol_live_runner.prepare()) + + +async def test_stop( + decoy: Decoy, + mock_protocol_engine: ProtocolEngine, + live_protocol_subject: RunOrchestrator, +) -> None: + """Should verify a call to stop/finish the run.""" + decoy.when(mock_protocol_engine.state_view.commands.has_been_played()).then_return( + True + ) + await live_protocol_subject.stop() + decoy.verify(await mock_protocol_engine.request_stop()) + + decoy.when(mock_protocol_engine.state_view.commands.has_been_played()).then_return( + False + ) + await live_protocol_subject.stop() + decoy.verify( + await mock_protocol_engine.finish( + error=None, + drop_tips_after_run=False, + set_run_status=False, + post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE, + ) + ) diff --git a/api/tests/opentrons/protocol_runner/test_run_orchestrator_provider.py b/api/tests/opentrons/protocol_runner/test_run_orchestrator_provider.py deleted file mode 100644 index 4c25ae28a4ea..000000000000 --- a/api/tests/opentrons/protocol_runner/test_run_orchestrator_provider.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Tests for the RunOrchestrator.""" -import pytest -from pytest_lazyfixture import lazy_fixture # type: ignore[import-untyped] -from decoy import Decoy -from typing import Union - -from opentrons.protocols.api_support.types import APIVersion -from opentrons.protocol_engine import ProtocolEngine -from opentrons.protocol_engine.types import PostRunHardwareState -from opentrons.hardware_control import API as HardwareAPI -from opentrons.protocol_reader import JsonProtocolConfig, PythonProtocolConfig -from opentrons.protocol_runner.run_orchestrator import RunOrchestrator -from opentrons import protocol_runner -from opentrons.protocol_runner.protocol_runner import ( - JsonRunner, - PythonAndLegacyRunner, - LiveRunner, -) - - -@pytest.fixture -def mock_protocol_python_runner(decoy: Decoy) -> PythonAndLegacyRunner: - """Get a mocked out PythonAndLegacyRunner dependency.""" - return decoy.mock(cls=PythonAndLegacyRunner) - - -@pytest.fixture -def mock_protocol_json_runner(decoy: Decoy) -> JsonRunner: - """Get a mocked out PythonAndLegacyRunner dependency.""" - return decoy.mock(cls=JsonRunner) - - -@pytest.fixture -def mock_setup_runner(decoy: Decoy) -> LiveRunner: - """Get a mocked out LiveRunner dependency.""" - return decoy.mock(cls=LiveRunner) - - -@pytest.fixture -def mock_fixit_runner(decoy: Decoy) -> LiveRunner: - """Get a mocked out LiveRunner dependency.""" - return decoy.mock(cls=LiveRunner) - - -@pytest.fixture -def mock_protocol_engine(decoy: Decoy) -> ProtocolEngine: - """Get a mocked out ProtocolEngine dependency.""" - return decoy.mock(cls=ProtocolEngine) - - -@pytest.fixture -def mock_hardware_api(decoy: Decoy) -> HardwareAPI: - """Get a mocked out HardwareAPI dependency.""" - return decoy.mock(cls=HardwareAPI) - - -@pytest.fixture -def json_protocol_subject( - mock_protocol_engine: ProtocolEngine, - mock_hardware_api: HardwareAPI, - mock_protocol_json_runner: JsonRunner, - mock_fixit_runner: LiveRunner, - mock_setup_runner: LiveRunner, -) -> RunOrchestrator: - """Get a RunOrchestrator subject with a json runner.""" - return RunOrchestrator( - protocol_engine=mock_protocol_engine, - hardware_api=mock_hardware_api, - fixit_runner=mock_fixit_runner, - setup_runner=mock_setup_runner, - json_or_python_protocol_runner=mock_protocol_json_runner, - ) - - -@pytest.fixture -def python_protocol_subject( - mock_protocol_engine: ProtocolEngine, - mock_hardware_api: HardwareAPI, - mock_protocol_python_runner: PythonAndLegacyRunner, - mock_fixit_runner: LiveRunner, - mock_setup_runner: LiveRunner, -) -> RunOrchestrator: - """Get a RunOrchestrator subject with a python runner.""" - return RunOrchestrator( - protocol_engine=mock_protocol_engine, - hardware_api=mock_hardware_api, - fixit_runner=mock_fixit_runner, - setup_runner=mock_setup_runner, - json_or_python_protocol_runner=mock_protocol_python_runner, - ) - - -@pytest.mark.parametrize( - "input_protocol_config, mock_protocol_runner, subject", - [ - ( - JsonProtocolConfig(schema_version=7), - lazy_fixture("mock_protocol_json_runner"), - lazy_fixture("json_protocol_subject"), - ), - ( - PythonProtocolConfig(api_version=APIVersion(2, 14)), - lazy_fixture("mock_protocol_python_runner"), - lazy_fixture("python_protocol_subject"), - ), - ], -) -def test_build_run_orchestrator_provider( - decoy: Decoy, - monkeypatch: pytest.MonkeyPatch, - subject: RunOrchestrator, - mock_protocol_engine: ProtocolEngine, - mock_hardware_api: HardwareAPI, - input_protocol_config: Union[PythonProtocolConfig, JsonProtocolConfig], - mock_setup_runner: LiveRunner, - mock_fixit_runner: LiveRunner, - mock_protocol_runner: Union[PythonAndLegacyRunner, JsonRunner], -) -> None: - """Should get a RunOrchestrator instance.""" - mock_create_runner_func = decoy.mock(func=protocol_runner.create_protocol_runner) - monkeypatch.setattr( - protocol_runner, "create_protocol_runner", mock_create_runner_func - ) - - decoy.when( - mock_create_runner_func( - protocol_config=input_protocol_config, - protocol_engine=mock_protocol_engine, - hardware_api=mock_hardware_api, - post_run_hardware_state=PostRunHardwareState.HOME_AND_STAY_ENGAGED, - drop_tips_after_run=True, - ) - ).then_return(mock_protocol_runner) - - result = subject.build_orchestrator( - protocol_engine=mock_protocol_engine, - hardware_api=mock_hardware_api, - protocol_config=input_protocol_config, - ) - - assert isinstance(result, RunOrchestrator) - assert isinstance(result._setup_runner, LiveRunner) - assert isinstance(result._fixit_runner, LiveRunner) diff --git a/robot-server/robot_server/commands/get_default_engine.py b/robot-server/robot_server/commands/get_default_orchestrator.py similarity index 81% rename from robot-server/robot_server/commands/get_default_engine.py rename to robot-server/robot_server/commands/get_default_orchestrator.py index 385b6eaba78f..7282b1c70486 100644 --- a/robot-server/robot_server/commands/get_default_engine.py +++ b/robot-server/robot_server/commands/get_default_orchestrator.py @@ -4,7 +4,7 @@ from fastapi import Depends, status from opentrons.hardware_control import HardwareControlAPI -from opentrons.protocol_engine import ProtocolEngine +from opentrons.protocol_runner import RunOrchestrator from opentrons_shared_data.errors import ErrorCodes @@ -30,14 +30,14 @@ class RunActive(ErrorDetails): errorCode: str = ErrorCodes.ROBOT_IN_USE.value.code -async def get_default_engine( +async def get_default_orchestrator( engine_store: EngineStore = Depends(get_engine_store), hardware_api: HardwareControlAPI = Depends(get_hardware), module_identifier: ModuleIdentifier = Depends(ModuleIdentifier), -) -> ProtocolEngine: - """Get the default engine with attached modules loaded.""" +) -> RunOrchestrator: + """Get the default run orchestrator with attached modules loaded.""" try: - engine = await engine_store.get_default_engine() + orchestrator = await engine_store.get_default_orchestrator() except EngineConflictError as e: raise RunActive.from_exc(e).as_error(status.HTTP_409_CONFLICT) from e @@ -47,6 +47,6 @@ async def get_default_engine( for mod in attached_modules } - await engine.use_attached_modules(attached_module_spec) + await orchestrator.use_attached_modules(attached_module_spec) - return engine + return orchestrator diff --git a/robot-server/robot_server/commands/router.py b/robot-server/robot_server/commands/router.py index 0d617e38a5a7..15f2d2d38608 100644 --- a/robot-server/robot_server/commands/router.py +++ b/robot-server/robot_server/commands/router.py @@ -2,11 +2,13 @@ from typing import List, Optional, cast from typing_extensions import Final, Literal -from anyio import move_on_after from fastapi import APIRouter, Depends, Query, status -from opentrons.protocol_engine import ProtocolEngine, CommandIntent +from opentrons.protocol_engine import CommandIntent from opentrons.protocol_engine.errors import CommandDoesNotExistError + +from opentrons.protocol_runner import RunOrchestrator + from opentrons_shared_data.errors import ErrorCodes from robot_server.errors.error_responses import ErrorDetails, ErrorBody @@ -18,7 +20,7 @@ PydanticResponse, ) -from .get_default_engine import get_default_engine, RunActive +from .get_default_orchestrator import get_default_orchestrator, RunActive from .stateless_commands import StatelessCommand, StatelessCommandCreate _DEFAULT_COMMAND_LIST_LENGTH: Final = 20 @@ -91,7 +93,7 @@ async def create_command( " the default was 30 seconds, not infinite." ), ), - engine: ProtocolEngine = Depends(get_default_engine), + orchestrator: RunOrchestrator = Depends(get_default_orchestrator), ) -> PydanticResponse[SimpleBody[StatelessCommand]]: """Enqueue and execute a command. @@ -102,17 +104,14 @@ async def create_command( Else, return immediately. Comes from a query parameter in the URL. timeout: The maximum time, in seconds, to wait before returning. Comes from a query parameter in the URL. - engine: The `ProtocolEngine` on which the command will be enqueued. + orchestrator: The `RunOrchestrator` handling engine for command to be enqueued. """ command_create = request_body.data.copy(update={"intent": CommandIntent.SETUP}) - command = engine.add_command(command_create) - - if waitUntilComplete: - timeout_sec = None if timeout is None else timeout / 1000.0 - with move_on_after(timeout_sec): - await engine.wait_for_command(command.id) + command = await orchestrator.add_command_and_wait_for_interval( + command=command_create, wait_until_complete=waitUntilComplete, timeout=timeout + ) - response_data = cast(StatelessCommand, engine.state_view.commands.get(command.id)) + response_data = cast(StatelessCommand, orchestrator.get_command(command.id)) return await PydanticResponse.create( content=SimpleBody.construct(data=response_data), @@ -134,7 +133,7 @@ async def create_command( }, ) async def get_commands_list( - engine: ProtocolEngine = Depends(get_default_engine), + orchestrator: RunOrchestrator = Depends(get_default_orchestrator), cursor: Optional[int] = Query( None, description=( @@ -151,11 +150,11 @@ async def get_commands_list( """Get a list of stateless commands. Arguments: - engine: Protocol engine with commands. + orchestrator: Run orchestrator with commands. cursor: Cursor index for the collection response. pageLength: Maximum number of items to return. """ - cmd_slice = engine.state_view.commands.get_slice(cursor=cursor, length=pageLength) + cmd_slice = orchestrator.get_command_slice(cursor=cursor, length=pageLength) commands = cast(List[StatelessCommand], cmd_slice.commands) meta = MultiBodyMeta(cursor=cmd_slice.cursor, totalLength=cmd_slice.total_length) @@ -181,16 +180,16 @@ async def get_commands_list( ) async def get_command( commandId: str, - engine: ProtocolEngine = Depends(get_default_engine), + orchestrator: RunOrchestrator = Depends(get_default_orchestrator), ) -> PydanticResponse[SimpleBody[StatelessCommand]]: """Get a single stateless command. Arguments: commandId: Command identifier from the URL parameter. - engine: Protocol engine with commands. + orchestrator: Run orchestrator with commands. """ try: - command = engine.state_view.commands.get(commandId) + command = orchestrator.get_command(commandId) except CommandDoesNotExistError as e: raise CommandNotFound.from_exc(e).as_error(status.HTTP_404_NOT_FOUND) from e diff --git a/robot-server/robot_server/runs/dependencies.py b/robot-server/robot_server/runs/dependencies.py index 8ff687464e2b..f29c47d4f55c 100644 --- a/robot-server/robot_server/runs/dependencies.py +++ b/robot-server/robot_server/runs/dependencies.py @@ -27,7 +27,7 @@ ) from .run_auto_deleter import RunAutoDeleter -from .engine_store import EngineStore, NoRunnerEngineError +from .engine_store import EngineStore, NoRunOrchestrator from .run_store import RunStore from .run_data_manager import RunDataManager from robot_server.errors.robot_errors import ( @@ -130,13 +130,10 @@ async def get_is_okay_to_create_maintenance_run( ) -> bool: """Whether a maintenance run can be created if a protocol run already exists.""" try: - protocol_run_state = engine_store.engine.state_view - except NoRunnerEngineError: + orchestrator = engine_store.run_orchestrator + except NoRunOrchestrator: return True - return ( - not protocol_run_state.commands.has_been_played() - or protocol_run_state.commands.get_is_terminal() - ) + return not orchestrator.run_has_started() or orchestrator.get_is_run_terminal() async def get_run_data_manager( diff --git a/robot-server/robot_server/runs/engine_store.py b/robot-server/robot_server/runs/engine_store.py index 3e630cef0ec9..6daaafe59f38 100644 --- a/robot-server/robot_server/runs/engine_store.py +++ b/robot-server/robot_server/runs/engine_store.py @@ -4,7 +4,9 @@ from typing import List, Optional, Callable from opentrons.protocol_engine.errors.exceptions import EStopActivatedError -from opentrons.protocol_engine.types import PostRunHardwareState +from opentrons.protocol_engine.types import PostRunHardwareState, RunTimeParameter + +from opentrons_shared_data.labware.labware_definition import LabwareDefinition from opentrons_shared_data.robot.dev_types import RobotType from opentrons_shared_data.robot.dev_types import RobotTypeEnum @@ -19,7 +21,6 @@ from opentrons.protocols.parse import PythonParseMode from opentrons.protocols.api_support.deck_type import should_load_fixed_trash from opentrons.protocol_runner import ( - AnyRunner, JsonRunner, PythonAndLegacyRunner, RunResult, @@ -29,16 +30,22 @@ Config as ProtocolEngineConfig, DeckType, LabwareOffsetCreate, - ProtocolEngine, StateSummary, create_protocol_engine, + CommandSlice, + CommandPointer, + Command, + CommandCreate, + LabwareOffset, ) from robot_server.protocols.protocol_store import ProtocolResource from opentrons.protocol_engine.types import ( DeckConfigurationType, RunTimeParamValuesType, + EngineStatus, ) +from opentrons_shared_data.labware.dev_types import LabwareUri _log = logging.getLogger(__name__) @@ -52,8 +59,8 @@ class EngineConflictError(RuntimeError): """ -class NoRunnerEngineError(RuntimeError): - """Raised if you try to get the current engine or runner while there is none.""" +class NoRunOrchestrator(RuntimeError): + """Raised if you try to get the current run orchestrator while there is none.""" async def handle_estop_event(engine_store: "EngineStore", event: HardwareEvent) -> None: @@ -72,8 +79,8 @@ async def handle_estop_event(engine_store: "EngineStore", event: HardwareEvent) return # todo(mm, 2024-04-17): This estop teardown sequencing belongs in the # runner layer. - engine_store.engine.estop() - await engine_store.engine.finish(error=EStopActivatedError()) + engine_store.run_orchestrator.estop() + await engine_store.run_orchestrator.finish(error=EStopActivatedError()) except Exception: # This is a background task kicked off by a hardware event, # so there's no one to propagate this exception to. @@ -119,50 +126,40 @@ def __init__( self._hardware_api = hardware_api self._robot_type = robot_type self._deck_type = deck_type - self._default_engine: Optional[ProtocolEngine] = None + self._default_run_orchestrator: Optional[RunOrchestrator] = None hardware_api.register_callback(_get_estop_listener(self)) @property - def engine(self) -> ProtocolEngine: - """Get the "current" persisted ProtocolEngine.""" - if self._run_orchestrator is None: - raise NoRunnerEngineError() - return self._run_orchestrator.engine - - @property - def runner(self) -> AnyRunner: - """Get the "current" persisted ProtocolRunner.""" + def run_orchestrator(self) -> RunOrchestrator: + """Get the "current" RunOrchestrator.""" if self._run_orchestrator is None: - raise NoRunnerEngineError() - return self._run_orchestrator.runner + raise NoRunOrchestrator() + return self._run_orchestrator @property def current_run_id(self) -> Optional[str]: """Get the run identifier associated with the current engine/runner pair.""" return ( - self._run_orchestrator.run_id - if self._run_orchestrator is not None - else None + self.run_orchestrator.run_id if self._run_orchestrator is not None else None ) - # TODO(tz, 2024-5-14): remove this once its all redirected via orchestrator # TODO(mc, 2022-03-21): this resource locking is insufficient; # come up with something more sophisticated without race condition holes. - async def get_default_engine(self) -> ProtocolEngine: - """Get a "default" ProtocolEngine to use outside the context of a run. + async def get_default_orchestrator(self) -> RunOrchestrator: + """Get a "default" RunOrchestrator to use outside the context of a run. Raises: EngineConflictError: if a run-specific engine is active. """ if ( self._run_orchestrator is not None - and self.engine.state_view.commands.has_been_played() - and not self.engine.state_view.commands.get_is_stopped() + and self.run_orchestrator.run_has_started() + and not self.run_orchestrator.run_has_stopped() ): raise EngineConflictError("An engine for a run is currently active") - engine = self._default_engine - if engine is None: + default_orchestrator = self._default_run_orchestrator + if default_orchestrator is None: # TODO(mc, 2022-03-21): potential race condition engine = await create_protocol_engine( hardware_api=self._hardware_api, @@ -172,8 +169,11 @@ async def get_default_engine(self) -> ProtocolEngine: block_on_door_open=False, ), ) - self._default_engine = engine - return engine + self._default_run_orchestrator = RunOrchestrator.build_orchestrator( + protocol_engine=engine, hardware_api=self._hardware_api + ) + return self._default_run_orchestrator + return default_orchestrator async def create( self, @@ -235,15 +235,16 @@ async def create( drop_tips_after_run=drop_tips_after_run, ) + runner = self.run_orchestrator.get_protocol_runner() # FIXME(mm, 2022-12-21): These `await runner.load()`s introduce a # concurrency hazard. If two requests simultaneously call this method, # they will both "succeed" (with undefined results) instead of one # raising EngineConflictError. - if isinstance(self.runner, PythonAndLegacyRunner): + if isinstance(runner, PythonAndLegacyRunner): assert ( protocol is not None ), "A Python protocol should have a protocol source file." - await self.runner.load( + await self.run_orchestrator.load_python( protocol.source, # Conservatively assume that we're re-running a protocol that # was uploaded before we added stricter validation, and that @@ -251,18 +252,18 @@ async def create( python_parse_mode=PythonParseMode.ALLOW_LEGACY_METADATA_AND_REQUIREMENTS, run_time_param_values=run_time_param_values, ) - elif isinstance(self.runner, JsonRunner): + elif isinstance(runner, JsonRunner): assert ( protocol is not None ), "A JSON protocol should have a protocol source file." - await self.runner.load(protocol.source) + await self.run_orchestrator.load_json(protocol.source) else: - self.runner.prepare() + self.run_orchestrator.prepare() for offset in labware_offsets: - engine.add_labware_offset(offset) + self.run_orchestrator.add_labware_offset(offset) - return engine.state_view.get_summary() + return self.run_orchestrator.get_state_summary() async def clear(self) -> RunResult: """Remove the persisted ProtocolEngine. @@ -271,10 +272,8 @@ async def clear(self) -> RunResult: EngineConflictError: The current runner/engine pair is not idle, so they cannot be cleared. """ - engine = self.engine - runner = self.runner - if engine.state_view.commands.get_is_okay_to_clear(): - await engine.finish( + if self.run_orchestrator.get_is_okay_to_clear(): + await self.run_orchestrator.finish( drop_tips_after_run=False, set_run_status=False, post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE, @@ -282,12 +281,108 @@ async def clear(self) -> RunResult: else: raise EngineConflictError("Current run is not idle or stopped.") - run_data = engine.state_view.get_summary() - commands = engine.state_view.commands.get_all() - run_time_parameters = runner.run_time_parameters if runner else [] + run_data = self.run_orchestrator.get_state_summary() + commands = self.run_orchestrator.get_all_commands() + run_time_parameters = self.run_orchestrator.get_run_time_parameters() self._run_orchestrator = None return RunResult( state_summary=run_data, commands=commands, parameters=run_time_parameters ) + + def play(self, deck_configuration: Optional[DeckConfigurationType] = None) -> None: + """Start or resume the run.""" + self.run_orchestrator.play(deck_configuration=deck_configuration) + + async def run(self, deck_configuration: DeckConfigurationType) -> RunResult: + """Start the run.""" + return await self.run_orchestrator.run(deck_configuration=deck_configuration) + + def pause(self) -> None: + """Pause the run.""" + self.run_orchestrator.pause() + + async def stop(self) -> None: + """Stop the run.""" + await self.run_orchestrator.stop() + + def resume_from_recovery(self) -> None: + """Resume the run from recovery mode.""" + self.run_orchestrator.resume_from_recovery() + + async def finish(self, error: Optional[Exception]) -> None: + """Finish the run.""" + await self.run_orchestrator.finish(error=error) + + def get_state_summary(self) -> StateSummary: + """Get protocol run data.""" + return self.run_orchestrator.get_state_summary() + + def get_loaded_labware_definitions(self) -> List[LabwareDefinition]: + """Get loaded labware definitions.""" + return self.run_orchestrator.get_loaded_labware_definitions() + + def get_run_time_parameters(self) -> List[RunTimeParameter]: + """Parameter definitions defined by protocol, if any. Will always be empty before execution.""" + return self.run_orchestrator.get_run_time_parameters() + + def get_current_command(self) -> Optional[CommandPointer]: + """Get the current running command.""" + return self.run_orchestrator.get_current_command() + + def get_command_slice( + self, + cursor: Optional[int], + length: int, + ) -> CommandSlice: + """Get a slice of run commands. + + Args: + cursor: Requested index of first command in the returned slice. + length: Length of slice to return. + """ + return self.run_orchestrator.get_command_slice(cursor=cursor, length=length) + + def get_command_recovery_target(self) -> Optional[CommandPointer]: + """Get the current error recovery target.""" + return self.run_orchestrator.get_command_recovery_target() + + def get_command(self, command_id: str) -> Command: + """Get a run's command by ID.""" + return self.run_orchestrator.get_command(command_id=command_id) + + def get_status(self) -> EngineStatus: + """Get the current execution status of the engine.""" + return self.run_orchestrator.get_run_status() + + def get_is_run_terminal(self) -> bool: + """Get whether engine is in a terminal state.""" + return self.run_orchestrator.get_is_run_terminal() + + def run_was_started(self) -> bool: + """Get whether the run has started.""" + return self.run_orchestrator.run_has_started() + + def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset: + """Add a new labware offset to state.""" + return self.run_orchestrator.add_labware_offset(request) + + def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri: + """Add a new labware definition to state.""" + return self.run_orchestrator.add_labware_definition(definition) + + async def add_command_and_wait_for_interval( + self, + request: CommandCreate, + wait_until_complete: bool = False, + timeout: Optional[int] = None, + failed_command_id: Optional[str] = None, + ) -> Command: + """Add a new command to execute and wait for it to complete if needed.""" + return await self.run_orchestrator.add_command_and_wait_for_interval( + command=request, + failed_command_id=failed_command_id, + wait_until_complete=wait_until_complete, + timeout=timeout, + ) diff --git a/robot-server/robot_server/runs/light_control_task.py b/robot-server/robot_server/runs/light_control_task.py index 1cb2ad716164..399a2c6f3173 100644 --- a/robot-server/robot_server/runs/light_control_task.py +++ b/robot-server/robot_server/runs/light_control_task.py @@ -135,7 +135,7 @@ def _get_current_engine_status(self) -> Optional[EngineStatus]: return None current_id = self._engine_store.current_run_id if current_id is not None: - return self._engine_store.engine.state_view.commands.get_status() + return self._engine_store.get_status() return None diff --git a/robot-server/robot_server/runs/router/commands_router.py b/robot-server/robot_server/runs/router/commands_router.py index 563979c2c416..1a5383b820a2 100644 --- a/robot-server/robot_server/runs/router/commands_router.py +++ b/robot-server/robot_server/runs/router/commands_router.py @@ -3,13 +3,10 @@ from typing import Optional, Union from typing_extensions import Final, Literal -from anyio import move_on_after from fastapi import APIRouter, Depends, Query, status - from opentrons.protocol_engine import ( CommandPointer, - ProtocolEngine, commands as pe_commands, errors as pe_errors, ) @@ -33,7 +30,7 @@ from ..run_models import RunCommandSummary from ..run_data_manager import RunDataManager, PreSerializedCommandsNotAvailableError from ..engine_store import EngineStore -from ..run_store import RunStore, CommandNotFoundError +from ..run_store import CommandNotFoundError, RunStore from ..run_models import RunNotFoundError from ..dependencies import get_engine_store, get_run_data_manager, get_run_store from .base_router import RunNotFound, RunStopped @@ -77,12 +74,12 @@ class PreSerializedCommandsNotAvailable(ErrorDetails): ) -async def get_current_run_engine_from_url( +async def get_current_run_from_url( runId: str, engine_store: EngineStore = Depends(get_engine_store), run_store: RunStore = Depends(get_run_store), -) -> ProtocolEngine: - """Get run protocol engine. +) -> str: + """Get run from url. Args: runId: Run ID to associate the command with. @@ -99,7 +96,7 @@ async def get_current_run_engine_from_url( status.HTTP_409_CONFLICT ) - return engine_store.engine + return runId @PydanticResponse.wrap_route( @@ -185,8 +182,9 @@ async def create_run_command( "FIXIT command use only. Reference of the failed command id we are trying to fix." ), ), - protocol_engine: ProtocolEngine = Depends(get_current_run_engine_from_url), + engine_store: EngineStore = Depends(get_engine_store), check_estop: bool = Depends(require_estop_in_good_state), + run_id: str = Depends(get_current_run_from_url), ) -> PydanticResponse[SimpleBody[pe_commands.Command]]: """Enqueue a protocol command. @@ -199,17 +197,22 @@ async def create_run_command( Comes from a query parameter in the URL. failedCommandId: FIXIT command use only. Reference of the failed command id we are trying to fix. - protocol_engine: The run's `ProtocolEngine` on which the new + engine_store: The run's `EngineStore` on which the new command will be enqueued. check_estop: Dependency to verify the estop is in a valid state. + run_id: Run identification to attach command to. """ # TODO(mc, 2022-05-26): increment the HTTP API version so that default # behavior is to pass through `command_intent` without overriding it command_intent = request_body.data.intent or pe_commands.CommandIntent.SETUP command_create = request_body.data.copy(update={"intent": command_intent}) + try: - command = protocol_engine.add_command( - request=command_create, failed_command_id=failedCommandId + command = await engine_store.add_command_and_wait_for_interval( + request=command_create, + failed_command_id=failedCommandId, + wait_until_complete=waitUntilComplete, + timeout=timeout, ) except pe_errors.SetupCommandNotAllowedError as e: @@ -219,12 +222,7 @@ async def create_run_command( except pe_errors.CommandNotAllowedError as e: raise CommandNotAllowed.from_exc(e).as_error(status.HTTP_400_BAD_REQUEST) - if waitUntilComplete: - timeout_sec = None if timeout is None else timeout / 1000.0 - with move_on_after(timeout_sec): - await protocol_engine.wait_for_command(command.id) - - response_data = protocol_engine.state_view.commands.get(command.id) + response_data = engine_store.get_command(command.id) return await PydanticResponse.create( content=SimpleBody.construct(data=response_data), diff --git a/robot-server/robot_server/runs/router/labware_router.py b/robot-server/robot_server/runs/router/labware_router.py index 58e828ca0527..8d50dacc63ed 100644 --- a/robot-server/robot_server/runs/router/labware_router.py +++ b/robot-server/robot_server/runs/router/labware_router.py @@ -64,7 +64,7 @@ async def add_labware_offset( status.HTTP_409_CONFLICT ) - added_offset = engine_store.engine.add_labware_offset(request_body.data) + added_offset = engine_store.add_labware_offset(request_body.data) log.info(f'Added labware offset "{added_offset.id}"' f' to run "{run.id}".') return await PydanticResponse.create( @@ -106,7 +106,7 @@ async def add_labware_definition( status.HTTP_409_CONFLICT ) - uri = engine_store.engine.add_labware_definition(request_body.data) + uri = engine_store.add_labware_definition(request_body.data) log.info(f'Added labware definition "{uri}"' f' to run "{run.id}".') return PydanticResponse( diff --git a/robot-server/robot_server/runs/run_controller.py b/robot-server/robot_server/runs/run_controller.py index e7e55080aed7..03680fcb740e 100644 --- a/robot-server/robot_server/runs/run_controller.py +++ b/robot-server/robot_server/runs/run_controller.py @@ -67,9 +67,9 @@ def create_action( try: if action_type == RunActionType.PLAY: - if self._engine_store.runner.was_started(): + if self._engine_store.run_was_started(): log.info(f'Resuming run "{self._run_id}".') - self._engine_store.runner.play() + self._engine_store.play() else: log.info(f'Starting run "{self._run_id}".') # TODO(mc, 2022-05-13): engine_store.runner.run could raise @@ -83,14 +83,14 @@ def create_action( elif action_type == RunActionType.PAUSE: log.info(f'Pausing run "{self._run_id}".') - self._engine_store.runner.pause() + self._engine_store.pause() elif action_type == RunActionType.STOP: log.info(f'Stopping run "{self._run_id}".') - self._task_runner.run(self._engine_store.runner.stop) + self._task_runner.run(self._engine_store.stop) elif action_type == RunActionType.RESUME_FROM_RECOVERY: - self._engine_store.runner.resume_from_recovery() + self._engine_store.resume_from_recovery() except ProtocolEngineError as e: raise RunActionNotAllowedError(message=e.message, wrapping=[e]) from e @@ -103,7 +103,7 @@ def create_action( async def _run_protocol_and_insert_result( self, deck_configuration: DeckConfigurationType ) -> None: - result = await self._engine_store.runner.run( + result = await self._engine_store.run( deck_configuration=deck_configuration, ) self._run_store.update_run_state( diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index 2fe67a48c27e..7d5651160bc1 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -254,9 +254,7 @@ def get_run_loaded_labware_definitions( f"Cannot get load labware definitions of {run_id} because it is not the current run." ) - return ( - self._engine_store.engine.state_view.labware.get_loaded_labware_definitions() - ) + return self._engine_store.get_loaded_labware_definitions() def get_all(self, length: Optional[int]) -> List[Union[Run, BadRun]]: """Get current and stored run resources. @@ -335,9 +333,8 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu run_id ) else: - state_summary = self._engine_store.engine.state_view.get_summary() - runner = self._engine_store.runner - parameters = runner.run_time_parameters if runner else [] + state_summary = self._engine_store.get_state_summary() + parameters = self._engine_store.get_run_time_parameters() run_resource = self._run_store.get(run_id=run_id) return _build_run( @@ -364,10 +361,7 @@ def get_commands_slice( RunNotFoundError: The given run identifier was not found in the database. """ if run_id == self._engine_store.current_run_id: - the_slice = self._engine_store.engine.state_view.commands.get_slice( - cursor=cursor, length=length - ) - return the_slice + return self._engine_store.get_command_slice(cursor=cursor, length=length) # Let exception propagate return self._run_store.get_commands_slice( @@ -384,7 +378,7 @@ def get_current_command(self, run_id: str) -> Optional[CommandPointer]: run_id: ID of the run. """ if self._engine_store.current_run_id == run_id: - return self._engine_store.engine.state_view.commands.get_current() + return self._engine_store.get_current_command() else: # todo(mm, 2024-05-20): # For historical runs to behave consistently with the current run, @@ -400,7 +394,7 @@ def get_recovery_target_command(self, run_id: str) -> Optional[CommandPointer]: run_id: ID of the run. """ if self._engine_store.current_run_id == run_id: - return self._engine_store.engine.state_view.commands.get_recovery_target() + return self._engine_store.get_command_recovery_target() else: # Historical runs can't have any ongoing error recovery. return None @@ -417,9 +411,7 @@ def get_command(self, run_id: str, command_id: str) -> Command: CommandNotFoundError: The given command identifier was not found. """ if self._engine_store.current_run_id == run_id: - return self._engine_store.engine.state_view.commands.get( - command_id=command_id - ) + return self._engine_store.get_command(command_id=command_id) return self._run_store.get_command(run_id=run_id, command_id=command_id) @@ -427,7 +419,7 @@ def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]: """Get all commands of a run in a serialized json list.""" if ( run_id == self._engine_store.current_run_id - and not self._engine_store.engine.state_view.commands.get_is_terminal() + and not self._engine_store.get_is_run_terminal() ): raise PreSerializedCommandsNotAvailableError( "Pre-serialized commands are only available after a run has ended." @@ -436,7 +428,7 @@ def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]: def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]: if run_id == self._engine_store.current_run_id: - return self._engine_store.engine.state_view.get_summary() + return self._engine_store.get_state_summary() else: return self._run_store.get_state_summary(run_id=run_id) @@ -446,7 +438,6 @@ def _get_good_state_summary(self, run_id: str) -> Optional[StateSummary]: def _get_run_time_parameters(self, run_id: str) -> List[RunTimeParameter]: if run_id == self._engine_store.current_run_id: - runner = self._engine_store.runner - return runner.run_time_parameters if runner else [] + return self._engine_store.get_run_time_parameters() else: return self._run_store.get_run_time_parameters(run_id=run_id) diff --git a/robot-server/tests/commands/test_get_default_engine.py b/robot-server/tests/commands/test_get_default_engine.py index 7e687501218c..c99b82e2c674 100644 --- a/robot-server/tests/commands/test_get_default_engine.py +++ b/robot-server/tests/commands/test_get_default_engine.py @@ -1,21 +1,21 @@ -"""Tests for robot_server.commands.get_default_engine.""" +"""Tests for robot_server.commands.get_default_orchestrator.""" import pytest from decoy import Decoy from opentrons.hardware_control import HardwareControlAPI from opentrons.hardware_control.modules import MagDeck, TempDeck -from opentrons.protocol_engine import ProtocolEngine +from opentrons.protocol_runner import RunOrchestrator from robot_server.errors.error_responses import ApiError from robot_server.runs.engine_store import EngineStore, EngineConflictError from robot_server.modules.module_identifier import ModuleIdentifier, ModuleIdentity -from robot_server.commands.get_default_engine import get_default_engine +from robot_server.commands.get_default_orchestrator import get_default_orchestrator @pytest.fixture() -def protocol_engine(decoy: Decoy) -> ProtocolEngine: +def run_orchestrator(decoy: Decoy) -> RunOrchestrator: """Get a mocked out ProtocolEngine.""" - return decoy.mock(cls=ProtocolEngine) + return decoy.mock(cls=RunOrchestrator) @pytest.fixture() @@ -30,11 +30,11 @@ def module_identifier(decoy: Decoy) -> ModuleIdentifier: return decoy.mock(cls=ModuleIdentifier) -async def test_get_default_engine( +async def test_get_default_orchestrator( decoy: Decoy, engine_store: EngineStore, hardware_api: HardwareControlAPI, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, module_identifier: ModuleIdentifier, ) -> None: """It should get a default engine with modules pre-loaded.""" @@ -63,30 +63,32 @@ async def test_get_default_engine( decoy.when(hardware_api.attached_modules).then_return([mod_1, mod_2]) - decoy.when(await engine_store.get_default_engine()).then_return(protocol_engine) + decoy.when(await engine_store.get_default_orchestrator()).then_return( + run_orchestrator + ) - result = await get_default_engine( + result = await get_default_orchestrator( engine_store=engine_store, hardware_api=hardware_api, module_identifier=module_identifier, ) - assert result is protocol_engine + assert result is run_orchestrator decoy.verify( - await protocol_engine.use_attached_modules({"mod-1": mod_1, "mod-2": mod_2}), + await run_orchestrator.use_attached_modules({"mod-1": mod_1, "mod-2": mod_2}), times=1, ) async def test_raises_conflict(decoy: Decoy, engine_store: EngineStore) -> None: """It should raise a 409 conflict if the default engine is not availble.""" - decoy.when(await engine_store.get_default_engine()).then_raise( + decoy.when(await engine_store.get_default_orchestrator()).then_raise( EngineConflictError("oh no") ) with pytest.raises(ApiError) as exc_info: - await get_default_engine(engine_store=engine_store) + await get_default_orchestrator(engine_store=engine_store) assert exc_info.value.status_code == 409 assert exc_info.value.content["errors"][0]["id"] == "RunActive" diff --git a/robot-server/tests/commands/test_router.py b/robot-server/tests/commands/test_router.py index 2d8dc6ac435a..259af673fe90 100644 --- a/robot-server/tests/commands/test_router.py +++ b/robot-server/tests/commands/test_router.py @@ -4,12 +4,12 @@ from decoy import Decoy from opentrons.protocol_engine import ( - ProtocolEngine, CommandSlice, CommandPointer, commands as pe_commands, ) from opentrons.protocol_engine.errors import CommandDoesNotExistError +from opentrons.protocol_runner import RunOrchestrator from robot_server.service.json_api import MultiBodyMeta from robot_server.errors.error_responses import ApiError @@ -22,14 +22,14 @@ @pytest.fixture() -def protocol_engine(decoy: Decoy) -> ProtocolEngine: - """Get a mocked out ProtocolEngine.""" - return decoy.mock(cls=ProtocolEngine) +def run_orchestrator(decoy: Decoy) -> RunOrchestrator: + """Get a mocked out RunOrchestrator.""" + return decoy.mock(cls=RunOrchestrator) async def test_create_command( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should be able to create a command.""" command_create = pe_commands.HomeCreate(params=pe_commands.HomeParams()) @@ -43,17 +43,17 @@ async def test_create_command( ) def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - queued_command - ) + decoy.when(run_orchestrator.get_command("abc123")).then_return(queued_command) return queued_command decoy.when( - protocol_engine.add_command( - pe_commands.HomeCreate( + await run_orchestrator.add_command_and_wait_for_interval( + command=pe_commands.HomeCreate( params=pe_commands.HomeParams(), intent=pe_commands.CommandIntent.SETUP, - ) + ), + wait_until_complete=False, + timeout=42, ) ).then_do(_stub_queued_command_state) @@ -61,31 +61,23 @@ def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command RequestModelWithStatelessCommandCreate(data=command_create), waitUntilComplete=False, timeout=42, - engine=protocol_engine, + orchestrator=run_orchestrator, ) assert result.content.data == queued_command assert result.status_code == 201 - decoy.verify(await protocol_engine.wait_for_command("abc123"), times=0) async def test_create_command_wait_for_complete( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should be able to create a command.""" command_create = pe_commands.HomeCreate( params=pe_commands.HomeParams(), intent=pe_commands.CommandIntent.SETUP, ) - queued_command = pe_commands.Home( - id="abc123", - key="command-key", - createdAt=datetime(year=2021, month=1, day=1), - status=pe_commands.CommandStatus.QUEUED, - params=pe_commands.HomeParams(), - result=None, - ) + completed_command = pe_commands.Home( id="abc123", key="command-key", @@ -96,30 +88,21 @@ async def test_create_command_wait_for_complete( result=None, ) - def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - queued_command + decoy.when( + await run_orchestrator.add_command_and_wait_for_interval( + command=command_create, + wait_until_complete=True, + timeout=42, ) - return queued_command + ).then_return(completed_command) - def _stub_completed_command_state(*_a: object, **_k: object) -> None: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - completed_command - ) - - decoy.when(protocol_engine.add_command(command_create)).then_do( - _stub_queued_command_state - ) - - decoy.when(await protocol_engine.wait_for_command("abc123")).then_do( - _stub_completed_command_state - ) + decoy.when(run_orchestrator.get_command("abc123")).then_return(completed_command) result = await create_command( RequestModelWithStatelessCommandCreate(data=command_create), waitUntilComplete=True, timeout=42, - engine=protocol_engine, + orchestrator=run_orchestrator, ) assert result.content.data == completed_command @@ -128,7 +111,7 @@ def _stub_completed_command_state(*_a: object, **_k: object) -> None: async def test_get_commands_list( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should get a list of commands.""" command_1 = pe_commands.Home( @@ -146,7 +129,7 @@ async def test_get_commands_list( params=pe_commands.HomeParams(), ) - decoy.when(protocol_engine.state_view.commands.get_current()).then_return( + decoy.when(run_orchestrator.get_current_command()).then_return( CommandPointer( command_id="abc123", command_key="command-key-1", @@ -154,14 +137,12 @@ async def test_get_commands_list( index=0, ) ) - decoy.when( - protocol_engine.state_view.commands.get_slice(cursor=1337, length=42) - ).then_return( + decoy.when(run_orchestrator.get_command_slice(cursor=1337, length=42)).then_return( CommandSlice(commands=[command_1, command_2], cursor=0, total_length=2) ) result = await get_commands_list( - engine=protocol_engine, + orchestrator=run_orchestrator, cursor=1337, pageLength=42, ) @@ -173,7 +154,7 @@ async def test_get_commands_list( async def test_get_command( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should get a single command by ID.""" command_1 = pe_commands.Home( @@ -184,9 +165,9 @@ async def test_get_command( params=pe_commands.HomeParams(), ) - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return(command_1) + decoy.when(run_orchestrator.get_command("abc123")).then_return(command_1) - result = await get_command(commandId="abc123", engine=protocol_engine) + result = await get_command(commandId="abc123", orchestrator=run_orchestrator) assert result.content.data == command_1 assert result.status_code == 200 @@ -194,15 +175,15 @@ async def test_get_command( async def test_get_command_not_found( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should raise a 404 if command is not found.""" - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_raise( + decoy.when(run_orchestrator.get_command("abc123")).then_raise( CommandDoesNotExistError("oh no") ) with pytest.raises(ApiError) as exc_info: - await get_command(commandId="abc123", engine=protocol_engine) + await get_command(commandId="abc123", orchestrator=run_orchestrator) assert exc_info.value.status_code == 404 assert exc_info.value.content["errors"][0]["id"] == "StatelessCommandNotFound" diff --git a/robot-server/tests/runs/router/test_commands_router.py b/robot-server/tests/runs/router/test_commands_router.py index d0cf29ecd85c..eab4a8a516c0 100644 --- a/robot-server/tests/runs/router/test_commands_router.py +++ b/robot-server/tests/runs/router/test_commands_router.py @@ -7,7 +7,6 @@ from opentrons.protocol_engine import ( CommandSlice, CommandPointer, - ProtocolEngine, CommandNote, commands as pe_commands, errors as pe_errors, @@ -22,7 +21,7 @@ CommandLink, CommandLinkMeta, ) -from robot_server.runs.run_store import RunStore, CommandNotFoundError +from robot_server.runs.run_store import CommandNotFoundError, RunStore from robot_server.runs.engine_store import EngineStore from robot_server.runs.run_data_manager import RunDataManager from robot_server.runs.run_models import RunCommandSummary, RunNotFoundError @@ -30,11 +29,11 @@ create_run_command, get_run_command, get_run_commands, - get_current_run_engine_from_url, + get_current_run_from_url, ) -async def test_get_current_run_engine_from_url( +async def test_get_current_run_from_url( decoy: Decoy, mock_engine_store: EngineStore, mock_run_store: RunStore, @@ -43,16 +42,16 @@ async def test_get_current_run_engine_from_url( decoy.when(mock_run_store.has("run-id")).then_return(True) decoy.when(mock_engine_store.current_run_id).then_return("run-id") - result = await get_current_run_engine_from_url( + result = await get_current_run_from_url( runId="run-id", engine_store=mock_engine_store, run_store=mock_run_store, ) - assert result is mock_engine_store.engine + assert result == "run-id" -async def test_get_current_run_engine_no_run( +async def test_get_current_run_no_run( decoy: Decoy, mock_engine_store: EngineStore, mock_run_store: RunStore, @@ -61,7 +60,7 @@ async def test_get_current_run_engine_no_run( decoy.when(mock_run_store.has("run-id")).then_return(False) with pytest.raises(ApiError) as exc_info: - await get_current_run_engine_from_url( + await get_current_run_from_url( runId="run-id", engine_store=mock_engine_store, run_store=mock_run_store, @@ -71,7 +70,7 @@ async def test_get_current_run_engine_no_run( assert exc_info.value.content["errors"][0]["id"] == "RunNotFound" -async def test_get_current_run_engine_from_url_not_current( +async def test_get_current_run_from_url_not_current( decoy: Decoy, mock_engine_store: EngineStore, mock_run_store: RunStore, @@ -81,7 +80,7 @@ async def test_get_current_run_engine_from_url_not_current( decoy.when(mock_engine_store.current_run_id).then_return("some-other-run-id") with pytest.raises(ApiError) as exc_info: - await get_current_run_engine_from_url( + await get_current_run_from_url( runId="run-id", engine_store=mock_engine_store, run_store=mock_run_store, @@ -93,7 +92,7 @@ async def test_get_current_run_engine_from_url_not_current( async def test_create_run_command( decoy: Decoy, - mock_protocol_engine: ProtocolEngine, + mock_engine_store: EngineStore, ) -> None: """It should add the requested command to the ProtocolEngine and return it.""" command_request = pe_commands.WaitForResumeCreate( @@ -109,17 +108,19 @@ async def test_create_run_command( ) def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when( - mock_protocol_engine.state_view.commands.get("command-id") - ).then_return(command_once_added) + decoy.when(mock_engine_store.get_command("command-id")).then_return( + command_once_added + ) return command_once_added decoy.when( - mock_protocol_engine.add_command( + await mock_engine_store.add_command_and_wait_for_interval( request=pe_commands.WaitForResumeCreate( params=pe_commands.WaitForResumeParams(message="Hello"), intent=pe_commands.CommandIntent.SETUP, ), + wait_until_complete=False, + timeout=12, failed_command_id=None, ) ).then_do(_stub_queued_command_state) @@ -127,24 +128,24 @@ def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command result = await create_run_command( request_body=RequestModelWithCommandCreate(data=command_request), waitUntilComplete=False, - protocol_engine=mock_protocol_engine, + engine_store=mock_engine_store, failedCommandId=None, + timeout=12, ) assert result.content.data == command_once_added assert result.status_code == 201 - decoy.verify(await mock_protocol_engine.wait_for_command("command-id"), times=0) async def test_create_command_with_failed_command_raises( decoy: Decoy, - mock_protocol_engine: ProtocolEngine, + mock_engine_store: EngineStore, ) -> None: """It should return 400 bad request.""" command_create = pe_commands.HomeCreate(params=pe_commands.HomeParams()) decoy.when( - mock_protocol_engine.add_command( + mock_engine_store.add_command_and_wait_for_interval( pe_commands.HomeCreate( params=pe_commands.HomeParams(), intent=pe_commands.CommandIntent.SETUP, @@ -158,14 +159,14 @@ async def test_create_command_with_failed_command_raises( RequestModelWithCommandCreate(data=command_create), waitUntilComplete=False, timeout=42, - protocol_engine=mock_protocol_engine, + engine_store=mock_engine_store, failedCommandId="123", ) async def test_create_run_command_blocking_completion( decoy: Decoy, - mock_protocol_engine: ProtocolEngine, + mock_engine_store: EngineStore, ) -> None: """It should be able to create a command and wait for it to execute.""" command_request = pe_commands.WaitForResumeCreate( @@ -173,14 +174,6 @@ async def test_create_run_command_blocking_completion( intent=pe_commands.CommandIntent.PROTOCOL, ) - command_once_added = pe_commands.WaitForResume( - id="command-id", - key="command-key", - createdAt=datetime(year=2021, month=1, day=1), - status=pe_commands.CommandStatus.QUEUED, - params=pe_commands.WaitForResumeParams(message="Hello"), - ) - command_once_completed = pe_commands.WaitForResume( id="command-id", key="command-key", @@ -190,41 +183,35 @@ async def test_create_run_command_blocking_completion( result=pe_commands.WaitForResumeResult(), ) - def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when( - mock_protocol_engine.state_view.commands.get("command-id") - ).then_return(command_once_added) - - return command_once_added - - def _stub_completed_command_state(*_a: object, **_k: object) -> None: - decoy.when( - mock_protocol_engine.state_view.commands.get("command-id") - ).then_return(command_once_completed) - - decoy.when(mock_protocol_engine.add_command(command_request, None)).then_do( - _stub_queued_command_state - ) + decoy.when( + await mock_engine_store.add_command_and_wait_for_interval( + request=command_request, + failed_command_id=None, + wait_until_complete=True, + timeout=999, + ) + ).then_return(command_once_completed) - decoy.when(await mock_protocol_engine.wait_for_command("command-id")).then_do( - _stub_completed_command_state + decoy.when(mock_engine_store.get_command("command-id")).then_return( + command_once_completed ) result = await create_run_command( request_body=RequestModelWithCommandCreate(data=command_request), waitUntilComplete=True, timeout=999, - protocol_engine=mock_protocol_engine, + engine_store=mock_engine_store, failedCommandId=None, ) + print(result.content.data) assert result.content.data == command_once_completed assert result.status_code == 201 async def test_add_conflicting_setup_command( decoy: Decoy, - mock_protocol_engine: ProtocolEngine, + mock_engine_store: EngineStore, ) -> None: """It should raise an error if the setup command cannot be added.""" command_request = pe_commands.WaitForResumeCreate( @@ -232,15 +219,17 @@ async def test_add_conflicting_setup_command( intent=pe_commands.CommandIntent.SETUP, ) - decoy.when(mock_protocol_engine.add_command(command_request, None)).then_raise( - pe_errors.SetupCommandNotAllowedError("oh no") - ) + decoy.when( + mock_engine_store.add_command_and_wait_for_interval( + request=command_request, failed_command_id=None + ) + ).then_raise(pe_errors.SetupCommandNotAllowedError("oh no")) with pytest.raises(ApiError) as exc_info: await create_run_command( request_body=RequestModelWithCommandCreate(data=command_request), waitUntilComplete=False, - protocol_engine=mock_protocol_engine, + engine_store=mock_engine_store, failedCommandId=None, ) @@ -253,7 +242,7 @@ async def test_add_conflicting_setup_command( async def test_add_command_to_stopped_engine( decoy: Decoy, - mock_protocol_engine: ProtocolEngine, + mock_engine_store: EngineStore, ) -> None: """It should raise an error if the setup command cannot be added.""" command_request = pe_commands.HomeCreate( @@ -261,15 +250,17 @@ async def test_add_command_to_stopped_engine( intent=pe_commands.CommandIntent.SETUP, ) - decoy.when(mock_protocol_engine.add_command(command_request, None)).then_raise( - pe_errors.RunStoppedError("oh no") - ) + decoy.when( + mock_engine_store.add_command_and_wait_for_interval( + request=command_request, failed_command_id=None + ) + ).then_raise(pe_errors.RunStoppedError("oh no")) with pytest.raises(ApiError) as exc_info: await create_run_command( request_body=RequestModelWithCommandCreate(data=command_request), waitUntilComplete=False, - protocol_engine=mock_protocol_engine, + engine_store=mock_engine_store, failedCommandId=None, ) diff --git a/robot-server/tests/runs/router/test_labware_router.py b/robot-server/tests/runs/router/test_labware_router.py index 3bcf763a42d3..13e467ca0f22 100644 --- a/robot-server/tests/runs/router/test_labware_router.py +++ b/robot-server/tests/runs/router/test_labware_router.py @@ -70,7 +70,7 @@ async def test_add_labware_offset( ) decoy.when( - mock_engine_store.engine.add_labware_offset(labware_offset_request) + mock_engine_store.add_labware_offset(labware_offset_request) ).then_return(labware_offset) result = await add_labware_offset( @@ -118,7 +118,7 @@ async def test_add_labware_definition( uri = pe_types.LabwareUri("some/definition/uri") decoy.when( - mock_engine_store.engine.add_labware_definition(labware_definition) + mock_engine_store.add_labware_definition(labware_definition) ).then_return(uri) result = await add_labware_definition( diff --git a/robot-server/tests/runs/test_engine_store.py b/robot-server/tests/runs/test_engine_store.py index 49c474b2ce9b..45d6f53b71d8 100644 --- a/robot-server/tests/runs/test_engine_store.py +++ b/robot-server/tests/runs/test_engine_store.py @@ -10,15 +10,17 @@ from opentrons.types import DeckSlotName from opentrons.hardware_control import HardwareControlAPI, API from opentrons.hardware_control.types import EstopStateNotification, EstopState -from opentrons.protocol_engine import ProtocolEngine, StateSummary, types as pe_types -from opentrons.protocol_runner import RunResult, LiveRunner, JsonRunner +from opentrons.protocol_engine import ( + StateSummary, + types as pe_types, +) +from opentrons.protocol_runner import RunResult, RunOrchestrator from opentrons.protocol_reader import ProtocolReader, ProtocolSource -from robot_server.protocols.protocol_store import ProtocolResource from robot_server.runs.engine_store import ( EngineStore, EngineConflictError, - NoRunnerEngineError, + NoRunOrchestrator, handle_estop_event, ) @@ -61,37 +63,8 @@ async def test_create_engine(decoy: Decoy, subject: EngineStore) -> None: assert subject.current_run_id == "run-id" assert isinstance(result, StateSummary) - assert isinstance(subject.runner, LiveRunner) - assert isinstance(subject.engine, ProtocolEngine) - - -async def test_create_engine_with_protocol( - subject: EngineStore, - json_protocol_source: ProtocolSource, -) -> None: - """It should create an engine for a run with protocol. - - Tests only basic engine & runner creation with creation result. - Loading of protocols/ live run commands is tested in integration test. - """ - protocol = ProtocolResource( - protocol_id="my cool protocol", - protocol_key=None, - created_at=datetime(year=2021, month=1, day=1), - source=json_protocol_source, - ) - - result = await subject.create( - run_id="run-id", - labware_offsets=[], - deck_configuration=[], - protocol=protocol, - notify_publishers=mock_notify_publishers, - ) - assert subject.current_run_id == "run-id" - assert isinstance(result, StateSummary) - assert isinstance(subject.runner, JsonRunner) - assert isinstance(subject.engine, ProtocolEngine) + assert subject._run_orchestrator is not None + assert isinstance(subject._run_orchestrator, RunOrchestrator) @pytest.mark.parametrize("robot_type", ["OT-2 Standard", "OT-3 Standard"]) @@ -115,7 +88,7 @@ async def test_create_engine_uses_robot_type( notify_publishers=mock_notify_publishers, ) - assert subject.engine.state_view.config.robot_type == robot_type + assert subject._run_orchestrator is not None async def test_create_engine_with_labware_offsets(subject: EngineStore) -> None: @@ -176,17 +149,14 @@ async def test_clear_engine(subject: EngineStore) -> None: protocol=None, notify_publishers=mock_notify_publishers, ) - await subject.runner.run(deck_configuration=[]) + assert subject._run_orchestrator is not None result = await subject.clear() assert subject.current_run_id is None assert isinstance(result, RunResult) - with pytest.raises(NoRunnerEngineError): - subject.engine - - with pytest.raises(NoRunnerEngineError): - subject.runner + with pytest.raises(NoRunOrchestrator): + subject.run_orchestrator async def test_clear_engine_not_stopped_or_idle( @@ -200,8 +170,8 @@ async def test_clear_engine_not_stopped_or_idle( protocol=None, notify_publishers=mock_notify_publishers, ) - subject.runner.play(deck_configuration=[]) - + assert subject._run_orchestrator is not None + subject._run_orchestrator.play(deck_configuration=[]) with pytest.raises(EngineConflictError): await subject.clear() @@ -215,30 +185,27 @@ async def test_clear_idle_engine(subject: EngineStore) -> None: protocol=None, notify_publishers=mock_notify_publishers, ) - assert subject.engine is not None - assert subject.runner is not None + assert subject._run_orchestrator is not None await subject.clear() # TODO: test engine finish is called - with pytest.raises(NoRunnerEngineError): - subject.engine - with pytest.raises(NoRunnerEngineError): - subject.runner + with pytest.raises(NoRunOrchestrator): + subject.run_orchestrator -async def test_get_default_engine_idempotent(subject: EngineStore) -> None: +async def test_get_default_orchestrator_idempotent(subject: EngineStore) -> None: """It should create and retrieve the same default ProtocolEngine.""" - result = await subject.get_default_engine() - repeated_result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() + repeated_result = await subject.get_default_orchestrator() - assert isinstance(result, ProtocolEngine) + assert isinstance(result, RunOrchestrator) assert repeated_result is result @pytest.mark.parametrize("robot_type", ["OT-2 Standard", "OT-3 Standard"]) @pytest.mark.parametrize("deck_type", pe_types.DeckType) -async def test_get_default_engine_robot_type( +async def test_get_default_orchestrator_robot_type( decoy: Decoy, robot_type: RobotType, deck_type: pe_types.DeckType ) -> None: """It should create default ProtocolEngines with the given robot and deck type.""" @@ -251,12 +218,12 @@ async def test_get_default_engine_robot_type( deck_type=deck_type, ) - result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() - assert result.state_view.config.robot_type == robot_type + assert result.get_robot_type() == robot_type -async def test_get_default_engine_current_unstarted(subject: EngineStore) -> None: +async def test_get_default_orchestrator_current_unstarted(subject: EngineStore) -> None: """It should allow a default engine if another engine current but unstarted.""" await subject.create( run_id="run-id", @@ -266,11 +233,11 @@ async def test_get_default_engine_current_unstarted(subject: EngineStore) -> Non notify_publishers=mock_notify_publishers, ) - result = await subject.get_default_engine() - assert isinstance(result, ProtocolEngine) + result = await subject.get_default_orchestrator() + assert isinstance(result, RunOrchestrator) -async def test_get_default_engine_conflict(subject: EngineStore) -> None: +async def test_get_default_orchestrator_conflict(subject: EngineStore) -> None: """It should not allow a default engine if another engine is executing commands.""" await subject.create( run_id="run-id", @@ -279,13 +246,13 @@ async def test_get_default_engine_conflict(subject: EngineStore) -> None: protocol=None, notify_publishers=mock_notify_publishers, ) - subject.engine.play() + subject.play() with pytest.raises(EngineConflictError): - await subject.get_default_engine() + await subject.get_default_orchestrator() -async def test_get_default_engine_run_stopped(subject: EngineStore) -> None: +async def test_get_default_orchestrator_run_stopped(subject: EngineStore) -> None: """It allow a default engine if another engine is terminal.""" await subject.create( run_id="run-id", @@ -294,10 +261,10 @@ async def test_get_default_engine_run_stopped(subject: EngineStore) -> None: protocol=None, notify_publishers=mock_notify_publishers, ) - await subject.engine.finish() + await subject.finish(error=None) - result = await subject.get_default_engine() - assert isinstance(result, ProtocolEngine) + result = await subject.get_default_orchestrator() + assert isinstance(result, RunOrchestrator) async def test_estop_callback( @@ -315,21 +282,25 @@ async def test_estop_callback( decoy.when(engine_store.current_run_id).then_return(None) await handle_estop_event(engine_store, disengage_event) + assert engine_store.run_orchestrator is not None decoy.verify( - engine_store.engine.estop(), + engine_store.run_orchestrator.estop(), ignore_extra_args=True, times=0, ) decoy.verify( - await engine_store.engine.finish(), + await engine_store.finish(error=None), ignore_extra_args=True, times=0, ) decoy.when(engine_store.current_run_id).then_return("fake-run-id") await handle_estop_event(engine_store, engage_event) + assert engine_store._run_orchestrator is not None decoy.verify( - engine_store.engine.estop(), - await engine_store.engine.finish(error=matchers.IsA(EStopActivatedError)), + engine_store.run_orchestrator.estop(), + await engine_store.run_orchestrator.finish( + error=matchers.IsA(EStopActivatedError) + ), times=1, ) diff --git a/robot-server/tests/runs/test_light_control_task.py b/robot-server/tests/runs/test_light_control_task.py index 6d4c89e69dec..845a09b25a93 100644 --- a/robot-server/tests/runs/test_light_control_task.py +++ b/robot-server/tests/runs/test_light_control_task.py @@ -57,7 +57,7 @@ async def test_get_current_status_ot2( ) -> None: """Test LightController.get_current_status.""" decoy.when(engine_store.current_run_id).then_return("fake_id" if active else None) - decoy.when(engine_store.engine.state_view.commands.get_status()).then_return(status) + decoy.when(engine_store.get_status()).then_return(status) decoy.when(hardware_api.get_estop_state()).then_return(estop) expected = Status( @@ -198,9 +198,7 @@ async def test_provide_engine_store( ) decoy.when(engine_store.current_run_id).then_return("fake_id") - decoy.when(engine_store.engine.state_view.commands.get_status()).then_return( - EngineStatus.RUNNING - ) + decoy.when(engine_store.get_status()).then_return(EngineStatus.RUNNING) subject.update_engine_store(engine_store=engine_store) assert subject.get_current_status() == Status( diff --git a/robot-server/tests/runs/test_run_controller.py b/robot-server/tests/runs/test_run_controller.py index 71fc92f84660..6e3461f0d105 100644 --- a/robot-server/tests/runs/test_run_controller.py +++ b/robot-server/tests/runs/test_run_controller.py @@ -12,7 +12,7 @@ errors as pe_errors, ) from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter -from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner +from opentrons.protocol_runner import RunResult from robot_server.service.notifications import RunsPublisher from robot_server.service.task_runner import TaskRunner @@ -117,9 +117,7 @@ async def test_create_play_action_to_resume( subject: RunController, ) -> None: """It should resume a run.""" - mock_json_runner = decoy.mock(cls=JsonRunner) - decoy.when(mock_engine_store.runner).then_return(mock_json_runner) - decoy.when(mock_json_runner.was_started()).then_return(True) + decoy.when(mock_engine_store.run_was_started()).then_return(True) result = subject.create_action( action_id="some-action-id", @@ -135,8 +133,8 @@ async def test_create_play_action_to_resume( ) decoy.verify(mock_run_store.insert_action(run_id, result), times=1) - decoy.verify(mock_json_runner.play(), times=1) - decoy.verify(await mock_json_runner.run(deck_configuration=[]), times=0) + decoy.verify(mock_engine_store.play(), times=1) + decoy.verify(await mock_engine_store.run(deck_configuration=[]), times=0) async def test_create_play_action_to_start( @@ -152,9 +150,7 @@ async def test_create_play_action_to_start( subject: RunController, ) -> None: """It should start a run.""" - mock_python_runner = decoy.mock(cls=PythonAndLegacyRunner) - decoy.when(mock_engine_store.runner).then_return(mock_python_runner) - decoy.when(mock_python_runner.was_started()).then_return(False) + decoy.when(mock_engine_store.run_was_started()).then_return(False) result = subject.create_action( action_id="some-action-id", @@ -174,7 +170,7 @@ async def test_create_play_action_to_start( background_task_captor = matchers.Captor() decoy.verify(mock_task_runner.run(background_task_captor, deck_configuration=[])) - decoy.when(await mock_python_runner.run(deck_configuration=[])).then_return( + decoy.when(await mock_engine_store.run(deck_configuration=[])).then_return( RunResult( commands=protocol_commands, state_summary=engine_state_summary, @@ -218,7 +214,7 @@ def test_create_pause_action( ) decoy.verify(mock_run_store.insert_action(run_id, result), times=1) - decoy.verify(mock_engine_store.runner.pause(), times=1) + decoy.verify(mock_engine_store.pause(), times=1) def test_create_stop_action( @@ -244,7 +240,7 @@ def test_create_stop_action( ) decoy.verify(mock_run_store.insert_action(run_id, result), times=1) - decoy.verify(mock_task_runner.run(mock_engine_store.runner.stop), times=1) + decoy.verify(mock_task_runner.run(mock_engine_store.stop), times=1) def test_create_resume_from_recovery_action( @@ -270,7 +266,7 @@ def test_create_resume_from_recovery_action( ) decoy.verify(mock_run_store.insert_action(run_id, result), times=1) - decoy.verify(mock_engine_store.runner.resume_from_recovery()) + decoy.verify(mock_engine_store.resume_from_recovery()) @pytest.mark.parametrize( @@ -292,9 +288,9 @@ async def test_action_not_allowed( exception: Exception, ) -> None: """It should raise a RunActionNotAllowedError if a play/pause action is rejected.""" - decoy.when(mock_engine_store.runner.was_started()).then_return(True) - decoy.when(mock_engine_store.runner.play()).then_raise(exception) - decoy.when(mock_engine_store.runner.pause()).then_raise(exception) + decoy.when(mock_engine_store.run_was_started()).then_return(True) + decoy.when(mock_engine_store.play()).then_raise(exception) + decoy.when(mock_engine_store.pause()).then_raise(exception) with pytest.raises(RunActionNotAllowedError, match="oh no"): subject.create_action( diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index 100f57a4fef1..853110e0ca5c 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -324,10 +324,8 @@ async def test_get_current_run( decoy.when(mock_run_store.get(run_id=run_id)).then_return(run_resource) decoy.when(mock_engine_store.current_run_id).then_return(run_id) - decoy.when(mock_engine_store.engine.state_view.get_summary()).then_return( - engine_state_summary - ) - decoy.when(mock_engine_store.runner.run_time_parameters).then_return( + decoy.when(mock_engine_store.get_state_summary()).then_return(engine_state_summary) + decoy.when(mock_engine_store.get_run_time_parameters()).then_return( run_time_parameters ) @@ -493,10 +491,8 @@ async def test_get_all_runs( ) decoy.when(mock_engine_store.current_run_id).then_return("current-run") - decoy.when(mock_engine_store.engine.state_view.get_summary()).then_return( - current_run_data - ) - decoy.when(mock_engine_store.runner.run_time_parameters).then_return( + decoy.when(mock_engine_store.get_state_summary()).then_return(current_run_data) + decoy.when(mock_engine_store.get_run_time_parameters()).then_return( current_run_time_parameters ) decoy.when(mock_run_store.get_state_summary("historical-run")).then_return( @@ -649,10 +645,8 @@ async def test_update_current_noop( """It should noop on current=None and current=True.""" run_id = "hello world" decoy.when(mock_engine_store.current_run_id).then_return(run_id) - decoy.when(mock_engine_store.engine.state_view.get_summary()).then_return( - engine_state_summary - ) - decoy.when(mock_engine_store.runner.run_time_parameters).then_return( + decoy.when(mock_engine_store.get_state_summary()).then_return(engine_state_summary) + decoy.when(mock_engine_store.get_run_time_parameters()).then_return( run_time_parameters ) decoy.when(mock_run_store.get(run_id=run_id)).then_return(run_resource) @@ -819,9 +813,9 @@ def test_get_commands_slice_current_run( commands=expected_commands_result, cursor=1, total_length=3 ) decoy.when(mock_engine_store.current_run_id).then_return("run-id") - decoy.when( - mock_engine_store.engine.state_view.commands.get_slice(1, 2) - ).then_return(expected_command_slice) + decoy.when(mock_engine_store.get_command_slice(1, 2)).then_return( + expected_command_slice + ) result = subject.get_commands_slice("run-id", 1, 2) @@ -854,9 +848,7 @@ def test_get_current_command( index=0, ) decoy.when(mock_engine_store.current_run_id).then_return("run-id") - decoy.when(mock_engine_store.engine.state_view.commands.get_current()).then_return( - expected_current - ) + decoy.when(mock_engine_store.get_current_command()).then_return(expected_current) result = subject.get_current_command("run-id") assert result == expected_current @@ -884,9 +876,7 @@ def test_get_command_from_engine( ) -> None: """Should get command by id from engine store.""" decoy.when(mock_engine_store.current_run_id).then_return("run-id") - decoy.when( - mock_engine_store.engine.state_view.commands.get("command-id") - ).then_return(run_command) + decoy.when(mock_engine_store.get_command("command-id")).then_return(run_command) result = subject.get_command("run-id", "command-id") assert result == run_command @@ -968,9 +958,7 @@ def test_get_all_commands_as_preserialized_list_errors_for_active_runs( ) -> None: """It should raise an error when fetching pre-serialized commands list while run is active.""" decoy.when(mock_engine_store.current_run_id).then_return("current-run-id") - decoy.when( - mock_engine_store.engine.state_view.commands.get_is_terminal() - ).then_return(False) + decoy.when(mock_engine_store.get_is_run_terminal()).then_return(False) with pytest.raises(PreSerializedCommandsNotAvailableError): subject.get_all_commands_as_preserialized_list("current-run-id") @@ -984,9 +972,7 @@ async def test_get_current_run_labware_definition( ) -> None: """It should get the current run labware definition from the engine.""" decoy.when(mock_engine_store.current_run_id).then_return("run-id") - decoy.when( - mock_engine_store.engine.state_view.labware.get_loaded_labware_definitions() - ).then_return( + decoy.when(mock_engine_store.get_loaded_labware_definitions()).then_return( [ LabwareDefinition.construct(namespace="test_1"), # type: ignore[call-arg] LabwareDefinition.construct(namespace="test_2"), # type: ignore[call-arg]