Skip to content

Commit

Permalink
fix(api): New protocols print a JSON "run log" from opentrons_execute…
Browse files Browse the repository at this point in the history
… and opentrons.execute.execute() (#13629)
  • Loading branch information
SyntaxColoring authored Sep 28, 2023
1 parent b538156 commit 24eb3c1
Show file tree
Hide file tree
Showing 14 changed files with 473 additions and 79 deletions.
51 changes: 40 additions & 11 deletions api/src/opentrons/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
ThreadManagedHardware,
ThreadManager,
)
from opentrons.protocol_engine.types import PostRunHardwareState

from opentrons.protocols import parse
from opentrons.protocols.api_support.deck_type import (
Expand All @@ -63,9 +62,11 @@
DeckType,
EngineStatus,
ErrorOccurrence as ProtocolEngineErrorOccurrence,
command_monitor as pe_command_monitor,
create_protocol_engine,
create_protocol_engine_in_thread,
)
from opentrons.protocol_engine.types import PostRunHardwareState

from opentrons.protocol_reader import ProtocolReader, ProtocolSource

Expand Down Expand Up @@ -407,13 +408,6 @@ def execute( # noqa: C901
else:
# TODO(mm, 2023-07-06): Once these NotImplementedErrors are resolved, consider removing
# the enclosing if-else block and running everything through _run_file_pe() for simplicity.
if emit_runlog:
raise NotImplementedError(
f"Printing the run log is not currently supported for Python protocols"
f" with apiLevel {ENGINE_CORE_API_VERSION} or newer."
f" Pass --no-print-runlog to opentrons_execute"
f" or emit_runlog=None to opentrons.execute.execute()."
)
if custom_data_paths:
raise NotImplementedError(
f"The custom_data_paths argument is not currently supported for Python protocols"
Expand All @@ -425,6 +419,7 @@ def execute( # noqa: C901
protocol_name=protocol_name,
extra_labware=extra_labware,
hardware_api=_get_global_hardware_controller(_get_robot_type()).wrapped(),
emit_runlog=emit_runlog,
)


Expand Down Expand Up @@ -642,9 +637,14 @@ def _run_file_pe(
protocol_name: str,
extra_labware: Dict[str, FoundLabware],
hardware_api: HardwareControlAPI,
emit_runlog: Optional[_EmitRunlogCallable],
) -> None:
"""Run a protocol file with Protocol Engine."""

def send_command_to_emit_runlog(event: pe_command_monitor.Event) -> None:
if emit_runlog is not None:
emit_runlog(_adapt_command(event))

async def run(protocol_source: ProtocolSource) -> None:
protocol_engine = await create_protocol_engine(
hardware_api=hardware_api,
Expand All @@ -657,9 +657,12 @@ async def run(protocol_source: ProtocolSource) -> None:
hardware_api=hardware_api,
)

# TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how
# things have historically behaved with PAPIv2.13 and older or JSONv5 and older.
result = await protocol_runner.run(protocol_source)
with pe_command_monitor.monitor_commands(
protocol_engine, callback=send_command_to_emit_runlog
):
# TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how
# things have historically behaved with PAPIv2.13 and older or JSONv5 and older.
result = await protocol_runner.run(protocol_source)

if result.state_summary.status != EngineStatus.SUCCEEDED:
raise _ProtocolEngineExecuteError(result.state_summary.errors)
Expand Down Expand Up @@ -732,6 +735,32 @@ def _adapt_protocol_source(
yield protocol_source


def _adapt_command(event: pe_command_monitor.Event) -> command_types.CommandMessage:
"""Convert a Protocol Engine command event to an old-school command_types.CommandMesage."""
before_or_after: command_types.MessageSequenceId = (
"before" if isinstance(event, pe_command_monitor.RunningEvent) else "after"
)

message: command_types.CommentMessage = {
# TODO(mm, 2023-09-26): If we can without breaking the public API, remove the requirement
# to supply a "name" here. If we can't do that, consider adding a special name value
# so we don't have to lie and call every command a comment.
"name": "command.COMMENT",
"id": event.command.id,
"$": before_or_after,
# TODO(mm, 2023-09-26): Convert this machine-readable JSON into a human-readable message
# to match behavior from before Protocol Engine.
# https://opentrons.atlassian.net/browse/RSS-320
"payload": {"text": event.command.json()},
# As far as I know, "error" is not part of the public-facing API, so it doesn't matter
# what we put here. Leaving it as `None` to avoid difficulties in converting between
# the Protocol Engine `ErrorOccurrence` model and the regular Python `Exception` type
# that this field expects.
"error": None,
}
return message


def _get_global_hardware_controller(robot_type: RobotType) -> ThreadManagedHardware:
# Build a hardware controller in a worker thread, which is necessary
# because ipython runs its notebook in asyncio but the notebook
Expand Down
65 changes: 65 additions & 0 deletions api/src/opentrons/protocol_engine/command_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Monitor the execution of commands in a `ProtocolEngine`."""


from dataclasses import dataclass
import typing
import contextlib


from opentrons.protocol_engine import Command, ProtocolEngine


@dataclass
class RunningEvent:
"""Emitted when a command starts running."""

command: Command


@dataclass
class NoLongerRunningEvent:
"""Emitted when a command stops running--either because it succeeded, or failed."""

command: Command


Event = typing.Union[RunningEvent, NoLongerRunningEvent]
Callback = typing.Callable[[Event], None]


@contextlib.contextmanager
def monitor_commands(
protocol_engine: ProtocolEngine,
callback: Callback,
) -> typing.Generator[None, None, None]:
"""Monitor the execution of commands in `protocol_engine`.
While this context manager is open, `callback` will be called any time `protocol_engine`
starts or stops a command.
"""
# Subscribe to all state updates in protocol_engine.
# On every update, diff the new state against the last state and see if the currently
# running command has changed. If it has, emit the appropriate events.

last_running_id: typing.Optional[str] = None

def handle_state_update(_message_from_broker: None) -> None:
nonlocal last_running_id

running_id = protocol_engine.state_view.commands.get_running()
if running_id != last_running_id:
if last_running_id is not None:
callback(
NoLongerRunningEvent(
protocol_engine.state_view.commands.get(last_running_id)
)
)

if running_id is not None:
callback(
RunningEvent(protocol_engine.state_view.commands.get(running_id))
)
last_running_id = running_id

with protocol_engine.state_update_broker.subscribed(handle_state_update):
yield
32 changes: 32 additions & 0 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
EnumeratedError,
)

from opentrons.util.broker import ReadOnlyBroker

from .errors import ProtocolCommandFailedError, ErrorOccurrence
from .errors.exceptions import EStopActivatedError
from . import commands, slot_standardization
Expand Down Expand Up @@ -129,6 +131,36 @@ def state_view(self) -> StateView:
"""Get an interface to retrieve calculated state values."""
return self._state_store

@property
def state_update_broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all state updates.
For example, you can use this to do something any time a new command starts running.
`ProtocolEngine` will publish a message to this broker (with the placeholder value `None`)
any time its state updates. Then, when you receive that message, you can get the latest
state through `state_view` and inspect it to see whether something happened that you care
about.
Warning:
Use this mechanism sparingly, because it has several footguns:
* Your callbacks will run synchronously, on every state update.
If they take a long time, they will harm analysis and run speed.
* Your callbacks will run in the thread and asyncio event loop that own this
`ProtocolEngine`. (See the concurrency notes in the `ProtocolEngine` docstring.)
If your callbacks interact with things in other threads or event loops,
take appropriate precautions to keep them concurrency-safe.
* Currently, if your callback raises an exception, it will propagate into
`ProtocolEngine` and be treated like any other internal error. This will probably
stop the run. If you expect your code to raise exceptions and don't want
that to happen, consider catching and logging them at the top level of your callback,
before they propagate into `ProtocolEngine`.
"""
return self._state_store.update_broker

def add_plugin(self, plugin: AbstractPlugin) -> None:
"""Add a plugin to the engine to customize behavior."""
self._plugin_starter.start(plugin)
Expand Down
12 changes: 12 additions & 0 deletions api/src/opentrons/protocol_engine/state/change_notifier.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
"""Simple state change notification interface."""
import asyncio

from opentrons.util.broker import Broker, ReadOnlyBroker


class ChangeNotifier:
"""An interface tto emit or subscribe to state change notifications."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier with an internal Event."""
self._event = asyncio.Event()
self._broker = Broker[None]()

def notify(self) -> None:
"""Notify all `wait`'ers that the state has changed."""
self._event.set()
self._broker.publish(None)

async def wait(self) -> None:
"""Wait until the next state change notification."""
self._event.clear()
await self._event.wait()

@property
def broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all changes.
This is an alternative interface to `wait()`.
"""
return self._broker
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,17 @@ def get_error(self) -> Optional[ErrorOccurrence]:
else:
return run_error or finish_error

def get_running(self) -> Optional[str]:
"""Return the ID of the command that's currently running, if any."""
return self._state.running_command_id

def get_current(self) -> Optional[CurrentCommand]:
"""Return the "current" command, if any.
The "current" command is the command that is currently executing,
or the most recent command to have completed.
"""
if self._state.running_command_id:
if self._state.running_command_id is not None:
entry = self._state.commands_by_id[self._state.running_command_id]
return CurrentCommand(
command_id=entry.command.id,
Expand Down
15 changes: 14 additions & 1 deletion api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar
from opentrons.protocol_engine.types import ModuleOffsetVector

from opentrons_shared_data.deck.dev_types import DeckDefinitionV3

from opentrons.protocol_engine.types import ModuleOffsetVector
from opentrons.util.broker import ReadOnlyBroker

from ..resources import DeckFixedLabware
from ..actions import Action, ActionHandler
from .abstract_store import HasState, HandlesActions
Expand Down Expand Up @@ -238,6 +240,17 @@ async def wait_for(

return is_done

# We return ReadOnlyBroker[None] instead of ReadOnlyBroker[StateView] in order to avoid
# confusion with state mutability. If a caller needs to know the new state, they can
# retrieve it explicitly with `ProtocolEngine.state_view`.
@property
def update_broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all state updates.
This is an alternative interface to `wait_for()`.
"""
return self._change_notifier.broker

def _get_next_state(self) -> State:
"""Get a new instance of the state value object."""
return State(
Expand Down
11 changes: 6 additions & 5 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from opentrons.commands.types import CommandMessage as LegacyCommand
from opentrons.legacy_broker import LegacyBroker
from opentrons.protocol_engine import AbstractPlugin, actions as pe_actions
from opentrons.util.broker import Broker
from opentrons.util.broker import ReadOnlyBroker

from .legacy_wrappers import LegacyLoadInfo
from .legacy_command_mapper import LegacyCommandMapper
Expand Down Expand Up @@ -37,7 +37,7 @@ class LegacyContextPlugin(AbstractPlugin):
def __init__(
self,
broker: LegacyBroker,
equipment_broker: Broker[LegacyLoadInfo],
equipment_broker: ReadOnlyBroker[LegacyLoadInfo],
legacy_command_mapper: Optional[LegacyCommandMapper] = None,
) -> None:
"""Initialize the plugin with its dependencies."""
Expand Down Expand Up @@ -78,10 +78,11 @@ def setup(self) -> None:
)
exit_stack.callback(command_broker_unsubscribe)

equipment_broker_unsubscribe = self._equipment_broker.subscribe(
callback=self._handle_equipment_loaded
exit_stack.enter_context(
self._equipment_broker.subscribed(
callback=self._handle_equipment_loaded
)
)
exit_stack.callback(equipment_broker_unsubscribe)

# All subscriptions succeeded.
# Save the exit stack so our teardown method can use it later
Expand Down
Loading

0 comments on commit 24eb3c1

Please sign in to comment.