Skip to content

Commit

Permalink
Refactor ExternalExecutionTask to better support multiple environments
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Aug 18, 2023
1 parent 976a79e commit 647f687
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from dagster._core.definitions.materialize import materialize
from dagster._core.errors import DagsterExternalExecutionError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.external_execution.resource import (
ExternalExecutionResource,
from dagster._core.external_execution.subprocess import (
SubprocessExecutionResource,
)
from dagster._core.instance_for_test import instance_for_test
from dagster_external.protocol import ExternalExecutionIOMode
Expand Down Expand Up @@ -73,7 +73,7 @@ def temp_script(script_fn: Callable[[], Any]) -> Iterator[str]:
("socket", "socket"),
],
)
def test_external_execution_asset(input_spec: str, output_spec: str, tmpdir, capsys):
def test_external_subprocess_asset(input_spec: str, output_spec: str, tmpdir, capsys):
if input_spec in ["stdio", "socket"]:
input_mode = ExternalExecutionIOMode(input_spec)
input_path = None
Expand Down Expand Up @@ -106,13 +106,13 @@ def script_fn():
context.report_asset_data_version("foo", "alpha")

@asset
def foo(context: AssetExecutionContext, ext: ExternalExecutionResource):
def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
extras = {"bar": "baz"}
with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
ext.run(cmd, context, extras)
ext.run(cmd, context=context, extras=extras)

