-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add require_typed_event_stream to compute contexts #16706
Conversation
Current dependencies on/for this PR:
This comment was auto-generated by Graphite. |
de74f48
to
c733061
Compare
c733061
to
eafa0ea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments inline
python_modules/dagster/dagster/_core/execution/plan/compute_generator.py
Outdated
Show resolved
Hide resolved
python_modules/dagster/dagster/_core/execution/context/system.py
Outdated
Show resolved
Hide resolved
f5469be
to
e6198f2
Compare
else: | ||
output_name = None | ||
if output_name: | ||
emitted_result_names.add(output_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be very nice to have a universal function you can call to get output name from result object somewhere, not sure how to cover all cases or where to put it at this time.
7124a54
to
d18dea9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this good but absolutely want @alangenfeld 's eyes on this and his approval
def test_explicit_mode_op(): | ||
@op(out={"a": Out(int), "b": Out(int)}) | ||
def explicit_mode_op(context: OpExecutionContext): | ||
context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) | ||
|
||
with raises_missing_output_error(): | ||
wrap_op_in_graph_and_execute(explicit_mode_op) | ||
|
||
|
||
def test_explicit_mode_asset(): | ||
@asset | ||
def explicit_mode_asset(context: OpExecutionContext): | ||
context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) | ||
pass | ||
|
||
with raises_missing_output_error(): | ||
materialize([explicit_mode_asset]) | ||
|
||
|
||
def test_explicit_mode_multi_asset(): | ||
@multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) | ||
def explicit_mode_multi_asset(context: OpExecutionContext): | ||
context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) | ||
yield Output(None, output_name="foo") | ||
pass | ||
|
||
with raises_missing_output_error(): | ||
materialize([explicit_mode_multi_asset]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think its worth having success conditions under test as well ass error conditions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, added many tests
@property | ||
def has_require_typed_event_stream(self) -> bool: | ||
return self._require_typed_event_stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I find has_require
odd, maybe requires_
or has_set_require_
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to requires_typed_event_stream
output_name = step_output.output_name | ||
elif isinstance(step_output, MaterializeResult): | ||
asset_key = ( | ||
step_output.asset_key | ||
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle) | ||
) | ||
output_name = step_context.job_def.asset_layer.node_output_handle_for_asset( | ||
asset_key | ||
).output_name | ||
elif isinstance(step_output, AssetCheckEvaluation): | ||
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( | ||
step_output.asset_check_handle | ||
) | ||
elif isinstance(step_output, AssetCheckResult): | ||
if step_output.asset_key and step_output.check_name: | ||
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name) | ||
else: | ||
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle | ||
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle) | ||
else: | ||
output_name = None | ||
if output_name: | ||
emitted_result_names.add(output_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the scariest part of this PR since its not gated by the new bool, and I am skeptical that the current print a warning message is under much test coverage
elif isinstance(step_output, AssetCheckResult): | ||
if step_output.asset_key and step_output.check_name: | ||
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name) | ||
else: | ||
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle | ||
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
theres custom logic to bypass check handle outputs on 232 to avoid printing the warning message. I think that means currently asset checks would bypass would not trigger this has_require_typed_event_stream
check? Needs test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, they were indeed not being caught, fixed and tests added
# Skip any return-specific validation and treat it like a generator op | ||
elif output_defs and context.has_require_typed_event_stream: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its worth fleshing this comment out a bit, maybe just explaining more about has_require_typed_event_stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
d18dea9
to
e9ff404
Compare
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-cakyddxs1-elementl.vercel.app Direct link to changed pages: |
e9ff404
to
71b478f
Compare
approved in comment and alex approved
@smackesey Can we add a test case around using this API with a multi asset that supports subsetting? Otherwise, I think this API would produce incorrect errors if we integrate it against |
Summary & Motivation
Per conversation with @schrockn and @rexledesma, add a
require_typed_event_stream
switch onOpExecutionContext
. This is off by default and has to be explicitly turned on, which we do upstack inext_protocol
. The switch has two effects:The implementation here is greasy as hell and should be considered temporary. A better solution will take time since the code paths governing op results are complex.
How I Tested These Changes
New unit tests.