Skip to content

Commit

Permalink
[externals] Remove IO modes entirely (dagster-io#16008)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Removes IO modes entirely from `dagster-external`, standardizing on the
use of temporary files for IPC.

## How I Tested These Changes

Existing test suite (though many tests have been deleted).
  • Loading branch information
smackesey authored and sirawats committed Aug 24, 2023
1 parent 851955c commit 2dafb4c
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 409 deletions.
73 changes: 17 additions & 56 deletions python_modules/dagster-external/dagster_external/context.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import atexit
import json
import socket
import sys
import os
import warnings
from typing import Any, ClassVar, Mapping, Optional, Sequence, TextIO

from typing_extensions import Self

from dagster_external.params import ExternalExecutionParams, get_external_execution_params
from dagster_external.params import get_external_execution_params

from .protocol import (
DAGSTER_EXTERNAL_ENV_KEYS,
ExternalDataProvenance,
ExternalExecutionContextData,
ExternalExecutionIOMode,
ExternalPartitionKeyRange,
ExternalTimeWindow,
Notification,
SocketServerControlMessage,
)
from .util import (
assert_defined_asset_property,
Expand All @@ -30,75 +28,38 @@


def is_dagster_orchestration_active() -> bool:
params = get_external_execution_params()
return params.is_orchestration_active
return bool(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["is_orchestration_active"]))


def init_dagster_external() -> "ExternalExecutionContext":
if ExternalExecutionContext.is_initialized():
return ExternalExecutionContext.get()
params = get_external_execution_params()
if params.is_orchestration_active:
data = _read_input(params)
output_stream = _get_output_stream(params)

# Sockets can hang without this.
if output_stream is not sys.stdout:
atexit.register(_close_stream, output_stream)

if is_dagster_orchestration_active():
params = get_external_execution_params()
data = _read_input(params.input_path)
output_stream = _get_output_stream(params.output_path)
atexit.register(_close_stream, output_stream)
context = ExternalExecutionContext(data, output_stream)
else:
from unittest.mock import MagicMock

warnings.warn(
"This process was not launched by a Dagster orchestration process. All calls to the"
" `dagster-external` are no-ops."
" `dagster-external` context are no-ops."
)
context = MagicMock()
ExternalExecutionContext.set(context)
return context


def _read_input(params: ExternalExecutionParams) -> ExternalExecutionContextData:
if params.input_mode == ExternalExecutionIOMode.stdio:
with sys.stdin as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.file:
assert params.input_path, "input_path must be set when input_mode is `file`"
with open(params.input_path, "r") as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.fifo:
assert params.input_path, "input_path must be set when input_mode is `fifo`"
with open(params.input_path, "r") as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.socket:
assert params.host, "host must be set when input_mode is `socket`"
assert params.port, "port must be set when input_mode is `socket`"
with socket.create_connection((params.host, params.port)) as sock:
sock.makefile("w").write(f"{SocketServerControlMessage.get_context.value}\n")
return json.loads(sock.makefile("r").readline())
else:
raise Exception(f"Invalid input mode: {params.input_mode}")


def _get_output_stream(params: ExternalExecutionParams) -> TextIO:
if params.output_mode == ExternalExecutionIOMode.stdio:
return sys.stdout
elif params.output_mode == ExternalExecutionIOMode.file:
assert params.output_path, "output_path must be set when output_mode is `file`"
return open(params.output_path, "a")
elif params.output_mode == ExternalExecutionIOMode.fifo:
assert params.output_path, "output_path must be set when output_mode is `fifo`"
return open(params.output_path, "w")
elif params.output_mode == ExternalExecutionIOMode.socket:
assert params.host, "host must be set when output_mode is `socket`"
assert params.port, "port must be set when output_mode is `socket`"
sock = socket.create_connection((params.host, params.port))
stream = sock.makefile("w")
stream.write(f"{SocketServerControlMessage.initiate_client_stream.value}\n")
return stream
else:
raise Exception(f"Invalid output mode: {params.output_mode}")
def _read_input(path: str) -> ExternalExecutionContextData:
with open(path, "r") as f:
return json.load(f)


def _get_output_stream(path: str) -> TextIO:
return open(path, "a")


def _close_stream(stream) -> None:
Expand Down
35 changes: 8 additions & 27 deletions python_modules/dagster-external/dagster_external/params.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,23 @@
import os
from dataclasses import dataclass
from typing import Optional

from dagster_external.protocol import (
DAGSTER_EXTERNAL_DEFAULT_HOST,
DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE,
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE,
DAGSTER_EXTERNAL_DEFAULT_PORT,
DAGSTER_EXTERNAL_ENV_KEYS,
ExternalExecutionIOMode,
)


def get_external_execution_params() -> "ExternalExecutionParams":
is_orchestration_active = bool(os.getenv("DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE"))
raw_input_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["input_mode"], DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE
)
raw_output_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["output_mode"], DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE
)
input_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"])
output_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"])
assert input_path, "input_path must be set"
assert output_path, "output_path must be set"
return ExternalExecutionParams(
is_orchestration_active=is_orchestration_active,
input_mode=ExternalExecutionIOMode[raw_input_mode],
output_mode=ExternalExecutionIOMode[raw_output_mode],
input_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"]),
output_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"]),
host=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["host"], DAGSTER_EXTERNAL_DEFAULT_HOST),
port=int(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["port"], DAGSTER_EXTERNAL_DEFAULT_PORT)),
input_path=input_path,
output_path=output_path,
)


