Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): New protocols print a JSON "run log" from opentrons_execute and opentrons.execute.execute() #13629

Merged
merged 16 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions api/src/opentrons/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
ThreadManagedHardware,
ThreadManager,
)
from opentrons.protocol_engine.types import PostRunHardwareState

from opentrons.protocols import parse
from opentrons.protocols.api_support.deck_type import (
Expand All @@ -63,9 +62,11 @@
DeckType,
EngineStatus,
ErrorOccurrence as ProtocolEngineErrorOccurrence,
command_monitor as pe_command_monitor,
create_protocol_engine,
create_protocol_engine_in_thread,
)
from opentrons.protocol_engine.types import PostRunHardwareState

from opentrons.protocol_reader import ProtocolReader, ProtocolSource

Expand Down Expand Up @@ -407,13 +408,6 @@ def execute( # noqa: C901
else:
# TODO(mm, 2023-07-06): Once these NotImplementedErrors are resolved, consider removing
# the enclosing if-else block and running everything through _run_file_pe() for simplicity.
if emit_runlog:
raise NotImplementedError(
f"Printing the run log is not currently supported for Python protocols"
f" with apiLevel {ENGINE_CORE_API_VERSION} or newer."
f" Pass --no-print-runlog to opentrons_execute"
f" or emit_runlog=None to opentrons.execute.execute()."
)
if custom_data_paths:
raise NotImplementedError(
f"The custom_data_paths argument is not currently supported for Python protocols"
Expand All @@ -425,6 +419,7 @@ def execute( # noqa: C901
protocol_name=protocol_name,
extra_labware=extra_labware,
hardware_api=_get_global_hardware_controller(_get_robot_type()).wrapped(),
emit_runlog=emit_runlog,
)


Expand Down Expand Up @@ -642,9 +637,14 @@ def _run_file_pe(
protocol_name: str,
extra_labware: Dict[str, FoundLabware],
hardware_api: HardwareControlAPI,
emit_runlog: Optional[_EmitRunlogCallable],
) -> None:
"""Run a protocol file with Protocol Engine."""

def send_command_to_emit_runlog(event: pe_command_monitor.Event) -> None:
if emit_runlog is not None:
emit_runlog(_adapt_command(event))

async def run(protocol_source: ProtocolSource) -> None:
protocol_engine = await create_protocol_engine(
hardware_api=hardware_api,
Expand All @@ -657,9 +657,12 @@ async def run(protocol_source: ProtocolSource) -> None:
hardware_api=hardware_api,
)

# TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how
# things have historically behaved with PAPIv2.13 and older or JSONv5 and older.
result = await protocol_runner.run(protocol_source)
with pe_command_monitor.monitor_commands(
protocol_engine, callback=send_command_to_emit_runlog
):
# TODO(mm, 2023-06-30): This will home and drop tips at the end, which is not how
# things have historically behaved with PAPIv2.13 and older or JSONv5 and older.
result = await protocol_runner.run(protocol_source)

if result.state_summary.status != EngineStatus.SUCCEEDED:
raise _ProtocolEngineExecuteError(result.state_summary.errors)
Expand Down Expand Up @@ -732,6 +735,32 @@ def _adapt_protocol_source(
yield protocol_source


def _adapt_command(event: pe_command_monitor.Event) -> command_types.CommandMessage:
"""Convert a Protocol Engine command event to an old-school command_types.CommandMesage."""
before_or_after: command_types.MessageSequenceId = (
"before" if isinstance(event, pe_command_monitor.RunningEvent) else "after"
)

message: command_types.CommentMessage = {
# TODO(mm, 2023-09-26): If we can without breaking the public API, remove the requirement
# to supply a "name" here. If we can't do that, consider adding a special name value
# so we don't have to lie and call every command a comment.
"name": "command.COMMENT",
"id": event.command.id,
"$": before_or_after,
# TODO(mm, 2023-09-26): Convert this machine-readable JSON into a human-readable message
# to match behavior from before Protocol Engine.
# https://opentrons.atlassian.net/browse/RSS-320
"payload": {"text": event.command.json()},
# As far as I know, "error" is not part of the public-facing API, so it doesn't matter
# what we put here. Leaving it as `None` to avoid difficulties in converting between
# the Protocol Engine `ErrorOccurrence` model and the regular Python `Exception` type
# that this field expects.
"error": None,
}
return message


def _get_global_hardware_controller(robot_type: RobotType) -> ThreadManagedHardware:
# Build a hardware controller in a worker thread, which is necessary
# because ipython runs its notebook in asyncio but the notebook
Expand Down
65 changes: 65 additions & 0 deletions api/src/opentrons/protocol_engine/command_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Monitor the execution of commands in a `ProtocolEngine`."""


from dataclasses import dataclass
import typing
import contextlib


from opentrons.protocol_engine import Command, ProtocolEngine


@dataclass
class RunningEvent:
"""Emitted when a command starts running."""

command: Command


@dataclass
class NoLongerRunningEvent:
"""Emitted when a command stops running--either because it succeeded, or failed."""

command: Command


Event = typing.Union[RunningEvent, NoLongerRunningEvent]
Callback = typing.Callable[[Event], None]


@contextlib.contextmanager
def monitor_commands(
protocol_engine: ProtocolEngine,
callback: Callback,
) -> typing.Generator[None, None, None]:
"""Monitor the execution of commands in `protocol_engine`.

While this context manager is open, `callback` will be called any time `protocol_engine`
starts or stops a command.
"""
# Subscribe to all state updates in protocol_engine.
# On every update, diff the new state against the last state and see if the currently
# running command has changed. If it has, emit the appropriate events.

last_running_id: typing.Optional[str] = None

def handle_state_update(_message_from_broker: None) -> None:
nonlocal last_running_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is new to me :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...Python. 😬


running_id = protocol_engine.state_view.commands.get_running()
if running_id != last_running_id:
if last_running_id is not None:
callback(
NoLongerRunningEvent(
protocol_engine.state_view.commands.get(last_running_id)
)
)

if running_id is not None:
callback(
RunningEvent(protocol_engine.state_view.commands.get(running_id))
)
last_running_id = running_id

with protocol_engine.state_update_broker.subscribed(handle_state_update):
yield
32 changes: 32 additions & 0 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
EnumeratedError,
)

from opentrons.util.broker import ReadOnlyBroker

from .errors import ProtocolCommandFailedError, ErrorOccurrence
from .errors.exceptions import EStopActivatedError
from . import commands, slot_standardization
Expand Down Expand Up @@ -129,6 +131,36 @@ def state_view(self) -> StateView:
"""Get an interface to retrieve calculated state values."""
return self._state_store

@property
def state_update_broker(self) -> ReadOnlyBroker[None]:
Comment on lines +134 to +135
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things I do not like about this:

  • All of the footguns described in the docstring.
  • It's callback-based, which can be spaghetti.

Things I like about this:

  • Making the mechanism this generic—as opposed to something more specific like wait_for_commands()—lets us build more logic outside of ProtocolEngine. This is really nice for curbing ProtocolEngine complexity.
  • This is is roughly compatible with my experiments in notifications for the HTTP API. I'm expecting various internal classes like ProtocolEngine and AnalysisStore to grow interfaces that let callers monitor them for changes in a coarse-grained way. Then, higher-level endpoints filter those notifications down and return only what the client actually cares about.
  • Despite callbacks being ugly, they are suited to the actual requirements of RSS-238. The ultimate user-facing interface is callback-based. If we want async generators later, it's easier to build those atop callbacks than to do it the other way around.

"""Return a broker that you can use to get notified of all state updates.

For example, you can use this to do something any time a new command starts running.

`ProtocolEngine` will publish a message to this broker (with the placeholder value `None`)
any time its state updates. Then, when you receive that message, you can get the latest
state through `state_view` and inspect it to see whether something happened that you care
about.

Warning:
Use this mechanism sparingly, because it has several footguns:

* Your callbacks will run synchronously, on every state update.
If they take a long time, they will harm analysis and run speed.

* Your callbacks will run in the thread and asyncio event loop that own this
`ProtocolEngine`. (See the concurrency notes in the `ProtocolEngine` docstring.)
If your callbacks interact with things in other threads or event loops,
take appropriate precautions to keep them concurrency-safe.

* Currently, if your callback raises an exception, it will propagate into
`ProtocolEngine` and be treated like any other internal error. This will probably
stop the run. If you expect your code to raise exceptions and don't want
that to happen, consider catching and logging them at the top level of your callback,
before they propagate into `ProtocolEngine`.
"""
return self._state_store.update_broker

def add_plugin(self, plugin: AbstractPlugin) -> None:
"""Add a plugin to the engine to customize behavior."""
self._plugin_starter.start(plugin)
Expand Down
12 changes: 12 additions & 0 deletions api/src/opentrons/protocol_engine/state/change_notifier.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
"""Simple state change notification interface."""
import asyncio

from opentrons.util.broker import Broker, ReadOnlyBroker


class ChangeNotifier:
"""An interface tto emit or subscribe to state change notifications."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier with an internal Event."""
self._event = asyncio.Event()
self._broker = Broker[None]()

def notify(self) -> None:
"""Notify all `wait`'ers that the state has changed."""
self._event.set()
self._broker.publish(None)

async def wait(self) -> None:
"""Wait until the next state change notification."""
self._event.clear()
await self._event.wait()

@property
def broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all changes.

This is an alternative interface to `wait()`.
"""
return self._broker
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,17 @@ def get_error(self) -> Optional[ErrorOccurrence]:
else:
return run_error or finish_error

def get_running(self) -> Optional[str]:
"""Return the ID of the command that's currently running, if any."""
return self._state.running_command_id

def get_current(self) -> Optional[CurrentCommand]:
"""Return the "current" command, if any.

The "current" command is the command that is currently executing,
or the most recent command to have completed.
"""
if self._state.running_command_id:
if self._state.running_command_id is not None:
entry = self._state.commands_by_id[self._state.running_command_id]
return CurrentCommand(
command_id=entry.command.id,
Expand Down
15 changes: 14 additions & 1 deletion api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar
from opentrons.protocol_engine.types import ModuleOffsetVector

from opentrons_shared_data.deck.dev_types import DeckDefinitionV3

from opentrons.protocol_engine.types import ModuleOffsetVector
from opentrons.util.broker import ReadOnlyBroker

from ..resources import DeckFixedLabware
from ..actions import Action, ActionHandler
from .abstract_store import HasState, HandlesActions
Expand Down Expand Up @@ -238,6 +240,17 @@ async def wait_for(

return is_done

# We return ReadOnlyBroker[None] instead of ReadOnlyBroker[StateView] in order to avoid
# confusion with state mutability. If a caller needs to know the new state, they can
# retrieve it explicitly with `ProtocolEngine.state_view`.
@property
def update_broker(self) -> ReadOnlyBroker[None]:
"""Return a broker that you can use to get notified of all state updates.

This is an alternative interface to `wait_for()`.
"""
return self._change_notifier.broker

def _get_next_state(self) -> State:
"""Get a new instance of the state value object."""
return State(
Expand Down
11 changes: 6 additions & 5 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from opentrons.commands.types import CommandMessage as LegacyCommand
from opentrons.legacy_broker import LegacyBroker
from opentrons.protocol_engine import AbstractPlugin, actions as pe_actions
from opentrons.util.broker import Broker
from opentrons.util.broker import ReadOnlyBroker

from .legacy_wrappers import LegacyLoadInfo
from .legacy_command_mapper import LegacyCommandMapper
Expand Down Expand Up @@ -37,7 +37,7 @@ class LegacyContextPlugin(AbstractPlugin):
def __init__(
self,
broker: LegacyBroker,
equipment_broker: Broker[LegacyLoadInfo],
equipment_broker: ReadOnlyBroker[LegacyLoadInfo],
legacy_command_mapper: Optional[LegacyCommandMapper] = None,
) -> None:
"""Initialize the plugin with its dependencies."""
Expand Down Expand Up @@ -78,10 +78,11 @@ def setup(self) -> None:
)
exit_stack.callback(command_broker_unsubscribe)

equipment_broker_unsubscribe = self._equipment_broker.subscribe(
callback=self._handle_equipment_loaded
exit_stack.enter_context(
self._equipment_broker.subscribed(
callback=self._handle_equipment_loaded
)
)
exit_stack.callback(equipment_broker_unsubscribe)

# All subscriptions succeeded.
# Save the exit stack so our teardown method can use it later
Expand Down
Loading