resource = ExternalExecutionResource(
resource = SubprocessExecutionResource(
input_mode=input_mode,
output_mode=output_mode,
input_path=input_path,
Expand Down Expand Up @@ -140,12 +140,12 @@ def script_fn():
raise Exception("foo")

@asset
def foo(context: AssetExecutionContext, ext: ExternalExecutionResource):
def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
ext.run(cmd, context)
ext.run(cmd, context=context)

resource = ExternalExecutionResource(
resource = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode.stdio,
)
with pytest.raises(DagsterExternalExecutionError):
Expand Down Expand Up @@ -174,12 +174,12 @@ def script_fn():
pass

@asset
def foo(context: AssetExecutionContext, ext: ExternalExecutionResource):
def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
ext.run(cmd, context)
ext.run(cmd, context=context)

resource = ExternalExecutionResource(
resource = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode(input_mode_name),
input_path=input_path,
output_mode=ExternalExecutionIOMode(output_mode_name),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import sys

from dagster import AssetExecutionContext, Config, Definitions, asset
from dagster._core.external_execution.resource import ExternalExecutionResource
from dagster._core.external_execution.subprocess import (
SubprocessExecutionResource,
)
from dagster_external.protocol import ExternalExecutionIOMode
from pydantic import Field

Expand Down Expand Up @@ -32,25 +34,30 @@ class NumberConfig(Config):

@asset
def number_x(
context: AssetExecutionContext, ext: ExternalExecutionResource, config: NumberConfig
context: AssetExecutionContext, ext: SubprocessExecutionResource, config: NumberConfig
) -> None:
extras = {**get_common_extras(context), "multiplier": config.multiplier}
ext.run(command_for_asset("number_x"), context, extras)
ext.run(command_for_asset("number_x"), context=context, extras=extras)


@asset
def number_y(context: AssetExecutionContext, ext: ExternalExecutionResource, config: NumberConfig):
extras = {**get_common_extras(context), "multiplier": config.multiplier}
env = {"NUMBER_Y": "4"}
ext.run(command_for_asset("number_y"), context, extras, env)
def number_y(
context: AssetExecutionContext, ext: SubprocessExecutionResource, config: NumberConfig
):
ext.run(
command_for_asset("number_y"),
context=context,
extras=get_common_extras(context),
env={"NUMBER_Y": "4"},
)


@asset(deps=[number_x, number_y])
def number_sum(context: AssetExecutionContext, ext: ExternalExecutionResource) -> None:
ext.run(command_for_asset("number_sum"), context, get_common_extras(context))
def number_sum(context: AssetExecutionContext, ext: SubprocessExecutionResource) -> None:
ext.run(command_for_asset("number_sum"), context=context, extras=get_common_extras(context))


ext = ExternalExecutionResource(
ext = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode.stdio,
output_mode=ExternalExecutionIOMode.stdio,
env=get_env(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
from typing import Dict, Mapping, Optional, Sequence, Union
from abc import ABC, abstractmethod
from typing import Optional

from dagster_external.protocol import ExternalExecutionExtras, ExternalExecutionIOMode
from pydantic import Field

from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.external_execution.task import (
ExternalExecutionTask,
)


class ExternalExecutionResource(ConfigurableResource):
env: Optional[Dict[str, str]] = Field(
default=None,
description="An optional dict of environment variables to pass to the subprocess.",
)
cwd: Optional[str] = Field(
default=None, description="Working directory in which to launch the subprocess command."
)
class ExternalExecutionResource(ConfigurableResource, ABC):
input_mode: ExternalExecutionIOMode = Field(default="stdio")
output_mode: ExternalExecutionIOMode = Field(default="stdio")
input_path: Optional[str] = Field(
Expand Down Expand Up @@ -47,20 +38,11 @@ class ExternalExecutionResource(ConfigurableResource):
""",
)

@abstractmethod
def run(
self,
command: Union[str, Sequence[str]],
*,
context: OpExecutionContext,
extras: Optional[ExternalExecutionExtras] = None,
env: Optional[Mapping[str, str]] = None,
) -> None:
ExternalExecutionTask(
command=command,
context=context,
extras=extras,
env={**(self.env or {}), **(env or {})},
input_mode=self.input_mode,
output_mode=self.output_mode,
input_path=self.input_path,
output_path=self.output_path,
).run()
...
228 changes: 228 additions & 0 deletions python_modules/dagster/dagster/_core/external_execution/subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import os
from contextlib import contextmanager
from dataclasses import dataclass, field
from subprocess import Popen
from threading import Thread
from typing import ContextManager, Iterator, Mapping, Optional, Sequence, Union

from dagster_external.protocol import (
DAGSTER_EXTERNAL_ENV_KEYS,
ExternalExecutionExtras,
ExternalExecutionIOMode,
)
from pydantic import Field

import dagster._check as check
from dagster._core.errors import DagsterExternalExecutionError
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.external_execution.resource import ExternalExecutionResource
from dagster._core.external_execution.task import (
ExternalExecutionTask,
ExternalTaskIOParams,
ExternalTaskParams,
SocketAddress,
)


@dataclass
class SubprocessTaskParams(ExternalTaskParams):
command: Sequence[str]
cwd: Optional[str] = None
env: Mapping[str, str] = field(default_factory=dict)


@dataclass
class SubprocessTaskIOParams(ExternalTaskIOParams):
stdio_fd: Optional[int] = None


class ExternalExecutionSubprocessTask(
ExternalExecutionTask[SubprocessTaskParams, SubprocessTaskIOParams]
):
def _launch(
self,
params: SubprocessTaskParams,
input_params: SubprocessTaskIOParams,
output_params: SubprocessTaskIOParams,
) -> None:
process = Popen(
params.command,
cwd=params.cwd,
stdin=input_params.stdio_fd,
stdout=output_params.stdio_fd,
env={**os.environ, **params.env, **input_params.env, **output_params.env},
)
process.wait()

if process.returncode != 0:
raise DagsterExternalExecutionError(
f"External execution process failed with code {process.returncode}"
)

# ########################
# ##### IO CONTEXT MANAGERS
# ########################

def _input_context_manager(
self, tempdir: str, sockaddr: SocketAddress
) -> ContextManager[SubprocessTaskIOParams]:
if self._input_mode == ExternalExecutionIOMode.stdio:
return self._stdio_input()
elif self._input_mode == ExternalExecutionIOMode.file:
return self._file_input(tempdir)
elif self._input_mode == ExternalExecutionIOMode.fifo:
return self._fifo_input(tempdir)
elif self._input_mode == ExternalExecutionIOMode.socket:
assert sockaddr is not None, "`sockaddr` must be set when output_mode is `socket`"
return self._socket_input(sockaddr)
else:
check.failed(f"Unsupported input mode: {self._input_mode}")

@contextmanager
def _stdio_input(self) -> Iterator[SubprocessTaskIOParams]:
read_fd, write_fd = os.pipe()
self._write_input(write_fd)
yield SubprocessTaskIOParams(stdio_fd=read_fd)
os.close(read_fd)

@contextmanager
def _file_input(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]:
path = self._prepare_io_path(self._input_path, "input", tempdir)
env = {DAGSTER_EXTERNAL_ENV_KEYS["input"]: path}
try:
self._write_input(path)
yield SubprocessTaskIOParams(env=env)
finally:
if os.path.exists(path):
os.remove(path)

@contextmanager
def _fifo_input(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]:
path = self._prepare_io_path(self._input_path, "input", tempdir)
if not os.path.exists(path):
os.mkfifo(path)
try:
# Use thread to prevent blocking
thread = Thread(target=self._write_input, args=(path,), daemon=True)
thread.start()
env = {DAGSTER_EXTERNAL_ENV_KEYS["input"]: path}
yield SubprocessTaskIOParams(env=env)
thread.join()
finally:
os.remove(path)

# Socket server is started/shutdown in a dedicated context manager to share logic between input
# and output.
@contextmanager
def _socket_input(self, sockaddr: SocketAddress) -> Iterator[SubprocessTaskIOParams]:
env = {
DAGSTER_EXTERNAL_ENV_KEYS["host"]: sockaddr[0],
DAGSTER_EXTERNAL_ENV_KEYS["port"]: str(sockaddr[1]),
}
yield SubprocessTaskIOParams(env=env)

def _output_context_manager(
self, tempdir: str, sockaddr: Optional[SocketAddress]
) -> ContextManager[SubprocessTaskIOParams]:
if self._output_mode == ExternalExecutionIOMode.stdio:
return self._stdio_output()
elif self._output_mode == ExternalExecutionIOMode.file:
return self._file_output(tempdir)
elif self._output_mode == ExternalExecutionIOMode.fifo:
return self._fifo_output(tempdir)
elif self._output_mode == ExternalExecutionIOMode.socket:
assert sockaddr is not None, "`sockaddr` must be set when output_mode is `socket`"
return self.socket_output(sockaddr)
else:
check.failed(f"Unsupported output mode: {self._output_mode}")

@contextmanager
def _stdio_output(self) -> Iterator[SubprocessTaskIOParams]:
read_fd, write_fd = os.pipe()
thread = self._start_output_thread(read_fd)
yield SubprocessTaskIOParams(stdio_fd=write_fd)
os.close(write_fd)
thread.join()

@contextmanager
def _file_output(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]:
path = self._prepare_io_path(self._output_path, "output", tempdir)
env = {DAGSTER_EXTERNAL_ENV_KEYS["output"]: path}
try:
yield SubprocessTaskIOParams(env=env)
self._read_output(path)
finally:
os.remove(path)

@contextmanager
def _fifo_output(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]:
path = self._prepare_io_path(self._output_path, "output", tempdir)
env = {DAGSTER_EXTERNAL_ENV_KEYS["output"]: path}
if not os.path.exists(path):
os.mkfifo(path)
try:
# We open a write file descriptor for a FIFO in the orchestration
# process in order to keep the FIFO open for reading. Otherwise, the
# FIFO will receive EOF after the first file descriptor writing it is
# closed in an external process. Note that `O_RDWR` is required for
# this call to succeed when no readers are yet available, and
# `O_NONBLOCK` is required to prevent the `open` call from blocking.
dummy_handle = os.open(path, os.O_RDWR | os.O_NONBLOCK)
thread = self._start_output_thread(path)
yield SubprocessTaskIOParams(env=env)
os.close(dummy_handle)
thread.join()
finally:
os.remove(path)

# Socket server is started/shutdown in a dedicated context manager to share logic between input
# and output.
@contextmanager
def socket_output(self, sockaddr: SocketAddress) -> Iterator[SubprocessTaskIOParams]:
env = {
DAGSTER_EXTERNAL_ENV_KEYS["host"]: sockaddr[0],
DAGSTER_EXTERNAL_ENV_KEYS["port"]: str(sockaddr[1]),
}
yield SubprocessTaskIOParams(env=env)

# ########################
# ##### IO ROUTINES
# ########################

def _start_output_thread(self, path_or_fd: Union[str, int]) -> Thread:
thread = Thread(target=self._read_output, args=(path_or_fd,), daemon=True)
thread.start()
return thread


class SubprocessExecutionResource(ExternalExecutionResource):
cwd: Optional[str] = Field(
default=None, description="Working directory in which to launch the subprocess command."
)
env: Optional[Mapping[str, str]] = Field(
default=None,
description="An optional dict of environment variables to pass to the subprocess.",
)

def run(
self,
command: Union[str, Sequence[str]],
*,
context: OpExecutionContext,
extras: Optional[ExternalExecutionExtras] = None,
env: Optional[Mapping[str, str]] = None,
cwd: Optional[str] = None,
) -> None:
params = SubprocessTaskParams(
command=command,
env={**(env or {}), **(self.env or {})},
cwd=(cwd or self.cwd),
)
ExternalExecutionSubprocessTask(
context=context,
extras=extras,
input_mode=self.input_mode,
output_mode=self.output_mode,
input_path=self.input_path,
output_path=self.output_path,
).run(params)
Loading

0 comments on commit 647f687

Please sign in to comment.