diff --git a/python_modules/dagster-external/dagster_external/context.py b/python_modules/dagster-external/dagster_external/context.py index 665492ea1d3af..80bdbe3cd3ac5 100644 --- a/python_modules/dagster-external/dagster_external/context.py +++ b/python_modules/dagster-external/dagster_external/context.py @@ -1,22 +1,20 @@ import atexit import json -import socket -import sys +import os import warnings from typing import Any, ClassVar, Mapping, Optional, Sequence, TextIO from typing_extensions import Self -from dagster_external.params import ExternalExecutionParams, get_external_execution_params +from dagster_external.params import get_external_execution_params from .protocol import ( + DAGSTER_EXTERNAL_ENV_KEYS, ExternalDataProvenance, ExternalExecutionContextData, - ExternalExecutionIOMode, ExternalPartitionKeyRange, ExternalTimeWindow, Notification, - SocketServerControlMessage, ) from .util import ( assert_defined_asset_property, @@ -30,75 +28,38 @@ def is_dagster_orchestration_active() -> bool: - params = get_external_execution_params() - return params.is_orchestration_active + return bool(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["is_orchestration_active"])) def init_dagster_external() -> "ExternalExecutionContext": if ExternalExecutionContext.is_initialized(): return ExternalExecutionContext.get() - params = get_external_execution_params() - if params.is_orchestration_active: - data = _read_input(params) - output_stream = _get_output_stream(params) - - # Sockets can hang without this. - if output_stream is not sys.stdout: - atexit.register(_close_stream, output_stream) + if is_dagster_orchestration_active(): + params = get_external_execution_params() + data = _read_input(params.input_path) + output_stream = _get_output_stream(params.output_path) + atexit.register(_close_stream, output_stream) context = ExternalExecutionContext(data, output_stream) else: from unittest.mock import MagicMock warnings.warn( "This process was not launched by a Dagster orchestration process. All calls to the" - " `dagster-external` are no-ops." + " `dagster-external` context are no-ops." ) context = MagicMock() ExternalExecutionContext.set(context) return context -def _read_input(params: ExternalExecutionParams) -> ExternalExecutionContextData: - if params.input_mode == ExternalExecutionIOMode.stdio: - with sys.stdin as f: - return json.load(f) - elif params.input_mode == ExternalExecutionIOMode.file: - assert params.input_path, "input_path must be set when input_mode is `file`" - with open(params.input_path, "r") as f: - return json.load(f) - elif params.input_mode == ExternalExecutionIOMode.fifo: - assert params.input_path, "input_path must be set when input_mode is `fifo`" - with open(params.input_path, "r") as f: - return json.load(f) - elif params.input_mode == ExternalExecutionIOMode.socket: - assert params.host, "host must be set when input_mode is `socket`" - assert params.port, "port must be set when input_mode is `socket`" - with socket.create_connection((params.host, params.port)) as sock: - sock.makefile("w").write(f"{SocketServerControlMessage.get_context.value}\n") - return json.loads(sock.makefile("r").readline()) - else: - raise Exception(f"Invalid input mode: {params.input_mode}") - - -def _get_output_stream(params: ExternalExecutionParams) -> TextIO: - if params.output_mode == ExternalExecutionIOMode.stdio: - return sys.stdout - elif params.output_mode == ExternalExecutionIOMode.file: - assert params.output_path, "output_path must be set when output_mode is `file`" - return open(params.output_path, "a") - elif params.output_mode == ExternalExecutionIOMode.fifo: - assert params.output_path, "output_path must be set when output_mode is `fifo`" - return open(params.output_path, "w") - elif params.output_mode == ExternalExecutionIOMode.socket: - assert params.host, "host must be set when output_mode is `socket`" - assert params.port, "port must be set when output_mode is `socket`" - sock = socket.create_connection((params.host, params.port)) - stream = sock.makefile("w") - stream.write(f"{SocketServerControlMessage.initiate_client_stream.value}\n") - return stream - else: - raise Exception(f"Invalid output mode: {params.output_mode}") +def _read_input(path: str) -> ExternalExecutionContextData: + with open(path, "r") as f: + return json.load(f) + + +def _get_output_stream(path: str) -> TextIO: + return open(path, "a") def _close_stream(stream) -> None: diff --git a/python_modules/dagster-external/dagster_external/params.py b/python_modules/dagster-external/dagster_external/params.py index f4332370c9c43..6ec50037a9bc9 100644 --- a/python_modules/dagster-external/dagster_external/params.py +++ b/python_modules/dagster-external/dagster_external/params.py @@ -1,42 +1,23 @@ import os from dataclasses import dataclass -from typing import Optional from dagster_external.protocol import ( - DAGSTER_EXTERNAL_DEFAULT_HOST, - DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE, - DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE, - DAGSTER_EXTERNAL_DEFAULT_PORT, DAGSTER_EXTERNAL_ENV_KEYS, - ExternalExecutionIOMode, ) def get_external_execution_params() -> "ExternalExecutionParams": - is_orchestration_active = bool(os.getenv("DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE")) - raw_input_mode = os.getenv( - DAGSTER_EXTERNAL_ENV_KEYS["input_mode"], DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE - ) - raw_output_mode = os.getenv( - DAGSTER_EXTERNAL_ENV_KEYS["output_mode"], DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE - ) + input_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"]) + output_path = os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"]) + assert input_path, "input_path must be set" + assert output_path, "output_path must be set" return ExternalExecutionParams( - is_orchestration_active=is_orchestration_active, - input_mode=ExternalExecutionIOMode[raw_input_mode], - output_mode=ExternalExecutionIOMode[raw_output_mode], - input_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["input"]), - output_path=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["output"]), - host=os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["host"], DAGSTER_EXTERNAL_DEFAULT_HOST), - port=int(os.getenv(DAGSTER_EXTERNAL_ENV_KEYS["port"], DAGSTER_EXTERNAL_DEFAULT_PORT)), + input_path=input_path, + output_path=output_path, ) @dataclass class ExternalExecutionParams: - is_orchestration_active: bool - input_mode: str - output_mode: str - input_path: Optional[str] - output_path: Optional[str] - host: str - port: int + input_path: str + output_path: str diff --git a/python_modules/dagster-external/dagster_external/protocol.py b/python_modules/dagster-external/dagster_external/protocol.py index bdc5f74c5902f..dedcc64092224 100644 --- a/python_modules/dagster-external/dagster_external/protocol.py +++ b/python_modules/dagster-external/dagster_external/protocol.py @@ -1,4 +1,3 @@ -from enum import Enum from typing import Any, Mapping, Optional, Sequence from typing_extensions import Final, TypeAlias, TypedDict @@ -7,42 +6,15 @@ # ##### PARAMETERS -DAGSTER_EXTERNAL_DEFAULT_HOST: Final = "localhost" -DAGSTER_EXTERNAL_DEFAULT_PORT: Final = 9716 -DAGSTER_EXTERNAL_DEFAULT_INPUT_MODE: Final = "file" -DAGSTER_EXTERNAL_DEFAULT_OUTPUT_MODE: Final = "file" DAGSTER_EXTERNAL_DEFAULT_INPUT_FILENAME: Final = "dagster_external_input" DAGSTER_EXTERNAL_DEFAULT_OUTPUT_FILENAME: Final = "dagster_external_output" DAGSTER_EXTERNAL_ENV_KEYS: Final = { "is_orchestration_active": "DAGSTER_EXTERNAL_IS_ORCHESTRATION_ACTIVE", - "input_mode": "DAGSTER_EXTERNAL_INPUT_MODE", - "output_mode": "DAGSTER_EXTERNAL_OUTPUT_MODE", "input": "DAGSTER_EXTERNAL_INPUT", "output": "DAGSTER_EXTERNAL_OUTPUT", - "host": "DAGSTER_EXTERNAL_HOST", - "port": "DAGSTER_EXTERNAL_PORT", } -# ##### SOCKET SERVER - - -class SocketServerControlMessage(str, Enum): - shutdown = "__shutdown__" - get_context = "__get_context__" - initiate_client_stream = "__initiate_client_stream__" - - -# ##### IO MODES - - -class ExternalExecutionIOMode(str, Enum): - stdio = "stdio" - file = "file" - fifo = "fifo" - socket = "socket" - - # ##### NOTIFICATION diff --git a/python_modules/dagster-external/dagster_external_tests/test_external_execution.py b/python_modules/dagster-external/dagster_external_tests/test_external_execution.py index 98fd7136afa52..fd70a6d5f7e6b 100644 --- a/python_modules/dagster-external/dagster_external_tests/test_external_execution.py +++ b/python_modules/dagster-external/dagster_external_tests/test_external_execution.py @@ -21,7 +21,6 @@ SubprocessExecutionResource, ) from dagster._core.instance_for_test import instance_for_test -from dagster_external.protocol import ExternalExecutionIOMode @contextmanager @@ -35,68 +34,17 @@ def temp_script(script_fn: Callable[[], Any]) -> Iterator[str]: @pytest.mark.parametrize( - ["input_spec", "output_spec"], + ["input_file_spec", "output_file_spec"], [ - ("stdio", "stdio"), - ("stdio", "file/auto"), - ("stdio", "file/user"), - ("stdio", "fifo/auto"), - ("stdio", "fifo/user"), - ("stdio", "socket"), - ("file/auto", "stdio"), - ("file/auto", "file/auto"), - ("file/auto", "file/user"), - ("file/auto", "fifo/auto"), - ("file/auto", "fifo/user"), - ("file/auto", "socket"), - ("file/user", "stdio"), - ("file/user", "file/auto"), - ("file/user", "file/user"), - ("file/user", "fifo/auto"), - ("file/user", "fifo/user"), - ("file/user", "socket"), - ("fifo/auto", "stdio"), - ("fifo/auto", "file/auto"), - ("fifo/auto", "file/user"), - ("fifo/auto", "fifo/auto"), - ("fifo/auto", "fifo/user"), - ("fifo/auto", "socket"), - ("fifo/user", "stdio"), - ("fifo/user", "file/auto"), - ("fifo/user", "file/user"), - ("fifo/user", "fifo/auto"), - ("fifo/user", "fifo/user"), - ("fifo/user", "socket"), - ("socket", "stdio"), - ("socket", "file/auto"), - ("socket", "file/user"), - ("socket", "fifo/auto"), - ("socket", "fifo/user"), - ("socket", "socket"), + ("auto", "auto"), + ("auto", "user"), + ("user", "auto"), + ("user", "user"), ], ) -def test_external_subprocess_asset(input_spec: str, output_spec: str, tmpdir, capsys): - if input_spec in ["stdio", "socket"]: - input_mode = ExternalExecutionIOMode(input_spec) - input_path = None - else: - input_mode_spec, input_path_spec = input_spec.split("/") - input_mode = ExternalExecutionIOMode(input_mode_spec) - if input_path_spec == "auto": - input_path = None - else: - input_path = str(tmpdir.join("input")) - - if output_spec in ["stdio", "socket"]: - output_mode = ExternalExecutionIOMode(output_spec) - output_path = None - else: - output_mode_spec, output_path_spec = output_spec.split("/") - output_mode = ExternalExecutionIOMode(output_mode_spec) - if output_path_spec == "auto": - output_path = None - else: - output_path = str(tmpdir.join("output")) +def test_external_subprocess_asset(input_file_spec: str, output_file_spec: str, tmpdir, capsys): + input_path = None if input_file_spec == "auto" else str(tmpdir.join("input")) + output_path = None if output_file_spec == "auto" else str(tmpdir.join("output")) def script_fn(): from dagster_external import ExternalExecutionContext, init_dagster_external @@ -115,8 +63,6 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource): ext.run(cmd, context=context, extras=extras) resource = SubprocessExecutionResource( - input_mode=input_mode, - output_mode=output_mode, input_path=input_path, output_path=output_path, ) @@ -171,18 +117,14 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource): @pytest.mark.parametrize( - ["input_mode_name", "input_path", "output_mode_name", "output_path"], + ["input_path", "output_path"], [ - ("file", PATH_WITH_NONEXISTENT_DIR, "stdio", None), - ("fifo", PATH_WITH_NONEXISTENT_DIR, "stdio", None), - ("stdio", None, "file", PATH_WITH_NONEXISTENT_DIR), - ("stdio", None, "fifo", PATH_WITH_NONEXISTENT_DIR), + (PATH_WITH_NONEXISTENT_DIR, None), + (None, PATH_WITH_NONEXISTENT_DIR), ], ) def test_external_execution_invalid_path( - input_mode_name: str, input_path: Optional[str], - output_mode_name: str, output_path: Optional[str], ): def script_fn(): @@ -195,9 +137,7 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource): ext.run(cmd, context=context) resource = SubprocessExecutionResource( - input_mode=ExternalExecutionIOMode(input_mode_name), input_path=input_path, - output_mode=ExternalExecutionIOMode(output_mode_name), output_path=output_path, ) with pytest.raises(CheckError, match=r"directory \S+ does not currently exist"): @@ -222,11 +162,11 @@ def script_fn(): with temp_script(script_fn) as script_path: cmd = ["python", script_path] - stdout, stderr = subprocess.Popen( + _, stderr = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ).communicate() assert re.search( r"This process was not launched by a Dagster orchestration process. All calls to the" - r" `dagster-external` are no-ops.", + r" `dagster-external` context are no-ops.", stderr.decode(), ) diff --git a/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py b/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py index 57982d0473616..ac9a6c97fc6c0 100644 --- a/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py +++ b/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py @@ -5,7 +5,6 @@ from dagster._core.external_execution.subprocess import ( SubprocessExecutionResource, ) -from dagster_external.protocol import ExternalExecutionIOMode from pydantic import Field # Add package container to path @@ -58,8 +57,6 @@ def number_sum(context: AssetExecutionContext, ext: SubprocessExecutionResource) ext = SubprocessExecutionResource( - input_mode=ExternalExecutionIOMode.stdio, - output_mode=ExternalExecutionIOMode.stdio, env=get_env(), ) diff --git a/python_modules/dagster/dagster/_core/external_execution/subprocess.py b/python_modules/dagster/dagster/_core/external_execution/subprocess.py index 2d5d510003414..912e80113e72d 100644 --- a/python_modules/dagster/dagster/_core/external_execution/subprocess.py +++ b/python_modules/dagster/dagster/_core/external_execution/subprocess.py @@ -1,33 +1,23 @@ -import json import os -import socket -import socketserver -from contextlib import contextmanager, nullcontext +from contextlib import contextmanager from dataclasses import dataclass, field from subprocess import Popen -from threading import Event, Thread +from threading import Event from typing import ContextManager, Iterator, Mapping, Optional, Sequence, Union from dagster_external.protocol import ( - DAGSTER_EXTERNAL_DEFAULT_HOST, - DAGSTER_EXTERNAL_DEFAULT_PORT, DAGSTER_EXTERNAL_ENV_KEYS, ExternalExecutionExtras, - ExternalExecutionIOMode, - SocketServerControlMessage, ) from pydantic import Field -import dagster._check as check from dagster._core.errors import DagsterExternalExecutionError from dagster._core.execution.context.compute import OpExecutionContext -from dagster._core.external_execution.context import build_external_execution_context from dagster._core.external_execution.resource import ExternalExecutionResource from dagster._core.external_execution.task import ( ExternalExecutionTask, ExternalTaskIOParams, ExternalTaskParams, - SocketAddress, ) @@ -35,8 +25,6 @@ class SubprocessTaskParams(ExternalTaskParams): command: Sequence[str] cwd: Optional[str] = None - input_mode: ExternalExecutionIOMode = ExternalExecutionIOMode.stdio - output_mode: ExternalExecutionIOMode = ExternalExecutionIOMode.stdio socket_server_host: Optional[str] = None socket_server_port: Optional[int] = None env: Mapping[str, str] = field(default_factory=dict) @@ -55,35 +43,24 @@ def _launch( input_params: SubprocessTaskIOParams, output_params: SubprocessTaskIOParams, ) -> None: - socket_server_context = ( - self._socket_server_context_manager(params) - if ExternalExecutionIOMode.socket in (params.input_mode, params.output_mode) - else nullcontext() + process = Popen( + params.command, + cwd=params.cwd, + stdin=input_params.stdio_fd, + stdout=output_params.stdio_fd, + env={ + **base_env, + **params.env, + **input_params.env, + **output_params.env, + }, ) - io_mode_env = { - DAGSTER_EXTERNAL_ENV_KEYS["input_mode"]: params.input_mode.value, - DAGSTER_EXTERNAL_ENV_KEYS["output_mode"]: params.output_mode.value, - } - with socket_server_context: - process = Popen( - params.command, - cwd=params.cwd, - stdin=input_params.stdio_fd, - stdout=output_params.stdio_fd, - env={ - **base_env, - **io_mode_env, - **params.env, - **input_params.env, - **output_params.env, - }, - ) - process.wait() + process.wait() - if process.returncode != 0: - raise DagsterExternalExecutionError( - f"External execution process failed with code {process.returncode}" - ) + if process.returncode != 0: + raise DagsterExternalExecutionError( + f"External execution process failed with code {process.returncode}" + ) # ######################## # ##### IO CONTEXT MANAGERS @@ -92,24 +69,7 @@ def _launch( def _input_context_manager( self, tempdir: str, params: SubprocessTaskParams ) -> ContextManager[SubprocessTaskIOParams]: - if params.input_mode == ExternalExecutionIOMode.stdio: - return self._stdio_input() - elif params.input_mode == ExternalExecutionIOMode.file: - return self._file_input(tempdir) - elif params.input_mode == ExternalExecutionIOMode.fifo: - return self._fifo_input(tempdir) - elif params.input_mode == ExternalExecutionIOMode.socket: - sockaddr = self._sockaddr_from_params(params) - return self._socket_input(sockaddr) - else: - check.failed(f"Unsupported input mode: {params.input_mode}") - - @contextmanager - def _stdio_input(self) -> Iterator[SubprocessTaskIOParams]: - read_fd, write_fd = os.pipe() - self._write_input(write_fd) - yield SubprocessTaskIOParams(stdio_fd=read_fd) - os.close(read_fd) + return self._file_input(tempdir) @contextmanager def _file_input(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: @@ -122,59 +82,10 @@ def _file_input(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: if os.path.exists(path): os.remove(path) - @contextmanager - def _fifo_input(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: - path = self._prepare_io_path(self._input_path, "input", tempdir) - if not os.path.exists(path): - os.mkfifo(path) - try: - # Use thread to prevent blocking - thread = Thread(target=self._write_input, args=(path,), daemon=True) - thread.start() - env = {DAGSTER_EXTERNAL_ENV_KEYS["input"]: path} - yield SubprocessTaskIOParams(env=env) - thread.join() - finally: - os.remove(path) - - # Socket server is started/shutdown in a dedicated context manager to share logic between input - # and output. - @contextmanager - def _socket_input(self, sockaddr: SocketAddress) -> Iterator[SubprocessTaskIOParams]: - env = { - DAGSTER_EXTERNAL_ENV_KEYS["host"]: sockaddr[0], - DAGSTER_EXTERNAL_ENV_KEYS["port"]: str(sockaddr[1]), - } - yield SubprocessTaskIOParams(env=env) - def _output_context_manager( self, tempdir: str, params: SubprocessTaskParams ) -> ContextManager[SubprocessTaskIOParams]: - if params.output_mode == ExternalExecutionIOMode.stdio: - return self._stdio_output() - elif params.output_mode == ExternalExecutionIOMode.file: - return self._file_output(tempdir) - elif params.output_mode == ExternalExecutionIOMode.fifo: - return self._fifo_output(tempdir) - elif params.output_mode == ExternalExecutionIOMode.socket: - sockaddr = self._sockaddr_from_params(params) - return self.socket_output(sockaddr) - else: - check.failed(f"Unsupported output mode: {params.output_mode}") - - @contextmanager - def _stdio_output(self) -> Iterator[SubprocessTaskIOParams]: - read_fd, write_fd = os.pipe() - is_task_complete = Event() - thread = None - try: - thread = self._start_output_thread(read_fd, is_task_complete) - yield SubprocessTaskIOParams(stdio_fd=write_fd) - finally: - os.close(write_fd) - is_task_complete.set() - if thread: - thread.join() + return self._file_output(tempdir) @contextmanager def _file_output(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: @@ -193,119 +104,8 @@ def _file_output(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: if os.path.exists(path): os.remove(path) - @contextmanager - def _fifo_output(self, tempdir: str) -> Iterator[SubprocessTaskIOParams]: - path = self._prepare_io_path(self._output_path, "output", tempdir) - env = {DAGSTER_EXTERNAL_ENV_KEYS["output"]: path} - if not os.path.exists(path): - os.mkfifo(path) - is_task_complete = Event() - thread = None - dummy_handle = None - try: - thread = self._start_output_thread(path, is_task_complete) - dummy_handle = os.open(path, os.O_RDWR | os.O_NONBLOCK) - # We open a write file descriptor for a FIFO in the orchestration - # process in order to keep the FIFO open for reading. Otherwise, the - # FIFO will receive EOF after the first file descriptor writing it is - # closed in an external process. Note that `O_RDWR` is required for - # this call to succeed when no readers are yet available, and - # `O_NONBLOCK` is required to prevent the `open` call from blocking. - yield SubprocessTaskIOParams(env) - finally: - is_task_complete.set() - if dummy_handle: - os.close(dummy_handle) - if thread: - thread.join() - if os.path.exists(path): - os.remove(path) - - # Socket server is started/shutdown in a dedicated context manager to share logic between input - # and output. - @contextmanager - def socket_output(self, sockaddr: SocketAddress) -> Iterator[SubprocessTaskIOParams]: - env = { - DAGSTER_EXTERNAL_ENV_KEYS["host"]: sockaddr[0], - DAGSTER_EXTERNAL_ENV_KEYS["port"]: str(sockaddr[1]), - } - yield SubprocessTaskIOParams(env=env) - - # ######################## - # ##### SOCKET SERVER - # ######################## - - @contextmanager - def _socket_server_context_manager( - self, params: SubprocessTaskParams - ) -> Iterator[SocketAddress]: - sockaddr = self._sockaddr_from_params(params) - thread = None - try: - thread = self._start_socket_server_thread(sockaddr) - yield sockaddr - finally: - if thread: - self._shutdown_socket_server(sockaddr) - thread.join() - - def _start_socket_server_thread(self, sockaddr: SocketAddress) -> Thread: - is_server_started = Event() - thread = Thread( - target=self._start_socket_server, args=(sockaddr, is_server_started), daemon=True - ) - thread.start() - is_server_started.wait() - return thread - - # Only used in socket mode - def _start_socket_server(self, sockaddr: SocketAddress, is_server_started: Event) -> None: - context = build_external_execution_context(self._context, self._extras) - handle_notification = self.handle_notification - - class Handler(socketserver.StreamRequestHandler): - def handle(self): - data = self.rfile.readline().strip().decode("utf-8") - if data == SocketServerControlMessage.shutdown: - self.server.shutdown() - elif data == SocketServerControlMessage.get_context: - response_data = f"{json.dumps(context)}\n".encode("utf-8") - self.wfile.write(response_data) - elif data == SocketServerControlMessage.initiate_client_stream: - self.notification_stream_loop() - else: - raise Exception(f"Unrecognized control message: {data}") - - def notification_stream_loop(self): - while True: - data = self.rfile.readline().strip().decode("utf-8") - notification = json.loads(data) - handle_notification(notification) - - # It is essential to set `allow_reuse_address` to True here, otherwise `socket.bind` seems - # to nondeterministically block when running multiple tests using the same address in - # succession. - class Server(socketserver.ThreadingTCPServer): - allow_reuse_address = True - - with Server(sockaddr, Handler) as server: - is_server_started.set() - server.serve_forever() - - # Only used in socket mode - def _shutdown_socket_server(self, sockaddr: SocketAddress) -> None: - with socket.create_connection(sockaddr) as sock: - sock.makefile("w").write(f"{SocketServerControlMessage.shutdown.value}\n") - - def _sockaddr_from_params(self, params: SubprocessTaskParams) -> SocketAddress: - host = params.socket_server_host or DAGSTER_EXTERNAL_DEFAULT_HOST - port = params.socket_server_port or DAGSTER_EXTERNAL_DEFAULT_PORT - return (host, port) - class SubprocessExecutionResource(ExternalExecutionResource): - input_mode: ExternalExecutionIOMode = Field(default="stdio") - output_mode: ExternalExecutionIOMode = Field(default="stdio") cwd: Optional[str] = Field( default=None, description="Working directory in which to launch the subprocess command." ) @@ -327,8 +127,6 @@ def run( command=command, env={**(env or {}), **(self.env or {})}, cwd=(cwd or self.cwd), - input_mode=self.input_mode, - output_mode=self.output_mode, ) SubprocessExecutionTask( context=context,