-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[brownfield-essentials] Refactor ExternalExecutionTask to better support multiple environments #15892
Conversation
Current dependencies on/for this PR:
This comment was auto-generated by Graphite. |
0618b7e
to
22020ee
Compare
DAGSTER_EXTERNAL_ENV_KEYS["host"]: "host.docker.internal", | ||
DAGSTER_EXTERNAL_ENV_KEYS["port"]: port, | ||
} | ||
yield DockerTaskIOParams(env=env, ports=ipc_ports) | ||
|
||
|
||
class DockerExecutionResource(ExternalExecutionResource): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any thoughts on how to improve this layer ? Should ExternalExecutionIOMode
be shared or should we have separate mode enums? Can you compose config nicely along ConfigurableResource
inheritance or should we change tactics here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should ExternalExecutionIOMode be shared or should we have separate mode enums?
It should be shared because it is used by both the orchestration and transform process-- i.e. dagster-externals
uses it and that is shared between all orch-side adapters (what are we calling these things?)
any thoughts on how to improve this layer? ... Can you compose config nicely along ConfigurableResource inheritance or should we change tactics here
Not clear on exactly what you mean by "compose config nicely along ConfigurableResource inheritance". If the question is just whether you can add additional fields with ConfigurableResource
inheritance, the answer is yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what I was thinking about was that the valid input/output modes differ between docker and subprocess, and ideally thats reflected in the config. I guess one answer is to sort of mirror the TaskIOParams
tree with these resources and only have the modes defined on the subprocess/docker resources
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what I was thinking about was that the valid input/output modes differ between docker and subprocess, and ideally thats reflected in the config.
Yeah this is a good point.
I think one solution is to type the resources with Literal
instead of the Enum class and just use the enum class inside our internal machinery:
class DockerExecutionResource(ExternalExecutionResource):
input_mode: Literal["file", "socket"] # overrides input mode from above
That currently doesn't work but I think it might be a minor addition to the config machinery to make it work, since that machinery does accept enums and this is effectively an inline enum.
44078a5
to
327506e
Compare
25fc082
to
7cac386
Compare
7cac386
to
647f687
Compare
Deploy preview for dagit-storybook ready! ✅ Preview Built with commit 647f687. |
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 647f687. |
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-cyhgxmuxu-elementl.vercel.app Direct link to changed pages: |
647f687
to
45cdf4c
Compare
733612e
to
4f3a22d
Compare
45cdf4c
to
068b9f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all feels prematurely abstracted to me.
Please push the "mode" stuff to be subprocess-specific only to contain and eventually eliminate it.
Push out a quick PR to renaming ExternalExecutionResource to SubprocessExecutionResource (or introduce a trivial base class). I'd like to start writing against that API asap.
input_mode: ExternalExecutionIOMode = Field(default="stdio") | ||
output_mode: ExternalExecutionIOMode = Field(default="stdio") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the inclusion of modes on this generic class is upping my priority of getting rid of the concept entirely
@dataclass | ||
class SubprocessTaskParams(ExternalTaskParams): | ||
command: Sequence[str] | ||
cwd: Optional[str] = None | ||
env: Mapping[str, str] = field(default_factory=dict) | ||
|
||
|
||
@dataclass | ||
class SubprocessTaskIOParams(ExternalTaskIOParams): | ||
stdio_fd: Optional[int] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strongly agree with alex that we should not do this "family inheritance" approach.
068b9f1
to
d836ca6
Compare
fa108d2
to
b6634ea
Compare
d836ca6
to
ff4643a
Compare
42018fb
to
00d6815
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments. I think we should halt this approach.
IO mode removal is stacked on this PR. The simplest way to proceed is just to merge this and then IO mode removal. I am experimenting with getting away from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…ort multiple environments (dagster-io#15892) ## Summary & Motivation This is another iteration of the external execution API, stacking on and motivated by @alangenfeld's `DockerExecutionTask` in dagster-io#15820. The goal here is to move toward base external execution APIs that can easily be extended to different environments. - Split `ExternalExecutionTask` into: - `ExternalExecutionTaskBase`: an abstract base class that provides extension points at `_input_context_manager`, `_output_context_manager`, and `_launch`. - `ExternalExecutionSubprocessTask`: implements `ExternalExecutionTaskBase` for the subprocess case. - Refactor `DockerExecutionTask`: - Inherit from `ExternalExecutionTaskBase` - Instead of overriding `run`, override `_launch`, `_input_context_manager`, `_output_context_manager` ### Detailed explanation of current design Let's call the combination of the `dagster-external` library and associated set of orchestration-side abstractions the "Dagster External API". An "adapter" is an environment-specific implementation of the Dagster externals API. Currently there are two adapters in master: subprocess and Docker. In these PR, these are each defined with similar formatting in dedicated modules (`dagster._core.external_execution.subprocess` and `dagster_docker.external_resource`). An adapter should define the following parts: - `AdapterExecutionTask`: A class inheriting from abstract base class `ExternalExecutionTask` (e.g. `SubprocessExecutionTask`) - `AdapterExecutionResource:` A resource inheriting from `ExternalExecutionResource` (e.g. `SubprocessExecutionResource`) - `AdapterTaskParams`: A dataclass inheriting from `ExternalTaskParams` (e.g. `SubprocessTaskParams`) - `AdapterTaskIOParams` An optional dataclass inheriting from `ExternalTaskIOParams` (e.g. `SubprocessTaskIOParams`) The user-facing component of an adapter is the `AdapterExecutionResource`. This exposes a method `run` that is responsible for taking a flat list of user-provided parameters, instantiating an `AdapterExternalTask`, and calling `run` on this instance. `run` returns once the `AdapterExternalTask` has completed successfully and all associated resources have been cleaned up. `run` throws an error if `AdapterExternalTask` is not successful. In order to provide common infrastructure for all adapters, the `AdapterExecutionTask` uses different channels for parameters standardized across the Dagster Externals API ("API parameters") and those that are specific to an adapter ("adapter parameters"). This distinction is immaterial to the user, who mixes these parameters together at the level of the configuration of `AdapterResource` and call to `AdapterResource.run`. The`AdapterResource.run` method makes the distinction, partitioning the params into the API parameters and adapter parameters. The API parameters are passed to the `AdapterExecutionTask` constructor. The adapter parameters are packaged into an instance of `AdapterTaskParams` and passed to `AdapterExecutionTask.run`. The adapter task is responsible for launching the external task, supplying it with orchestration context info, and reading notifications streamed back during process execution. Most of the plumbing for this is provided by the base `ExternalExecutionTask`. Adapter tasks plug in by defining three required methods: - `_input_context_manager`: returns a context manager that wraps `_launch` and handles IPC input (i.e. writing context) to the external task. This context manager must yield an instance of `AdapterIOTaskParams`, which at a minimum contains environment variables that `dagster-external` uses to establish communication with the orchestration process. - `_output_context_manager`: returns a context manager that wraps `_launch` and handles IPC output from the external task. Just like the input context manager, this context manager also yields an instance of `AdapterIOTaskParams`. - `_launch`: launches the external process and monitors it for completion. It returns if execution was successful, throws an error if not (e.g. a subprocess with non-zero exit). It does not need to handle any I/O or associated resource cleanup-- this is done by the base-class provided plumbing and adapter input/output context managers. `_launch` receives the `AdapterTaskParams` instance provided by the calling `AdapterExecutionResource` as well as the two `AdapterIOTaskParams` instances yielded by the context managers. It may use arbitrary logic to combine these parameters to configure the external system. **NOTE**: @alangenfeld has suggested composition as an alternative to inheritance, and I'm not opposed to that. I thought about trying to implement it but the path forward wasn't clear, whereas moving to an ABC was a straightforward improvement over the status quo. We can continue iterating. ## How I Tested These Changes Existing test suite.
Summary & Motivation
This is another iteration of the external execution API, stacking on and motivated by @alangenfeld's
DockerExecutionTask
in #15820. The goal here is to move toward base external execution APIs that can easily be extended to different environments.ExternalExecutionTask
into:ExternalExecutionTaskBase
: an abstract base class that provides extension points at_input_context_manager
,_output_context_manager
, and_launch
.ExternalExecutionSubprocessTask
: implementsExternalExecutionTaskBase
for the subprocess case.DockerExecutionTask
:ExternalExecutionTaskBase
run
, override_launch
,_input_context_manager
,_output_context_manager
Detailed explanation of current design
Let's call the combination of the
dagster-external
library and associated set of orchestration-side abstractions the "Dagster External API". An "adapter" is an environment-specific implementation of the Dagster externals API. Currently there are two adapters in master: subprocess and Docker. In these PR, these are each defined with similar formatting in dedicated modules (dagster._core.external_execution.subprocess
anddagster_docker.external_resource
).An adapter should define the following parts:
AdapterExecutionTask
: A class inheriting from abstract base classExternalExecutionTask
(e.g.SubprocessExecutionTask
)AdapterExecutionResource:
A resource inheriting fromExternalExecutionResource
(e.g.SubprocessExecutionResource
)AdapterTaskParams
: A dataclass inheriting fromExternalTaskParams
(e.g.SubprocessTaskParams
)AdapterTaskIOParams
An optional dataclass inheriting fromExternalTaskIOParams
(e.g.SubprocessTaskIOParams
)The user-facing component of an adapter is the
AdapterExecutionResource
. This exposes a methodrun
that is responsible for taking a flat list of user-provided parameters, instantiating anAdapterExternalTask
, and callingrun
on this instance.run
returns once theAdapterExternalTask
has completed successfully and all associated resources have been cleaned up.run
throws an error ifAdapterExternalTask
is not successful.In order to provide common infrastructure for all adapters, the
AdapterExecutionTask
uses different channels for parameters standardized across the Dagster Externals API ("API parameters") and those that are specific to an adapter ("adapter parameters"). This distinction is immaterial to the user, who mixes these parameters together at the level of the configuration ofAdapterResource
and call toAdapterResource.run
.The
AdapterResource.run
method makes the distinction, partitioning the params into the API parameters and adapter parameters. The API parameters are passed to theAdapterExecutionTask
constructor. The adapter parameters are packaged into an instance ofAdapterTaskParams
and passed toAdapterExecutionTask.run
.The adapter task is responsible for launching the external task, supplying it with orchestration context info, and reading notifications streamed back during process execution. Most of the plumbing for this is provided by the base
ExternalExecutionTask
. Adapter tasks plug in by defining three required methods:_input_context_manager
: returns a context manager that wraps_launch
and handles IPC input (i.e. writing context) to the external task. This context manager must yield an instance ofAdapterIOTaskParams
, which at a minimum contains environment variables thatdagster-external
uses to establish communication with the orchestration process._output_context_manager
: returns a context manager that wraps_launch
and handles IPC output from the external task. Just like the input context manager, this context manager also yields an instance ofAdapterIOTaskParams
._launch
: launches the external process and monitors it for completion. It returns if execution was successful, throws an error if not (e.g. a subprocess with non-zero exit). It does not need to handle any I/O or associated resource cleanup-- this is done by the base-class provided plumbing and adapter input/output context managers._launch
receives theAdapterTaskParams
instance provided by the callingAdapterExecutionResource
as well as the twoAdapterIOTaskParams
instances yielded by the context managers. It may use arbitrary logic to combine these parameters to configure the external system.NOTE: @alangenfeld has suggested composition as an alternative to inheritance, and I'm not opposed to that. I thought about trying to implement it but the path forward wasn't clear, whereas moving to an ABC was a straightforward improvement over the status quo. We can continue iterating.
How I Tested These Changes
Existing test suite.