Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[externals] Remove IO modes entirely #16008

Merged: 1 commit merged on Aug 22, 2023
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
73 changes: 17 additions & 56 deletions python_modules/dagster-external/dagster_external/context.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import atexit
import json
import socket
import sys
import os
import warnings
from typing import Any, ClassVar, Mapping, Optional, Sequence, TextIO

from typing_extensions import Self

from dagster_external.params import ExternalExecutionParams, get_external_execution_params
from dagster_external.params import get_external_execution_params

from .protocol import (
DAGSTER_EXTERNAL_ENV_KEYS,
ExternalDataProvenance,
ExternalExecutionContextData,
ExternalExecutionIOMode,
ExternalPartitionKeyRange,
ExternalTimeWindow,
Notification,
SocketServerControlMessage,
)
from .util import (
assert_defined_asset_property,
Expand All @@ -30,75 +28,38 @@


def is_dagster_orchestration_active() -> bool:
params = get_external_execution_params()
return params.is_orchestration_active
return bool(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["is_orchestration_active"]))


def init_dagster_external() -> "ExternalExecutionContext":
if ExternalExecutionContext.is_initialized():
return ExternalExecutionContext.get()
params = get_external_execution_params()
if params.is_orchestration_active:
data = _read_input(params)
output_stream = _get_output_stream(params)

# Sockets can hang without this.
if output_stream is not sys.stdout:
atexit.register(_close_stream, output_stream)

if is_dagster_orchestration_active():
params = get_external_execution_params()
data = _read_input(params.input_path)
output_stream = _get_output_stream(params.output_path)
atexit.register(_close_stream, output_stream)
context = ExternalExecutionContext(data, output_stream)
else:
from unittest.mock import MagicMock

warnings.warn(
"This process was not launched by a Dagster orchestration process. All calls to the"
" `dagster-external` are no-ops."
" `dagster-external` context are no-ops."
)
context = MagicMock()
ExternalExecutionContext.set(context)
return context


def _read_input(params: ExternalExecutionParams) -> ExternalExecutionContextData:
if params.input_mode == ExternalExecutionIOMode.stdio:
with sys.stdin as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.file:
assert params.input_path, "input_path must be set when input_mode is `file`"
with open(params.input_path, "r") as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.fifo:
assert params.input_path, "input_path must be set when input_mode is `fifo`"
with open(params.input_path, "r") as f:
return json.load(f)
elif params.input_mode == ExternalExecutionIOMode.socket:
assert params.host, "host must be set when input_mode is `socket`"
assert params.port, "port must be set when input_mode is `socket`"
with socket.create_connection((params.host, params.port)) as sock:
sock.makefile("w").write(f"{SocketServerControlMessage.get_context.value}\n")
return json.loads(sock.makefile("r").readline())
else:
raise Exception(f"Invalid input mode: {params.input_mode}")


def _get_output_stream(params: ExternalExecutionParams) -> TextIO:
if params.output_mode == ExternalExecutionIOMode.stdio:
return sys.stdout
elif params.output_mode == ExternalExecutionIOMode.file:
assert params.output_path, "output_path must be set when output_mode is `file`"
return open(params.output_path, "a")
elif params.output_mode == ExternalExecutionIOMode.fifo:
assert params.output_path, "output_path must be set when output_mode is `fifo`"
return open(params.output_path, "w")
elif params.output_mode == ExternalExecutionIOMode.socket:
assert params.host, "host must be set when output_mode is `socket`"
assert params.port, "port must be set when output_mode is `socket`"
sock = socket.create_connection((params.host, params.port))
stream = sock.makefile("w")
stream.write(f"{SocketServerControlMessage.initiate_client_stream.value}\n")
return stream
else:
raise Exception(f"Invalid output mode: {params.output_mode}")
def _read_input(path: str) -> ExternalExecutionContextData:
with open(path, "r") as f:
return json.load(f)


def _get_output_stream(path: str) -> TextIO:
return open(path, "a")


def _close_stream(stream) -> None:
Expand Down
35 changes: 8 additions & 27 deletions python_modules/dagster-external/dagster_external/params.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,23 @@
import os
from dataclasses import dataclass
from typing import Optional