@dataclass
class ExternalExecutionParams:
is_orchestration_active: bool
input_mode: str
output_mode: str
input_path: Optional[str]
output_path: Optional[str]
host: str
port: int
input_path: str
output_path: str
28 changes: 0 additions & 28 deletions python_modules/dagster-external/dagster_external/protocol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from enum import Enum
from typing import Any, Mapping, Optional, Sequence

from typing_extensions import Final, TypeAlias, TypedDict
Expand All @@ -7,42 +6,15 @@

# ##### PARAMETERS

DAGSTER_EXTERNAL_DEFAULT_HOST: Final = "localhost"
DAGSTER_EXTERNAL_DEFAULT_PORT: Final = 9716
DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE: Final = "file"
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE: Final = "file"
DAGSTER_EXTERNAL_DEFAULT_INPUT_FILENAME: Final = "dagster_external_input"
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_FILENAME: Final = "dagster_external_output"

DAGSTER_EXTERNAL_ENV_KEYS: Final = {
"is_orchestration_active": "DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE",
"input_mode": "DAGSTER_EXTERNAL_INPUT_MODE",
"output_mode": "DAGSTER_EXTERNAL_OUTPUT_MODE",
"input": "DAGSTER_EXTERNAL_INPUT",
"output": "DAGSTER_EXTERNAL_OUTPUT",
"host": "DAGSTER_EXTERNAL_HOST",
"port": "DAGSTER_EXTERNAL_PORT",
}

# ##### SOCKET SERVER


class SocketServerControlMessage(str, Enum):
shutdown = "__shutdown__"
get_context = "__get_context__"
initiate_client_stream = "__initiate_client_stream__"


# ##### IO MODES


class ExternalExecutionIOMode(str, Enum):
stdio = "stdio"
file = "file"
fifo = "fifo"
socket = "socket"


# ##### NOTIFICATION


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
SubprocessExecutionResource,
)
from dagster._core.instance_for_test import instance_for_test
from dagster_external.protocol import ExternalExecutionIOMode


@contextmanager
Expand All @@ -35,68 +34,17 @@ def temp_script(script_fn: Callable[[], Any]) -> Iterator[str]:


