Skip to content

Commit

Permalink
refactor(api): Monitor PAPI commands, not PE commands (#13724)
Browse files Browse the repository at this point in the history
* Remove broker from ProtocolEngine.

This partially reverts commit 24eb3c1 ("fix(api): New protocols print a JSON "run log" from opentrons_execute and opentrons.execute.execute()  (#13629)").

* Add broker to ProtocolRunner.

* Update execute.py.

* Update test_execute.py.

* Further deduplicate test_execute.py.
  • Loading branch information
SyntaxColoring committed Oct 18, 2023
1 parent 894979f commit 83628f3
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 353 deletions.
40 changes: 6 additions & 34 deletions api/src/opentrons/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
DeckType,
EngineStatus,
ErrorOccurrence as ProtocolEngineErrorOccurrence,
command_monitor as pe_command_monitor,
create_protocol_engine,
create_protocol_engine_in_thread,
)
Expand Down Expand Up @@ -641,10 +640,6 @@ def _run_file_pe(
) -> None:
"""Run a protocol file with Protocol Engine."""

def send_command_to_emit_runlog(event: pe_command_monitor.Event) -> None:
if emit_runlog is not None:
emit_runlog(_adapt_command(event))

async def run(protocol_source: ProtocolSource) -> None:
protocol_engine = await create_protocol_engine(
hardware_api=hardware_api,
Expand All @@ -657,12 +652,15 @@ async def run(protocol_source: ProtocolSource) -> None:
hardware_api=hardware_api,
)

with pe_command_monitor.monitor_commands(
protocol_engine, callback=send_command_to_emit_runlog
):
unsubscribe = protocol_runner.broker.subscribe(
"command", lambda event: emit_runlog(event) if emit_runlog else None
)
try:
# TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how
# things have historically behaved with PAPIv2.13 and older or JSONv5 and older.
result = await protocol_runner.run(protocol_source)
finally:
unsubscribe()

if result.state_summary.status != EngineStatus.SUCCEEDED:
raise _ProtocolEngineExecuteError(result.state_summary.errors)
Expand Down Expand Up @@ -735,32 +733,6 @@ def _adapt_protocol_source(
yield protocol_source


def _adapt_command(event: pe_command_monitor.Event) -> command_types.CommandMessage:
"""Convert a Protocol Engine command event to an old-school command_types.CommandMesage."""
before_or_after: command_types.MessageSequenceId = (
"before" if isinstance(event, pe_command_monitor.RunningEvent) else "after"
)

message: command_types.CommentMessage = {
# TODO(mm, 2023-09-26): If we can without breaking the public API, remove the requirement
# to supply a "name" here. If we can't do that, consider adding a special name value
# so we don't have to lie and call every command a comment.
"name": "command.COMMENT",
"id": event.command.id,
"$": before_or_after,
# TODO(mm, 2023-09-26): Convert this machine-readable JSON into a human-readable message
# to match behavior from before Protocol Engine.
# https://opentrons.atlassian.net/browse/RSS-320
"payload": {"text": event.command.json()},
# As far as I know, "error" is not part of the public-facing API, so it doesn't matter
# what we put here. Leaving it as `None` to avoid difficulties in converting between
# the Protocol Engine `ErrorOccurrence` model and the regular Python `Exception` type
# that this field expects.
"error": None,
}
return message


def _get_global_hardware_controller(robot_type: RobotType) -> ThreadManagedHardware:
# Build a hardware controller in a worker thread, which is necessary
# because ipython runs its notebook in asyncio but the notebook
Expand Down
65 changes: 0 additions & 65 deletions api/src/opentrons/protocol_engine/command_monitor.py

This file was deleted.

32 changes: 0 additions & 32 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
EnumeratedError,
)

from opentrons.util.broker import ReadOnlyBroker

from .errors import ProtocolCommandFailedError, ErrorOccurrence
from .errors.exceptions import EStopActivatedError
from . import commands, slot_standardization
Expand Down Expand Up @@ -131,36 +129,6 @@ def state_view(self) -> StateView:
"""Get an interface to retrieve calculated state values."""
return self._state_store

