Skip to content

Commit

Permalink
[ext] add protocol version to all messages
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 22, 2023
1 parent b824aa3 commit ceceeb3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
10 changes: 9 additions & 1 deletion python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
# ##### PROTOCOL
# ########################

# This represents the version of the protocol, rather than the version of the package. It must be
# manually updated whenever there are changes to the protocol.
EXT_PROTOCOL_VERSION = "0.1"

ExtExtras = Mapping[str, Any]
ExtParams = Mapping[str, Any]
Expand All @@ -62,8 +65,11 @@ def _param_name_to_env_key(key: str) -> str:

# ##### MESSAGE

EXT_PROTOCOL_VERSION_FIELD = "__dagster_ext_version"


class ExtMessage(TypedDict):
EXT_PROTOCOL_VERSION_FIELD: str
method: str
params: Optional[Mapping[str, Any]]

Expand Down Expand Up @@ -677,7 +683,9 @@ def __init__(
self._materialized_assets: set[str] = set()

def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None:
message = ExtMessage(method=method, params=params)
message = ExtMessage(
EXT_PROTOCOL_VERSION_FIELD=EXT_PROTOCOL_VERSION, method=method, params=params
)
self._message_channel.write_message(message)

# ########################
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/ext/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Iterator, Optional

from dagster_ext import (
EXT_PROTOCOL_VERSION_FIELD,
ExtContextData,
ExtDefaultContextLoader,
ExtDefaultMessageWriter,
Expand Down Expand Up @@ -177,8 +178,7 @@ def extract_message_or_forward_to_stdout(handler: "ExtMessageHandler", log_line:
# exceptions as control flow, you love to see it
try:
message = json.loads(log_line)
# need better message check
if message.keys() == {"method", "params"}:
if EXT_PROTOCOL_VERSION_FIELD in message.keys():
handler.handle_message(message)
except Exception:
# move non-message logs in to stdout for compute log capture
Expand Down

0 comments on commit ceceeb3

Please sign in to comment.