[ext] Use MaterializeResult for ext_protocol #16624
@@ -88,6 +89,9 @@ def run(
raise DagsterExternalExecutionError(
f"External execution process failed with code {process.returncode}"
)
_ext_context = ext_context
Did you get some error that caused you to do this re-assign? From what I understand, the values from a with statement are not scoped to the opened block; you have access to them post exit.
I think we can just do ext_context.get_materialize_results() and have some explicit runtime error if it's called too early. Might need to have ExtOrchestrationContext(ContextManager) to do it cleanly.
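To illustrate the scoping point: Python's `with` statement does not introduce a new scope, so the name bound by `as` stays usable after the block exits. A minimal sketch, where `Session` and `open_session` are hypothetical stand-ins and not Dagster APIs:

```python
from contextlib import contextmanager
from typing import Iterator


class Session:
    """Hypothetical stand-in for an object yielded by a context manager."""

    def __init__(self) -> None:
        self.closed = False


@contextmanager
def open_session() -> Iterator[Session]:
    session = Session()
    try:
        yield session
    finally:
        # Runs on both normal and exception-driven exit.
        session.closed = True


with open_session() as session:
    inside_closed = session.closed  # still open inside the block

# The name `session` is still bound here; no re-assignment is needed.
after_closed = session.closed
```

The object outlives the block, but it is in its post-exit state, which is why a guard against calling it too early can be useful.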
Did you get some error that caused you to do this re-assign? From what I understand, the values from a with statement are not scoped to the opened block; you have access to them post exit.
Hmm you're right. Not sure why I thought the yielded var became unbound.
I think we can just do ext_context.get_materialize_results() and have some explicit runtime error if it's called too early. Might need to have ExtOrchestrationContext(ContextManager) to do it cleanly.
I will do a quick rejigger
Might need to have ExtOrchestrationContext(ContextManager) to do it cleanly
I take this back, I think what you really care about is that the message_reader is in a post exit state. The context obj will exit before reader unless you do extra goofy shit, so you really want to just check on that thing
Did the rejigger. I just made ext_protocol set an is_task_finished on ExtOrchestrationContext when it exits. You are correct that we just care about the message reader, and we know the message reader has exited if ext_protocol has.
context_data=context_data,
message_handler=msg_handler,
context_injector_params=ci_params,
message_reader_params=mr_params,
)
yield ext_context
ext_context.is_task_finished = True
May need finally block protection to ensure this is set on exception-driven exit? Trying to think if there's a case where you exit the CM via exception and still have access to ExtOrchestrationContext such that it may be called.
This needs a test either way to confirm behavior
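A minimal sketch of the concern, using hypothetical stand-ins (`FakeContext`, `protocol_without_finally`, `protocol_with_finally`) rather than the real `ext_protocol`: in a generator-based context manager, code after `yield` is skipped when the block raises, while a `finally` clause still runs.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, ContextManager, Iterator


@dataclass
class FakeContext:
    """Hypothetical stand-in for ExtOrchestrationContext."""

    is_task_finished: bool = False


@contextmanager
def protocol_without_finally(ctx: FakeContext) -> Iterator[FakeContext]:
    yield ctx
    # Skipped if the body of the `with` block raises.
    ctx.is_task_finished = True


@contextmanager
def protocol_with_finally(ctx: FakeContext) -> Iterator[FakeContext]:
    try:
        yield ctx
    finally:
        # Runs on normal exit and on exception-driven exit.
        ctx.is_task_finished = True


def run_and_fail(
    protocol: Callable[[FakeContext], ContextManager[FakeContext]],
) -> FakeContext:
    ctx = FakeContext()
    try:
        with protocol(ctx):
            raise ValueError("external process failed")
    except ValueError:
        pass  # the caller still holds a reference to ctx here
    return ctx


unprotected = run_and_fail(protocol_without_finally)
protected = run_and_fail(protocol_with_finally)
```

In this sketch only the `finally` variant leaves the flag set after a failure, so a caller that catches the exception and then inspects the context sees a consistent state.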
@@ -313,6 +313,9 @@ def subproc_run(context: AssetExecutionContext):
extras=extras,
) as ext_context:
subprocess.run(cmd, env=ext_context.get_external_process_env_vars(), check=False)
_ext_context = ext_context
nit: grab this one too
@@ -108,6 +119,7 @@ class ExtOrchestrationContext:
message_handler: ExtMessageHandler
context_injector_params: ExtParams
message_reader_params: ExtParams
is_task_finished: bool = False
nit: consider a name that maps more explicitly to what we're tracking, versus what we believe that corresponds to, i.e. has_exited_cm or something in that direction
Hmm-- something like has_exited_cm feels very opaque to me.
Feels like we need a term for what happens in the scope of an ext_protocol block-- maybe an "ext session"? Then it could be is_ext_session_closed, meaning that any comms with the external process are terminated, which is what happens on ext_protocol exit.
ext_session_closed sounds great to me
What does the multi asset case look like here where we want to be able to stream materializations rather than wait for op completion?
This makes me definitely want to do an approach more like
We would need to provide a method on the
Have to hop into a meeting but truly want to avoid a Union as the return type. We'll regret that. One way of doing things please.
@@ -21,7 +23,7 @@ def run(
*,
context: OpExecutionContext,
extras: Optional[ExtExtras] = None,
) -> None: ...
) -> Union["MaterializeResult", Tuple["MaterializeResult", ...]]: ...
Would very much want this type signature to not be "dual state" like this. We shouldn't specialize the one versus many use case
Yeah this is really unfortunate and I want to avoid it. The impetus is that the framework complains if you return a 1-tuple when expecting a single result, and I wanted to be able to just return ExtClient.run from an @asset.
Possible solutions:
1. Go with the iterator approach described above
2. Adjust framework to accept a 1-tuple when expecting a single result
3. Always return a tuple and require the user to unpack to a single object for a 1-tuple
4. Provide a separate method get_materialize_result that returns a single MaterializeResult.
Atm I like solution (1).
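To make the trade-off concrete, here is a sketch of what the iterator approach (solution 1) might look like. `Result` and `run` are hypothetical stand-ins, not the real `MaterializeResult` or `ExtClient.run`; the point is only that an iterator gives one return type for both the single-asset and multi-asset cases.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, Sequence


@dataclass(frozen=True)
class Result:
    """Hypothetical stand-in for a materialization result."""

    asset_key: str
    metadata: Dict[str, str] = field(default_factory=dict)


def run(asset_keys: Sequence[str]) -> Iterator[Result]:
    """Yield one result per asset, so one and many share a signature."""
    for key in asset_keys:
        yield Result(asset_key=key)


# Callers consume both cases the same way, with no Union to branch on.
single = list(run(["orders"]))
many = list(run(["orders", "customers"]))
```

The cost of this shape is that a caller expecting exactly one result has to iterate or unpack, which is the ergonomic concern that motivated the Union in the first place.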
metadata_value = self._resolve_metadata_value(value, type)
self._context.add_output_metadata({label: metadata_value}, output_name)
self._metadata.setdefault(resolved_asset_key, {})[label] = metadata_value
Prefer a non-mutating version of this
As in reassigning the whole of self._metadata whenever a new value is added?
self._metadata = {
    **self._metadata,
    resolved_asset_key: {
        **self._metadata.get(resolved_asset_key, {}),
        label: metadata_value
    }
}
IMO this is a lot harder to parse and mutation is appropriate here since we are buffering values as they come in.
I would prefer more explicit and less clever code
if resolved_asset_key not in self._metadata:
    self._metadata[resolved_asset_key] = {}
self._metadata[resolved_asset_key][label] = metadata_value
rather than what is currently there:
self._metadata.setdefault(resolved_asset_key, {})[label] = metadata_value
which is less code but hurts my brain
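For comparison, the three variants discussed in this thread can be put side by side. This is a standalone sketch with hypothetical function names and a plain dict in place of `self._metadata`; it only shows that all three produce the same nested mapping.

```python
from typing import Any, Dict

Metadata = Dict[str, Dict[str, Any]]


def add_with_setdefault(metadata: Metadata, key: str, label: str, value: Any) -> None:
    # Mutates in place; creates the inner dict on first use.
    metadata.setdefault(key, {})[label] = value


def add_explicit(metadata: Metadata, key: str, label: str, value: Any) -> None:
    # Same behavior, spelled out as a membership check.
    if key not in metadata:
        metadata[key] = {}
    metadata[key][label] = value


def add_without_mutation(metadata: Metadata, key: str, label: str, value: Any) -> Metadata:
    # Returns a new mapping; the input is left untouched.
    return {**metadata, key: {**metadata.get(key, {}), label: value}}


a: Metadata = {}
b: Metadata = {}
c: Metadata = {}
reports = [("asset_1", "rows", 10), ("asset_1", "cols", 3), ("asset_2", "rows", 7)]
for key, label, value in reports:
    add_with_setdefault(a, key, label, value)
    add_explicit(b, key, label, value)
    c = add_without_mutation(c, key, label, value)
```

Since the results are identical, the choice comes down to readability and whether the buffer is meant to be mutated as values arrive.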
Let's see what the iterator version looks like
pyproject.toml
# We use `id` in many places and almost never want to use the python builtin.
builtins-ignorelist = ["id"]
# Id and type are frequently helpful as local variable or parameter names.
builtins-ignorelist = ["id", "type"]
We already exclude id, which is a Python builtin, and type is in the same category of common and useful variable name. IMO we should also change the keyword arg on report_asset_metadata to just type
Did not realize that we do this for id. I'm not convinced that is a good idea either. I would rather not make this policy decision coupled with this PR.
It's up.
Sweet. I would like @rexledesma to take a look and approve as well to ensure that this is usable in the dbt integration
@@ -185,6 +185,12 @@ def extract_message_or_forward_to_stdout(handler: "ExtMessageHandler", log_line:
sys.stdout.writelines((log_line, "\n"))


_FAIL_TO_YIELD_ERROR_MESSAGE = (
"Did you forget to `yield from ext_context.get_results()`? This should be called once after the"
s/This/get_results/
Left a comment on the original PR that introduces the context taint: #16706 (comment). I'll have confidence in this once we have a test case for its usage against a subsetted multi asset.
Summary & Motivation
Make ext rely on yielding MaterializeResult to register metadata and data version, as opposed to modifying OpExecutionContext. This allows results to stream back as they are reported rather than being bulk reported when computation completes.

This required the addition of a report_asset_materialization method that can be called on the ExtContext. This will queue a MaterializeResult on the orchestration side. The queue can be cleared from the ExtOrchestrationContext at any time by calling ExtOrchestrationContext.get_results. Errors are raised if attempting to materialize an asset twice or to report data version/metadata after materialization.

Once the ext_protocol block exits, any as-yet-unmaterialized assets are queued on the MessageHandler, so that calling ExtOrchestrationContext.get_results after exit will yield all the remaining MaterializeResult objects. Note that yielding from this method after ext_protocol close is required to guarantee all buffered data is yielded, since there is no guarantee that all messages have been processed before ext_protocol completes its exit routine.

To head off the confusing scenario where a user forgets to yield outside the block and sees auto-created materializations that lack any reported metadata, we call set_require_typed_event_stream on the OpExecutionContext. This will cause an error during output processing if an expected output was not returned/yielded.

How I Tested These Changes
New unit tests.
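The stream-then-drain behavior described in the summary can be sketched with a toy model. Everything here (`ToyContext`, `toy_protocol`, `report`, `flush_pending`) is a hypothetical stand-in for illustration, not the real `ExtOrchestrationContext` or `ext_protocol`; it assumes results are buffered in a queue and that `get_results` yields whatever is queued at call time.

```python
from collections import deque
from contextlib import contextmanager
from typing import Deque, Iterator, List


class ToyContext:
    """Hypothetical stand-in that buffers reported results in a queue."""

    def __init__(self) -> None:
        self._queue: Deque[str] = deque()
        self._pending: List[str] = []  # messages not yet processed

    def report(self, asset_key: str, processed: bool = True) -> None:
        # Simulate a reader that may lag behind the external process.
        if processed:
            self._queue.append(asset_key)
        else:
            self._pending.append(asset_key)

    def flush_pending(self) -> None:
        self._queue.extend(self._pending)
        self._pending.clear()

    def get_results(self) -> Iterator[str]:
        # Yield everything currently queued; may be called repeatedly.
        while self._queue:
            yield self._queue.popleft()


@contextmanager
def toy_protocol() -> Iterator[ToyContext]:
    ctx = ToyContext()
    try:
        yield ctx
    finally:
        # On exit, anything still buffered becomes available.
        ctx.flush_pending()


def compute() -> Iterator[str]:
    with toy_protocol() as ctx:
        ctx.report("asset_a")
        ctx.report("asset_b", processed=False)  # arrives late
        yield from ctx.get_results()  # streams asset_a
    yield from ctx.get_results()  # drains asset_b after exit


results = list(compute())
```

In this toy model, dropping the final `yield from` would silently lose the late result, which mirrors the forgotten-yield scenario the summary aims to turn into an explicit error.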