@property
def state_update_broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all state updates.
For example, you can use this to do something any time a new command starts running.
`ProtocolEngine` will publish a message to this broker (with the placeholder value `None`)
any time its state updates. Then, when you receive that message, you can get the latest
state through `state_view` and inspect it to see whether something happened that you care
about.
Warning:
Use this mechanism sparingly, because it has several footguns:
* Your callbacks will run synchronously, on every state update.
If they take a long time, they will harm analysis and run speed.
* Your callbacks will run in the thread and asyncio event loop that own this
`ProtocolEngine`. (See the concurrency notes in the `ProtocolEngine` docstring.)
If your callbacks interact with things in other threads or event loops,
take appropriate precautions to keep them concurrency-safe.
* Currently, if your callback raises an exception, it will propagate into
`ProtocolEngine` and be treated like any other internal error. This will probably
stop the run. If you expect your code to raise exceptions and don't want
that to happen, consider catching and logging them at the top level of your callback,
before they propagate into `ProtocolEngine`.
"""
return self._state_store.update_broker

def add_plugin(self, plugin: AbstractPlugin) -> None:
"""Add a plugin to the engine to customize behavior."""
self._plugin_starter.start(plugin)
Expand Down
12 changes: 0 additions & 12 deletions api/src/opentrons/protocol_engine/state/change_notifier.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
"""Simple state change notification interface."""
import asyncio

from opentrons.util.broker import Broker, ReadOnlyBroker


class ChangeNotifier:
"""An interface tto emit or subscribe to state change notifications."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier with an internal Event."""
self._event = asyncio.Event()
self._broker = Broker[None]()

def notify(self) -> None:
"""Notify all `wait`'ers that the state has changed."""
self._event.set()
self._broker.publish(None)

async def wait(self) -> None:
"""Wait until the next state change notification."""
self._event.clear()
await self._event.wait()

@property
def broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all changes.
This is an alternative interface to `wait()`.
"""
return self._broker
6 changes: 1 addition & 5 deletions api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,17 +507,13 @@ def get_error(self) -> Optional[ErrorOccurrence]:
else:
return run_error or finish_error

def get_running(self) -> Optional[str]:
"""Return the ID of the command that's currently running, if any."""
return self._state.running_command_id

def get_current(self) -> Optional[CurrentCommand]:
"""Return the "current" command, if any.
The "current" command is the command that is currently executing,
or the most recent command to have completed.
"""
if self._state.running_command_id is not None:
if self._state.running_command_id:
entry = self._state.commands_by_id[self._state.running_command_id]
return CurrentCommand(
command_id=entry.command.id,
Expand Down
11 changes: 0 additions & 11 deletions api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,6 @@ async def wait_for(

return is_done

# We return ReadOnlyBroker[None] instead of ReadOnlyBroker[StateView] in order to avoid
# confusion with state mutability. If a caller needs to know the new state, they can
# retrieve it explicitly with `ProtocolEngine.state_view`.
@property
def update_broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all state updates.
This is an alternative interface to `wait_for()`.
"""
return self._change_notifier.broker

def _get_next_state(self) -> State:
"""Get a new instance of the state value object."""
return State(
Expand Down
23 changes: 19 additions & 4 deletions api/src/opentrons/protocol_runner/protocol_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ class AbstractRunner(ABC):

def __init__(self, protocol_engine: ProtocolEngine) -> None:
self._protocol_engine = protocol_engine
self._broker = LegacyBroker()

# TODO(mm, 2023-10-03): `LegacyBroker` is specific to Python protocols and JSON protocols ≤v5.
# We'll need to extend this in order to report progress from newer JSON protocols.
#
# TODO(mm, 2023-10-04): When we switch this to return a new `Broker` instead of a
# `LegacyBroker`, we should annotate the return type as a `ReadOnlyBroker`.
@property
def broker(self) -> LegacyBroker:
"""Return a broker that you can subscribe to in order to monitor protocol progress.
Currently, this only returns messages for `PythonAndLegacyRunner`.
Otherwise, it's a no-op.
"""
return self._broker

def was_started(self) -> bool:
"""Whether the run has been started.
Expand Down Expand Up @@ -136,20 +151,20 @@ async def load(
protocol = self._legacy_file_reader.read(
protocol_source, labware_definitions, python_parse_mode
)
broker = None
equipment_broker = None

if protocol.api_level < LEGACY_PYTHON_API_VERSION_CUTOFF:
broker = LegacyBroker()
equipment_broker = Broker[LegacyLoadInfo]()

self._protocol_engine.add_plugin(
LegacyContextPlugin(broker=broker, equipment_broker=equipment_broker)
LegacyContextPlugin(
broker=self._broker, equipment_broker=equipment_broker
)
)

context = self._legacy_context_creator.create(
protocol=protocol,
broker=broker,
broker=self._broker,
equipment_broker=equipment_broker,
)
initial_home_command = pe_commands.HomeCreate(
Expand Down
17 changes: 0 additions & 17 deletions api/tests/opentrons/protocol_engine/state/test_change_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,3 @@ async def _do_task_3() -> None:
await asyncio.gather(task_1, task_2, task_3)

assert results == [1, 2, 3]


async def test_broker() -> None:
"""Test that notifications are available synchronously through `ChangeNotifier.broker`."""
notify_count = 5

subject = ChangeNotifier()
received = 0

def callback(_message_from_broker: None) -> None:
nonlocal received
received += 1

with subject.broker.subscribed(callback):
for notify_number in range(notify_count):
subject.notify()
assert received == notify_number + 1
Loading

0 comments on commit 83628f3

Please sign in to comment.