diff --git a/api/src/opentrons/execute.py b/api/src/opentrons/execute.py index 709cd2c6c69..2f4a7fdc9e9 100644 --- a/api/src/opentrons/execute.py +++ b/api/src/opentrons/execute.py @@ -62,7 +62,6 @@ DeckType, EngineStatus, ErrorOccurrence as ProtocolEngineErrorOccurrence, - command_monitor as pe_command_monitor, create_protocol_engine, create_protocol_engine_in_thread, ) @@ -641,10 +640,6 @@ def _run_file_pe( ) -> None: """Run a protocol file with Protocol Engine.""" - def send_command_to_emit_runlog(event: pe_command_monitor.Event) -> None: - if emit_runlog is not None: - emit_runlog(_adapt_command(event)) - async def run(protocol_source: ProtocolSource) -> None: protocol_engine = await create_protocol_engine( hardware_api=hardware_api, @@ -657,12 +652,15 @@ async def run(protocol_source: ProtocolSource) -> None: hardware_api=hardware_api, ) - with pe_command_monitor.monitor_commands( - protocol_engine, callback=send_command_to_emit_runlog - ): + unsubscribe = protocol_runner.broker.subscribe( + "command", lambda event: emit_runlog(event) if emit_runlog else None + ) + try: # TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how # things have historically behaved with PAPIv2.13 and older or JSONv5 and older. result = await protocol_runner.run(protocol_source) + finally: + unsubscribe() if result.state_summary.status != EngineStatus.SUCCEEDED: raise _ProtocolEngineExecuteError(result.state_summary.errors) @@ -735,32 +733,6 @@ def _adapt_protocol_source( yield protocol_source -def _adapt_command(event: pe_command_monitor.Event) -> command_types.CommandMessage: - """Convert a Protocol Engine command event to an old-school command_types.CommandMesage.""" - before_or_after: command_types.MessageSequenceId = ( - "before" if isinstance(event, pe_command_monitor.RunningEvent) else "after" - ) - - message: command_types.CommentMessage = { - # TODO(mm, 2023-09-26): If we can without breaking the public API, remove the requirement - # to supply a "name" here. If we can't do that, consider adding a special name value - # so we don't have to lie and call every command a comment. - "name": "command.COMMENT", - "id": event.command.id, - "$": before_or_after, - # TODO(mm, 2023-09-26): Convert this machine-readable JSON into a human-readable message - # to match behavior from before Protocol Engine. - # https://opentrons.atlassian.net/browse/RSS-320 - "payload": {"text": event.command.json()}, - # As far as I know, "error" is not part of the public-facing API, so it doesn't matter - # what we put here. Leaving it as `None` to avoid difficulties in converting between - # the Protocol Engine `ErrorOccurrence` model and the regular Python `Exception` type - # that this field expects. - "error": None, - } - return message - - def _get_global_hardware_controller(robot_type: RobotType) -> ThreadManagedHardware: # Build a hardware controller in a worker thread, which is necessary # because ipython runs its notebook in asyncio but the notebook diff --git a/api/src/opentrons/protocol_engine/command_monitor.py b/api/src/opentrons/protocol_engine/command_monitor.py deleted file mode 100644 index 9f2985f59b0..00000000000 --- a/api/src/opentrons/protocol_engine/command_monitor.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Monitor the execution of commands in a `ProtocolEngine`.""" - - -from dataclasses import dataclass -import typing -import contextlib - - -from opentrons.protocol_engine import Command, ProtocolEngine - - -@dataclass -class RunningEvent: - """Emitted when a command starts running.""" - - command: Command - - -@dataclass -class NoLongerRunningEvent: - """Emitted when a command stops running--either because it succeeded, or failed.""" - - command: Command - - -Event = typing.Union[RunningEvent, NoLongerRunningEvent] -Callback = typing.Callable[[Event], None] - - -@contextlib.contextmanager -def monitor_commands( - protocol_engine: ProtocolEngine, - callback: Callback, -) -> typing.Generator[None, None, None]: - """Monitor the execution of commands in `protocol_engine`. - - While this context manager is open, `callback` will be called any time `protocol_engine` - starts or stops a command. - """ - # Subscribe to all state updates in protocol_engine. - # On every update, diff the new state against the last state and see if the currently - # running command has changed. If it has, emit the appropriate events. - - last_running_id: typing.Optional[str] = None - - def handle_state_update(_message_from_broker: None) -> None: - nonlocal last_running_id - - running_id = protocol_engine.state_view.commands.get_running() - if running_id != last_running_id: - if last_running_id is not None: - callback( - NoLongerRunningEvent( - protocol_engine.state_view.commands.get(last_running_id) - ) - ) - - if running_id is not None: - callback( - RunningEvent(protocol_engine.state_view.commands.get(running_id)) - ) - last_running_id = running_id - - with protocol_engine.state_update_broker.subscribed(handle_state_update): - yield diff --git a/api/src/opentrons/protocol_engine/protocol_engine.py b/api/src/opentrons/protocol_engine/protocol_engine.py index 9ad7cb3a27c..207ebddd9d8 100644 --- a/api/src/opentrons/protocol_engine/protocol_engine.py +++ b/api/src/opentrons/protocol_engine/protocol_engine.py @@ -12,8 +12,6 @@ EnumeratedError, ) -from opentrons.util.broker import ReadOnlyBroker - from .errors import ProtocolCommandFailedError, ErrorOccurrence from .errors.exceptions import EStopActivatedError from . import commands, slot_standardization @@ -131,36 +129,6 @@ def state_view(self) -> StateView: """Get an interface to retrieve calculated state values.""" return self._state_store - @property - def state_update_broker(self) -> ReadOnlyBroker[None]: - """Return a broker that you can use to get notified of all state updates. - - For example, you can use this to do something any time a new command starts running. - - `ProtocolEngine` will publish a message to this broker (with the placeholder value `None`) - any time its state updates. Then, when you receive that message, you can get the latest - state through `state_view` and inspect it to see whether something happened that you care - about. - - Warning: - Use this mechanism sparingly, because it has several footguns: - - * Your callbacks will run synchronously, on every state update. - If they take a long time, they will harm analysis and run speed. - - * Your callbacks will run in the thread and asyncio event loop that own this - `ProtocolEngine`. (See the concurrency notes in the `ProtocolEngine` docstring.) - If your callbacks interact with things in other threads or event loops, - take appropriate precautions to keep them concurrency-safe. - - * Currently, if your callback raises an exception, it will propagate into - `ProtocolEngine` and be treated like any other internal error. This will probably - stop the run. If you expect your code to raise exceptions and don't want - that to happen, consider catching and logging them at the top level of your callback, - before they propagate into `ProtocolEngine`. - """ - return self._state_store.update_broker - def add_plugin(self, plugin: AbstractPlugin) -> None: """Add a plugin to the engine to customize behavior.""" self._plugin_starter.start(plugin) diff --git a/api/src/opentrons/protocol_engine/state/change_notifier.py b/api/src/opentrons/protocol_engine/state/change_notifier.py index 629cb89f368..3c72f277913 100644 --- a/api/src/opentrons/protocol_engine/state/change_notifier.py +++ b/api/src/opentrons/protocol_engine/state/change_notifier.py @@ -1,8 +1,6 @@ """Simple state change notification interface.""" import asyncio -from opentrons.util.broker import Broker, ReadOnlyBroker - class ChangeNotifier: """An interface tto emit or subscribe to state change notifications.""" @@ -10,22 +8,12 @@ class ChangeNotifier: def __init__(self) -> None: """Initialize the ChangeNotifier with an internal Event.""" self._event = asyncio.Event() - self._broker = Broker[None]() def notify(self) -> None: """Notify all `wait`'ers that the state has changed.""" self._event.set() - self._broker.publish(None) async def wait(self) -> None: """Wait until the next state change notification.""" self._event.clear() await self._event.wait() - - @property - def broker(self) -> ReadOnlyBroker[None]: - """Return a broker that you can use to get notified of all changes. - - This is an alternative interface to `wait()`. - """ - return self._broker diff --git a/api/src/opentrons/protocol_engine/state/commands.py b/api/src/opentrons/protocol_engine/state/commands.py index ae0ba7898cb..b4301c22920 100644 --- a/api/src/opentrons/protocol_engine/state/commands.py +++ b/api/src/opentrons/protocol_engine/state/commands.py @@ -507,17 +507,13 @@ def get_error(self) -> Optional[ErrorOccurrence]: else: return run_error or finish_error - def get_running(self) -> Optional[str]: - """Return the ID of the command that's currently running, if any.""" - return self._state.running_command_id - def get_current(self) -> Optional[CurrentCommand]: """Return the "current" command, if any. The "current" command is the command that is currently executing, or the most recent command to have completed. """ - if self._state.running_command_id is not None: + if self._state.running_command_id: entry = self._state.commands_by_id[self._state.running_command_id] return CurrentCommand( command_id=entry.command.id, diff --git a/api/src/opentrons/protocol_engine/state/state.py b/api/src/opentrons/protocol_engine/state/state.py index 5fdbd394b32..7e4695e15e6 100644 --- a/api/src/opentrons/protocol_engine/state/state.py +++ b/api/src/opentrons/protocol_engine/state/state.py @@ -8,7 +8,6 @@ from opentrons_shared_data.deck.dev_types import DeckDefinitionV3 from opentrons.protocol_engine.types import ModuleOffsetData -from opentrons.util.broker import ReadOnlyBroker from ..resources import DeckFixedLabware from ..actions import Action, ActionHandler @@ -240,17 +239,6 @@ async def wait_for( return is_done - # We return ReadOnlyBroker[None] instead of ReadOnlyBroker[StateView] in order to avoid - # confusion with state mutability. If a caller needs to know the new state, they can - # retrieve it explicitly with `ProtocolEngine.state_view`. - @property - def update_broker(self) -> ReadOnlyBroker[None]: - """Return a broker that you can use to get notified of all state updates. - - This is an alternative interface to `wait_for()`. - """ - return self._change_notifier.broker - def _get_next_state(self) -> State: """Get a new instance of the state value object.""" return State( diff --git a/api/src/opentrons/protocol_runner/protocol_runner.py b/api/src/opentrons/protocol_runner/protocol_runner.py index 1b49a159087..5aabd878ca3 100644 --- a/api/src/opentrons/protocol_runner/protocol_runner.py +++ b/api/src/opentrons/protocol_runner/protocol_runner.py @@ -59,6 +59,21 @@ class AbstractRunner(ABC): def __init__(self, protocol_engine: ProtocolEngine) -> None: self._protocol_engine = protocol_engine + self._broker = LegacyBroker() + + # TODO(mm, 2023-10-03): `LegacyBroker` is specific to Python protocols and JSON protocols ≤v5. + # We'll need to extend this in order to report progress from newer JSON protocols. + # + # TODO(mm, 2023-10-04): When we switch this to return a new `Broker` instead of a + # `LegacyBroker`, we should annotate the return type as a `ReadOnlyBroker`. + @property + def broker(self) -> LegacyBroker: + """Return a broker that you can subscribe to in order to monitor protocol progress. + + Currently, this only returns messages for `PythonAndLegacyRunner`. + Otherwise, it's a no-op. + """ + return self._broker def was_started(self) -> bool: """Whether the run has been started. @@ -136,20 +151,20 @@ async def load( protocol = self._legacy_file_reader.read( protocol_source, labware_definitions, python_parse_mode ) - broker = None equipment_broker = None if protocol.api_level < LEGACY_PYTHON_API_VERSION_CUTOFF: - broker = LegacyBroker() equipment_broker = Broker[LegacyLoadInfo]() self._protocol_engine.add_plugin( - LegacyContextPlugin(broker=broker, equipment_broker=equipment_broker) + LegacyContextPlugin( + broker=self._broker, equipment_broker=equipment_broker + ) ) context = self._legacy_context_creator.create( protocol=protocol, - broker=broker, + broker=self._broker, equipment_broker=equipment_broker, ) initial_home_command = pe_commands.HomeCreate( diff --git a/api/tests/opentrons/protocol_engine/state/test_change_notifier.py b/api/tests/opentrons/protocol_engine/state/test_change_notifier.py index ec62362d6da..4967e6d254e 100644 --- a/api/tests/opentrons/protocol_engine/state/test_change_notifier.py +++ b/api/tests/opentrons/protocol_engine/state/test_change_notifier.py @@ -54,20 +54,3 @@ async def _do_task_3() -> None: await asyncio.gather(task_1, task_2, task_3) assert results == [1, 2, 3] - - -async def test_broker() -> None: - """Test that notifications are available synchronously through `ChangeNotifier.broker`.""" - notify_count = 5 - - subject = ChangeNotifier() - received = 0 - - def callback(_message_from_broker: None) -> None: - nonlocal received - received += 1 - - with subject.broker.subscribed(callback): - for notify_number in range(notify_count): - subject.notify() - assert received == notify_number + 1 diff --git a/api/tests/opentrons/protocol_engine/state/test_command_monitor.py b/api/tests/opentrons/protocol_engine/state/test_command_monitor.py deleted file mode 100644 index dec820a97f6..00000000000 --- a/api/tests/opentrons/protocol_engine/state/test_command_monitor.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Unit tests for `opentrons.protocol_engine.command_monitor`.""" - - -from datetime import datetime -from typing import List - -from decoy import Decoy - -from opentrons.protocol_engine import CommandStatus, ProtocolEngine, commands -from opentrons.protocol_engine.command_monitor import ( - Event, - NoLongerRunningEvent, - RunningEvent, - monitor_commands as subject, -) -from opentrons.util.broker import Broker - - -def _make_dummy_command(id: str, completed: bool) -> commands.Command: - if completed: - return commands.Comment( - id=id, - key=id, - status=CommandStatus.SUCCEEDED, - createdAt=datetime(2023, 9, 26), - params=commands.CommentParams(message=""), - result=None, - ) - else: - return commands.Comment( - id=id, - key=id, - status=CommandStatus.RUNNING, - createdAt=datetime(2023, 9, 26), - completedAt=datetime(2023, 9, 26), - params=commands.CommentParams(message=""), - result=commands.CommentResult(), - ) - - -def test_monitor_commands(decoy: Decoy) -> None: - """Test that it translates state updates into command running/no-longer-running events.""" - mock_protocol_engine = decoy.mock(cls=ProtocolEngine) - mock_command_view = mock_protocol_engine.state_view.commands - state_update_broker = Broker[None]() - decoy.when(mock_protocol_engine.state_update_broker).then_return( - state_update_broker - ) - - command_1_running = _make_dummy_command(id="command-1", completed=False) - command_1_completed = _make_dummy_command(id="command-1", completed=True) - command_2_running = _make_dummy_command(id="command-2", completed=False) - command_2_completed = _make_dummy_command(id="command-2", completed=True) - - received_events: List[Event] = [] - - def callback(event: Event) -> None: - received_events.append(event) - - with subject(mock_protocol_engine, callback): - # Feed the subject these states, in sequence: - # 1. No command running - # 2. "command-1" running - # 3. "command-2" running - # 4. No command running - # Between each state, notify the subject by publishing a message to the broker that it's - # subscribed to. - - decoy.when(mock_command_view.get_running()).then_return(None) - state_update_broker.publish(message=None) - - decoy.when(mock_command_view.get_running()).then_return("command-1") - decoy.when(mock_command_view.get("command-1")).then_return(command_1_running) - state_update_broker.publish(message=None) - - decoy.when(mock_command_view.get_running()).then_return("command-2") - decoy.when(mock_command_view.get("command-1")).then_return(command_1_completed) - decoy.when(mock_command_view.get("command-2")).then_return(command_2_running) - state_update_broker.publish(message=None) - - decoy.when(mock_command_view.get_running()).then_return(None) - decoy.when(mock_command_view.get("command-2")).then_return(command_2_completed) - state_update_broker.publish(message=None) - - # Make sure the callback converted the sequence of state updates into the expected sequence - # of events. - assert received_events == [ - RunningEvent(command_1_running), - NoLongerRunningEvent(command_1_completed), - RunningEvent(command_2_running), - NoLongerRunningEvent(command_2_completed), - ] diff --git a/api/tests/opentrons/protocol_engine/state/test_command_view.py b/api/tests/opentrons/protocol_engine/state/test_command_view.py index d4f77db8dbe..b9cc6835ce3 100644 --- a/api/tests/opentrons/protocol_engine/state/test_command_view.py +++ b/api/tests/opentrons/protocol_engine/state/test_command_view.py @@ -660,15 +660,6 @@ def test_get_okay_to_clear(subject: CommandView, expected_is_okay: bool) -> None assert subject.get_is_okay_to_clear() is expected_is_okay -def test_get_running() -> None: - """It should return the command that's currently running.""" - subject = get_command_view(running_command_id=None) - assert subject.get_running() is None - - subject = get_command_view(running_command_id="command-id") - assert subject.get_running() == "command-id" - - def test_get_current() -> None: """It should return the "current" command.""" subject = get_command_view( diff --git a/api/tests/opentrons/test_execute.py b/api/tests/opentrons/test_execute.py index d233914af24..9a4ac9fb673 100644 --- a/api/tests/opentrons/test_execute.py +++ b/api/tests/opentrons/test_execute.py @@ -6,7 +6,7 @@ import textwrap import mock from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Generator, TextIO, cast +from typing import TYPE_CHECKING, Any, Callable, Generator, List, TextIO, cast import pytest @@ -58,60 +58,35 @@ async def dummy_delay(self: Any, duration_s: float) -> None: return gai_mock -@pytest.mark.parametrize("protocol_file", ["testosaur_v2.py"]) -def test_execute_function_apiv2( - protocol: Protocol, - protocol_file: str, - virtual_smoothie_env: None, - mock_get_attached_instr: mock.AsyncMock, -) -> None: - """Test `execute()` with a Python file.""" - converted_model_v15 = pipette_load_name.convert_pipette_model( - cast(PipetteModel, "p10_single_v1.5") - ) - converted_model_v1 = pipette_load_name.convert_pipette_model( - cast(PipetteModel, "p1000_single_v1") - ) - - mock_get_attached_instr.return_value[types.Mount.LEFT] = { - "config": load_pipette_data.load_definition( - converted_model_v15.pipette_type, - converted_model_v15.pipette_channels, - converted_model_v15.pipette_version, +@pytest.mark.parametrize( + ("protocol_file", "expected_entries"), + [ + ( + "testosaur_v2.py", + [ + "Picking up tip from A1 of Opentrons 96 Tip Rack 1000 µL on 1", + "Aspirating 100.0 uL from A1 of Corning 96 Well Plate 360 µL Flat on 2 at 500.0 uL/sec", + "Dispensing 100.0 uL into B1 of Corning 96 Well Plate 360 µL Flat on 2 at 1000.0 uL/sec", + "Dropping tip into H12 of Opentrons 96 Tip Rack 1000 µL on 1", + ], ), - "id": "testid", - } - mock_get_attached_instr.return_value[types.Mount.RIGHT] = { - "config": load_pipette_data.load_definition( - converted_model_v1.pipette_type, - converted_model_v1.pipette_channels, - converted_model_v1.pipette_version, + ( + # FIXME(2023-10-04): This run log is wrong. It should match the one above. + # https://opentrons.atlassian.net/browse/RSS-368 + "testosaur_v2_14.py", + [ + "Picking up tip from A1 of None", + "Aspirating 100.0 uL from A1 of None at 500.0 uL/sec", + "Dispensing 100.0 uL into B1 of None at 1000.0 uL/sec", + "Dropping tip into H12 of None", + ], ), - "id": "testid2", - } - entries = [] - - def emit_runlog(entry: Any) -> None: - nonlocal entries - entries.append(entry) - - execute.execute(protocol.filelike, protocol.filename, emit_runlog=emit_runlog) - - assert [item["payload"]["text"] for item in entries if item["$"] == "before"] == [ - "Picking up tip from A1 of Opentrons 96 Tip Rack 1000 µL on 1", - "Aspirating 100.0 uL from A1 of Corning 96 Well Plate 360 µL Flat on 2 at 500.0 uL/sec", - "Dispensing 100.0 uL into B1 of Corning 96 Well Plate 360 µL Flat on 2 at 1000.0 uL/sec", - "Dropping tip into H12 of Opentrons 96 Tip Rack 1000 µL on 1", - ] - - -# TODO(mm, 2023-09-26): Merge this with the above test_execute_apiv2_14() function when -# we resolve https://opentrons.atlassian.net/browse/RSS-320 and PAPIv≥2.14 protocols emit -# human-readable run log text. -@pytest.mark.parametrize("protocol_file", ["testosaur_v2_14.py"]) -def test_execute_function_apiv2_14( + ], +) +def test_execute_function_apiv2( protocol: Protocol, protocol_file: str, + expected_entries: List[str], virtual_smoothie_env: None, mock_get_attached_instr: mock.AsyncMock, ) -> None: @@ -147,27 +122,9 @@ def emit_runlog(entry: Any) -> None: execute.execute(protocol.filelike, protocol.filename, emit_runlog=emit_runlog) - # https://opentrons.atlassian.net/browse/RSS-320: - # PAPIv≥2.14 protocols currently emit JSON run log text, not human-readable text. - # Their exact contents can't be tested here because they're too verbose and they have - # unpredictable fields like `createdAt` and `id`. So as an approximation, we just test - # the command types. - command_types = [ - json.loads(item["payload"]["text"])["commandType"] - for item in entries - if item["$"] == "before" - ] - assert command_types == [ - "home", - "home", - "loadLabware", - "loadPipette", - "loadLabware", - "pickUpTip", - "aspirate", - "dispense", - "dropTip", - ] + assert [ + item["payload"]["text"] for item in entries if item["$"] == "before" + ] == expected_entries def test_execute_function_json_v3(