@pytest.mark.parametrize(
["input_spec", "output_spec"],
["input_file_spec", "output_file_spec"],
[
("stdio", "stdio"),
("stdio", "file/auto"),
("stdio", "file/user"),
("stdio", "fifo/auto"),
("stdio", "fifo/user"),
("stdio", "socket"),
("file/auto", "stdio"),
("file/auto", "file/auto"),
("file/auto", "file/user"),
("file/auto", "fifo/auto"),
("file/auto", "fifo/user"),
("file/auto", "socket"),
("file/user", "stdio"),
("file/user", "file/auto"),
("file/user", "file/user"),
("file/user", "fifo/auto"),
("file/user", "fifo/user"),
("file/user", "socket"),
("fifo/auto", "stdio"),
("fifo/auto", "file/auto"),
("fifo/auto", "file/user"),
("fifo/auto", "fifo/auto"),
("fifo/auto", "fifo/user"),
("fifo/auto", "socket"),
("fifo/user", "stdio"),
("fifo/user", "file/auto"),
("fifo/user", "file/user"),
("fifo/user", "fifo/auto"),
("fifo/user", "fifo/user"),
("fifo/user", "socket"),
("socket", "stdio"),
("socket", "file/auto"),
("socket", "file/user"),
("socket", "fifo/auto"),
("socket", "fifo/user"),
("socket", "socket"),
("auto", "auto"),
("auto", "user"),
("user", "auto"),
("user", "user"),
],
)
def test_external_subprocess_asset(input_spec: str, output_spec: str, tmpdir, capsys):
if input_spec in ["stdio", "socket"]:
input_mode = ExternalExecutionIOMode(input_spec)
input_path = None
else:
input_mode_spec, input_path_spec = input_spec.split("/")
input_mode = ExternalExecutionIOMode(input_mode_spec)
if input_path_spec == "auto":
input_path = None
else:
input_path = str(tmpdir.join("input"))

if output_spec in ["stdio", "socket"]:
output_mode = ExternalExecutionIOMode(output_spec)
output_path = None
else:
output_mode_spec, output_path_spec = output_spec.split("/")
output_mode = ExternalExecutionIOMode(output_mode_spec)
if output_path_spec == "auto":
output_path = None
else:
output_path = str(tmpdir.join("output"))
def test_external_subprocess_asset(input_file_spec: str, output_file_spec: str, tmpdir, capsys):
input_path = None if input_file_spec == "auto" else str(tmpdir.join("input"))
output_path = None if output_file_spec == "auto" else str(tmpdir.join("output"))

def script_fn():
from dagster_external import ExternalExecutionContext, init_dagster_external
Expand All @@ -115,8 +63,6 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
ext.run(cmd, context=context, extras=extras)

resource = SubprocessExecutionResource(
input_mode=input_mode,
output_mode=output_mode,
input_path=input_path,
output_path=output_path,
)
Expand Down Expand Up @@ -171,18 +117,14 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):


@pytest.mark.parametrize(
["input_mode_name", "input_path", "output_mode_name", "output_path"],
["input_path", "output_path"],
[
("file", PATH_WITH_NONEXISTENT_DIR, "stdio", None),
("fifo", PATH_WITH_NONEXISTENT_DIR, "stdio", None),
("stdio", None, "file", PATH_WITH_NONEXISTENT_DIR),
("stdio", None, "fifo", PATH_WITH_NONEXISTENT_DIR),
(PATH_WITH_NONEXISTENT_DIR, None),
(None, PATH_WITH_NONEXISTENT_DIR),
],
)
def test_external_execution_invalid_path(
input_mode_name: str,
input_path: Optional[str],
output_mode_name: str,
output_path: Optional[str],
):
def script_fn():
Expand All @@ -195,9 +137,7 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
ext.run(cmd, context=context)

resource = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode(input_mode_name),
input_path=input_path,
output_mode=ExternalExecutionIOMode(output_mode_name),
output_path=output_path,
)
with pytest.raises(CheckError, match=r"directory \S+ does not currently exist"):
Expand All @@ -222,11 +162,11 @@ def script_fn():

with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
stdout, stderr = subprocess.Popen(
_, stderr = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
).communicate()
assert re.search(
r"This process was not launched by a Dagster orchestration process. All calls to the"
r" `dagster-external` are no-ops.",
r" `dagster-external` context are no-ops.",
stderr.decode(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dagster._core.external_execution.subprocess import (
SubprocessExecutionResource,
)
from dagster_external.protocol import ExternalExecutionIOMode
from pydantic import Field

# Add package container to path
Expand Down Expand Up @@ -58,8 +57,6 @@ def number_sum(context: AssetExecutionContext, ext: SubprocessExecutionResource)


ext = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode.stdio,
output_mode=ExternalExecutionIOMode.stdio,
env=get_env(),
)

Expand Down
Loading

0 comments on commit 2dafb4c

Please sign in to comment.