Skip to content

Commit

Permalink
[externals] Make dagster-external no-op if launching process was not …
Browse files Browse the repository at this point in the history
…dagster
  • Loading branch information
smackesey committed Aug 21, 2023
1 parent 3422d50 commit 733612e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 6 deletions.
28 changes: 22 additions & 6 deletions python_modules/dagster-external/dagster_external/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import socket
import sys
import warnings
from typing import Any, ClassVar, Mapping, Optional, Sequence, TextIO

from typing_extensions import Self
Expand Down Expand Up @@ -29,15 +30,26 @@


def init_dagster_external() -> "ExternalExecutionContext":
    """Initialize the process-wide external execution context and return it.

    If a context has already been stored on ``ExternalExecutionContext`` it is
    returned unchanged. Otherwise the launch parameters are read via
    ``get_external_execution_params()``:

    * When ``params.is_orchestration_active`` is true, the input payload and the
      output stream are resolved from the params and a real
      ``ExternalExecutionContext`` is built from them.
    * Otherwise a warning is emitted and a ``MagicMock`` is stored instead, so
      every subsequent call on the context is a no-op.

    Returns:
        The stored context: a real ``ExternalExecutionContext`` or, when no
        orchestration process is active, a ``MagicMock`` stand-in.
    """
    if ExternalExecutionContext.is_initialized():
        return ExternalExecutionContext.get()

    params = get_external_execution_params()
    if params.is_orchestration_active:
        # Only touch the input/output channels when an orchestrator actually set
        # them up; doing so unconditionally would defeat the no-op fallback below.
        data = _read_input(params)
        output_stream = _get_output_stream(params)

        # Sockets can hang without this.
        if output_stream is not sys.stdout:
            atexit.register(_close_stream, output_stream)

        context = ExternalExecutionContext(data, output_stream)
    else:
        # Imported lazily so the mock machinery is only loaded on the fallback path.
        from unittest.mock import MagicMock

        warnings.warn(
            "This process was not launched by a Dagster orchestration process. All calls to the"
            " `dagster-external` are no-ops.",
            # Attribute the warning to the caller of init_dagster_external().
            stacklevel=2,
        )
        context = MagicMock()
    ExternalExecutionContext.set(context)
    return context

Expand Down Expand Up @@ -91,6 +103,10 @@ def _close_stream(stream) -> None:
class ExternalExecutionContext:
_instance: ClassVar[Optional[Self]] = None

@classmethod
def is_initialized(cls) -> bool:
return cls._instance is not None

@classmethod
def set(cls, context: Self) -> None:
    """Store ``context`` in the class-level ``_instance`` slot, replacing any previous value."""
    cls._instance = context
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster-external/dagster_external/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@


def get_external_execution_params() -> "ExternalExecutionParams":
is_orchestration_active = bool(os.getenv("DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE"))
raw_input_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["input_mode"], DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE
)
raw_output_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["output_mode"], DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE
)
return ExternalExecutionParams(
is_orchestration_active=is_orchestration_active,
input_mode=ExternalExecutionIOMode[raw_input_mode],
output_mode=ExternalExecutionIOMode[raw_output_mode],
input_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"]),
Expand All @@ -31,6 +33,7 @@ def get_external_execution_params() -> "ExternalExecutionParams":

@dataclass
class ExternalExecutionParams:
is_orchestration_active: bool
input_mode: str
output_mode: str
input_path: Optional[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_FILENAME: Final = "dagster_external_output"

DAGSTER_EXTERNAL_ENV_KEYS: Final = {
"is_orchestration_active": "DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE",
"input_mode": "DAGSTER_EXTERNAL_INPUT_MODE",
"output_mode": "DAGSTER_EXTERNAL_OUTPUT_MODE",
"input": "DAGSTER_EXTERNAL_INPUT",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import re
import subprocess
import textwrap
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -187,3 +188,25 @@ def foo(context: AssetExecutionContext, ext: ExternalExecutionResource):
)
with pytest.raises(CheckError, match=r"directory \S+ does not currently exist"):
materialize([foo], resources={"ext": resource})


def test_external_execution_no_orchestration():
    """A script run outside of Dagster orchestration warns on stderr.

    The script is launched directly as a subprocess, without the orchestration
    environment variables, and the test checks that the "not launched by a
    Dagster orchestration process" warning appears on its stderr.
    """
    import sys

    def script_fn():
        from dagster_external import ExternalExecutionContext, init_dagster_external

        init_dagster_external()
        context = ExternalExecutionContext.get()
        context.log("hello world")
        context.report_asset_metadata("foo", "bar", context.get_extra("bar"))
        context.report_asset_data_version("foo", "alpha")

    with temp_script(script_fn) as script_path:
        # Use the interpreter running the tests: a bare "python" may be absent
        # from PATH or point at an environment without dagster_external.
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            check=False,
        )
        # The message is literal text, so escape it instead of letting "." act
        # as a regex wildcard.
        expected_warning = re.escape(
            "This process was not launched by a Dagster orchestration process. All calls to the"
            " `dagster-external` are no-ops."
        )
        assert re.search(expected_warning, result.stderr.decode())
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def run(self) -> None:
base_env = {
**os.environ,
**self.env,
DAGSTER_EXTERNAL_ENV_KEYS["is_orchestration_active"]: "1",
DAGSTER_EXTERNAL_ENV_KEYS["input_mode"]: self._input_mode.value,
DAGSTER_EXTERNAL_ENV_KEYS["output_mode"]: self._output_mode.value,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def run(
base_env = {
**os.environ,
**self.env,
DAGSTER_EXTERNAL_ENV_KEYS["is_orchestration_active"]: "1",
DAGSTER_EXTERNAL_ENV_KEYS["input_mode"]: self._input_mode.value,
DAGSTER_EXTERNAL_ENV_KEYS["output_mode"]: self._output_mode.value,
}
Expand Down

0 comments on commit 733612e

Please sign in to comment.