-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
blocking check factory method #16612
Conversation
Current dependencies on/for this PR:
This comment was auto-generated by Graphite. |
python_modules/dagster/dagster/_core/definitions/asset_checks.py
Outdated
Show resolved
Hide resolved
@@ -131,7 +131,7 @@ def _process_user_event( | |||
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( | |||
asset_check_evaluation.asset_check_handle | |||
) | |||
output = Output(value=None, output_name=output_name) | |||
output = Output(value=asset_check_evaluation, output_name=output_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can fetch the check evaluations through the event storage instead, but I turned on io managers for them here. Makes the io manager case nicer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this is nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced with fetching via checks table for now, since that works with both the io managed and not managed cases
c0d8bd2
to
58aa295
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite lovely actually.
❤️ composition over cursed bools
python_modules/dagster/dagster/_core/definitions/asset_checks.py
Outdated
Show resolved
Hide resolved
python_modules/dagster/dagster/_core/definitions/asset_checks.py
Outdated
Show resolved
Hide resolved
python_modules/dagster/dagster/_core/definitions/asset_checks.py
Outdated
Show resolved
Hide resolved
@@ -131,7 +131,7 @@ def _process_user_event( | |||
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( | |||
asset_check_evaluation.asset_check_handle | |||
) | |||
output = Output(value=None, output_name=output_name) | |||
output = Output(value=asset_check_evaluation, output_name=output_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this is nice.
58aa295
to
a46586f
Compare
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-omtjanny5-elementl.vercel.app Direct link to changed pages: |
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit cc1c7ae. |
Deploy preview for dagit-storybook ready! ✅ Preview Built with commit cc1c7ae. |
3d84229
to
f7e32d4
Compare
Deploy preview for dagster-university ready! ✅ Preview Built with commit f7e32d4. |
f7e32d4
to
01d7720
Compare
ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()}, | ||
resource_defs=asset_def.resource_defs, | ||
metadata=asset_def.metadata_by_key.get(asset_def.key), | ||
freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really like this- easy to miss a field, annoying to test them all. Any recs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In terms of easy to miss a field and for future proofing, the only way I can really think of is to refactor graph_asset
to immediately call a function graph_asset_no_defaults
and then write test cases against graph_asset_no_defaults
. That way anyone who adds something to graph_asset
in the future will break all the tests, which is the desired behavior, as the right fix will undoubtedly be to change some code here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the "test them all" front, need more context on what you need/want to test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests that assert these parameters are threaded through. 👍 to graph_asset_no_defaults
@graph_asset( | ||
name=asset_def.key.path[-1], | ||
key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, | ||
check_specs=check_specs, | ||
description=asset_def.descriptions_by_key.get(asset_def.key), | ||
ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a follow, let's have graph_asset
support a bare key
argument just like all the other variants so we don't have to do these silly gymnastics around key_prefix
check_output_names = [c.get_python_identifier() for c in check_specs] | ||
|
||
@op(ins={"materialization": In(Any), "check_evaluations": In(Nothing)}) | ||
def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typehint would be very helpful on materialization
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^--- unaddressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added "materialization": In(asset_out_type)
. Is there some fancy way to get a type var with the output type of the asset op?
python_modules/dagster/dagster/_core/execution/plan/execute_step.py
Outdated
Show resolved
Hide resolved
de4173e
to
0540761
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remaining question around the differing treatment of asset check evaluations and materializations.
check_output_names = [c.get_python_identifier() for c in check_specs] | ||
|
||
@op(ins={"materialization": In(Any), "check_evaluations": In(Nothing)}) | ||
def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^--- unaddressed
if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED: | ||
raise DagsterAssetCheckFailedError() | ||
|
||
def blocking_asset(**kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we not know what the arguments are ahead of time? If so, let's break it out. If we do not, leave a comment as to what these represent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
they're the inputs to the passed in asset op. commented
|
||
@op(ins={"materialization": In(asset_out_type), "check_evaluations": In(Nothing)}) | ||
def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): | ||
yield Output(materialization) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also very worthy of explanation as it is quite confusing at first blush.
You may want to consider--for no other reason that clarifying things for code readers–-a couple alternative approches:
- A wrapper class around
AssetMaterialization
that expresses the intent of the author––i.e. you––that you are shuffling this information between ops on purpose. Without more context, this looks like the author misunderstand concepts in the framework and screwed thing up - Since we ended up just fetching check evaluations from the instance anyways, just do that for the materialization as well.
I bias towards figuring out a way to make 1) work for all the events you are shuffling around via wrapper classes. However I don't have full contect on the asset check eval decisions. If there is a very good reason to do that, then let's make the materialization treatment similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a comment. I think the crux is getting the correct io manager output written, not passing the events around
cc1c7ae
to
446c5d7
Compare
asset_out_type = asset_def.op.output_defs[0].dagster_type | ||
|
||
@op(ins={"asset_result": In(asset_out_type), "check_evaluations": In(Nothing)}) | ||
def fan_in_checks_and_asset_result(context: OpExecutionContext, asset_result): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we not type-hinting asset_result
on purpose because of dagster typing machinery?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would the hint be? The input is the output from the @asset decorated function, I don't know how to turn that in to a type variable. You can get it at runtime with op_def.get_output_annotation()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we'd have to do some very fancy generics with @asset down to the AssetsDefinition to here to get an annotation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it understood. Just typing asset_result
as Any
would be helpful. Using the term "result" is a little problematic here since we now have "Result" objects and this was the source of my confusion. I think choosing a different variable name would be helpful. asset_return_value
perhaps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor naming thing, but in the case I think it is important because the current state is a bit confusing.
446c5d7
to
9702291
Compare
9702291
to
dbdbb56
Compare
renaming fn to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻
dbdbb56
to
03fd1a9
Compare
Hmm why? Seems an odd name for a function the returns an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name change kind of a big shift
I think that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I read that the opposite way thinking you were renaming it to build_blocking_asset_check
. You are building an asset not a check. 👍🏻
## Summary & Motivation `graph_asset`'s parameters are out-of-sync with `asset`'s on a structural level, and we do not have the discipline or the tests to keep them in sync. This PR adds an explicit `key` which is nice for users and would have made a recent PR of @johannkm's nicer, who had to do some contortions in #16612 around `key_prefix`. This diff also adds documentation for `key` on `@asset`. An upstack PR will consolidate the logic in `@asset` and `@graph_asset` around this stuff into a single helper function, since it is pretty gross, tricky code. ## How I Tested These Changes
03fd1a9
to
54abeaa
Compare
54abeaa
to
c7b6e26
Compare
c7b6e26
to
ad85f4a
Compare
Quick sketch of a factory method for building a graph asset that materializes, runs checks, and blocks downstreams if the checks fail.
Do we have other asset factory methods that I could mimic for all the other asset attributes?