from dagster_external.protocol import (
DAGSTER_EXTERNAL_DEFAULT_HOST,
DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE,
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE,
DAGSTER_EXTERNAL_DEFAULT_PORT,
DAGSTER_EXTERNAL_ENV_KEYS,
ExternalExecutionIOMode,
)


def get_external_execution_params() -> "ExternalExecutionParams":
is_orchestration_active = bool(os.getenv("DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE"))
raw_input_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["input_mode"], DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE
)
raw_output_mode = os.getenv(
DAGSTER_EXTERNAL_ENV_KEYS["output_mode"], DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE
)
input_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"])
output_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"])
assert input_path, "input_path must be set"
assert output_path, "output_path must be set"
return ExternalExecutionParams(
is_orchestration_active=is_orchestration_active,
input_mode=ExternalExecutionIOMode[raw_input_mode],
output_mode=ExternalExecutionIOMode[raw_output_mode],
input_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"]),
output_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"]),
host=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["host"], DAGSTER_EXTERNAL_DEFAULT_HOST),
port=int(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["port"], DAGSTER_EXTERNAL_DEFAULT_PORT)),
input_path=input_path,
output_path=output_path,
)


@dataclass
class ExternalExecutionParams:
is_orchestration_active: bool
input_mode: str
output_mode: str
input_path: Optional[str]
output_path: Optional[str]
host: str
port: int
input_path: str
output_path: str
28 changes: 0 additions & 28 deletions python_modules/dagster-external/dagster_external/protocol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from enum import Enum
from typing import Any, Mapping, Optional, Sequence

from typing_extensions import Final, TypeAlias, TypedDict
Expand All @@ -7,42 +6,15 @@

# ##### PARAMETERS

DAGSTER_EXTERNAL_DEFAULT_HOST: Final = "localhost"
DAGSTER_EXTERNAL_DEFAULT_PORT: Final = 9716
DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE: Final = "file"
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE: Final = "file"
DAGSTER_EXTERNAL_DEFAULT_INPUT_FILENAME: Final = "dagster_external_input"
DAGSTER_EXTERNAL_DEFAULT_OUTPUT_FILENAME: Final = "dagster_external_output"

DAGSTER_EXTERNAL_ENV_KEYS: Final = {
"is_orchestration_active": "DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE",
"input_mode": "DAGSTER_EXTERNAL_INPUT_MODE",
"output_mode": "DAGSTER_EXTERNAL_OUTPUT_MODE",
"input": "DAGSTER_EXTERNAL_INPUT",
"output": "DAGSTER_EXTERNAL_OUTPUT",
"host": "DAGSTER_EXTERNAL_HOST",
"port": "DAGSTER_EXTERNAL_PORT",
}

# ##### SOCKET SERVER


class SocketServerControlMessage(str, Enum):
shutdown = "__shutdown__"
get_context = "__get_context__"
initiate_client_stream = "__initiate_client_stream__"


# ##### IO MODES


class ExternalExecutionIOMode(str, Enum):
stdio = "stdio"
file = "file"
fifo = "fifo"
socket = "socket"


# ##### NOTIFICATION


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
SubprocessExecutionResource,
)
from dagster._core.instance_for_test import instance_for_test
from dagster_external.protocol import ExternalExecutionIOMode


@contextmanager
Expand All @@ -35,68 +34,17 @@ def temp_script(script_fn: Callable[[], Any]) -> Iterator[str]:


@pytest.mark.parametrize(
["input_spec", "output_spec"],
["input_file_spec", "output_file_spec"],
[
("stdio", "stdio"),
("stdio", "file/auto"),
("stdio", "file/user"),
("stdio", "fifo/auto"),
("stdio", "fifo/user"),
("stdio", "socket"),
("file/auto", "stdio"),
("file/auto", "file/auto"),
("file/auto", "file/user"),
("file/auto", "fifo/auto"),
("file/auto", "fifo/user"),
("file/auto", "socket"),
("file/user", "stdio"),
("file/user", "file/auto"),
("file/user", "file/user"),
("file/user", "fifo/auto"),
("file/user", "fifo/user"),
("file/user", "socket"),
("fifo/auto", "stdio"),
("fifo/auto", "file/auto"),
("fifo/auto", "file/user"),
("fifo/auto", "fifo/auto"),
("fifo/auto", "fifo/user"),
("fifo/auto", "socket"),
("fifo/user", "stdio"),
("fifo/user", "file/auto"),
("fifo/user", "file/user"),
("fifo/user", "fifo/auto"),
("fifo/user", "fifo/user"),
("fifo/user", "socket"),
("socket", "stdio"),
("socket", "file/auto"),
("socket", "file/user"),
("socket", "fifo/auto"),
("socket", "fifo/user"),
("socket", "socket"),
("auto", "auto"),
("auto", "user"),
("user", "auto"),
("user", "user"),
],
)
def test_external_subprocess_asset(input_spec: str, output_spec: str, tmpdir, capsys):
if input_spec in ["stdio", "socket"]:
input_mode = ExternalExecutionIOMode(input_spec)
input_path = None
else:
input_mode_spec, input_path_spec = input_spec.split("/")
input_mode = ExternalExecutionIOMode(input_mode_spec)
if input_path_spec == "auto":
input_path = None
else:
input_path = str(tmpdir.join("input"))

if output_spec in ["stdio", "socket"]:
output_mode = ExternalExecutionIOMode(output_spec)
output_path = None
else:
output_mode_spec, output_path_spec = output_spec.split("/")
output_mode = ExternalExecutionIOMode(output_mode_spec)
if output_path_spec == "auto":
output_path = None
else:
output_path = str(tmpdir.join("output"))
def test_external_subprocess_asset(input_file_spec: str, output_file_spec: str, tmpdir, capsys):
input_path = None if input_file_spec == "auto" else str(tmpdir.join("input"))
output_path = None if output_file_spec == "auto" else str(tmpdir.join("output"))

def script_fn():
from dagster_external import ExternalExecutionContext, init_dagster_external
Expand All @@ -115,8 +63,6 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
ext.run(cmd, context=context, extras=extras)

resource = SubprocessExecutionResource(
input_mode=input_mode,
output_mode=output_mode,
input_path=input_path,
output_path=output_path,
)
Expand Down Expand Up @@ -171,18 +117,14 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):


@pytest.mark.parametrize(
["input_mode_name", "input_path", "output_mode_name", "output_path"],
["input_path", "output_path"],
[
("file", PATH_WITH_NONEXISTENT_DIR, "stdio", None),
("fifo", PATH_WITH_NONEXISTENT_DIR, "stdio", None),
("stdio", None, "file", PATH_WITH_NONEXISTENT_DIR),
("stdio", None, "fifo", PATH_WITH_NONEXISTENT_DIR),
(PATH_WITH_NONEXISTENT_DIR, None),
(None, PATH_WITH_NONEXISTENT_DIR),
],
)
def test_external_execution_invalid_path(
input_mode_name: str,
input_path: Optional[str],
output_mode_name: str,
output_path: Optional[str],
):
def script_fn():
Expand All @@ -195,9 +137,7 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
ext.run(cmd, context=context)

resource = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode(input_mode_name),
input_path=input_path,
output_mode=ExternalExecutionIOMode(output_mode_name),
output_path=output_path,
)
with pytest.raises(CheckError, match=r"directory \S+ does not currently exist"):
Expand All @@ -222,11 +162,11 @@ def script_fn():

with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
stdout, stderr = subprocess.Popen(
_, stderr = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
).communicate()
assert re.search(
r"This process was not launched by a Dagster orchestration process. All calls to the"
r" `dagster-external` are no-ops.",
r" `dagster-external` context are no-ops.",
stderr.decode(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dagster._core.external_execution.subprocess import (
SubprocessExecutionResource,
)
from dagster_external.protocol import ExternalExecutionIOMode
from pydantic import Field

# Add package container to path
Expand Down Expand Up @@ -58,8 +57,6 @@ def number_sum(context: AssetExecutionContext, ext: SubprocessExecutionResource)


ext = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode.stdio,
output_mode=ExternalExecutionIOMode.stdio,
env=get_env(),
)

Expand Down
Loading