-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add asset checks, data version to MaterializeResult #16514
Conversation
Current dependencies on/for this PR: This comment was auto-generated by Graphite. |
666971f
to
b0166c4
Compare
b0166c4
to
e9829f5
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 1bfe0bf. |
90f7b6f
to
bf462da
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Req'ing changes to suggest alt approach
@@ -16,6 +19,8 @@ class MaterializeResult( | |||
[ | |||
("asset_key", PublicAttr[Optional[AssetKey]]), | |||
("metadata", PublicAttr[Optional[MetadataUserInput]]), | |||
("data_version", PublicAttr[Optional[DataVersion]]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's list data_version
after check_results
. I think we should consider that feature "more important"
def _zip_and_iterate_op_result( | ||
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] | ||
) -> Iterator[Tuple[int, Any, OutputDefinition]]: | ||
if len(output_defs) > 1: | ||
result = _validate_multi_return(context, result, output_defs) | ||
for position, (output_def, element) in enumerate(zip(output_defs, result)): | ||
expected_return_outputs = _filter_expected_output_defs(result, context, output_defs) | ||
if len(expected_return_outputs) > 1: | ||
result = _validate_multi_return(context, result, expected_return_outputs) | ||
for position, (output_def, element) in enumerate(zip(expected_return_outputs, result)): | ||
yield position, output_def, element | ||
else: | ||
yield 0, output_defs[0], result | ||
|
||
|
||
# Filter out output_defs corresponding to asset check results that already exist on a | ||
# MaterializeResult. | ||
def _filter_expected_output_defs( | ||
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] | ||
) -> Sequence[OutputDefinition]: | ||
result_tuple = ( | ||
(result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result | ||
) | ||
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)] | ||
remove_outputs = [ | ||
r.get_spec_python_identifier(x.asset_key or context.asset_key) | ||
for x in materialize_results | ||
for r in x.check_results or [] | ||
] | ||
return [out for out in output_defs if out.name not in remove_outputs] | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just thought on alternative approach here. What if we unpacked MaterializeResult
right in _zip_and_iterate_op_result
and yielded the appropriate AssetCheckResult
and Output
objects right there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe doable but not trivial because the logic that currently converts MaterializeResult
to Output
in execute_step.py
accesses job_def.asset_layer
, which is not available for direct invocation, and _zip_and_iterate_op_result
is hit in direct invocation.
Recommend exploring this in the proposed followup refactor rather than this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Please leave comment in _zip_and_iterate_op_result
that contains all the context in your head about this problematic code path (and is appropriately apologetic in tone :-))
This is a fairly risk change and I want any future people who have to deal with this before you fix it to understand all that context.
bf462da
to
5556a49
Compare
5556a49
to
1bfe0bf
Compare
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-cso2j6eoq-elementl.vercel.app Direct link to changed pages: |
Deploy preview for dagit-storybook ready! ✅ Preview Built with commit 1bfe0bf. |
## Summary & Motivation Add `check_results` and `data_version` to `MaterializeResult`. This supports streaming asset check results back from an ext process added upstack in #16466. ## How I Tested These Changes New unit test.
## Summary & Motivation Add `check_results` and `data_version` to `MaterializeResult`. This supports streaming asset check results back from an ext process added upstack in dagster-io#16466. ## How I Tested These Changes New unit test.
Summary & Motivation
Add
check_results
anddata_version
toMaterializeResult
. This supports streaming asset check results back from an ext process added upstack in #16466.How I Tested These Changes
New unit test.