From 9bf52b9f1b8ef456860362a23ee27f0f142eca1a Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Mon, 18 Sep 2023 23:47:39 -0400 Subject: [PATCH 1/5] blocking check factory method --- .../dagster/_core/definitions/asset_checks.py | 61 ++++++++++++++++++- .../dagster/_core/definitions/assets.py | 1 + .../definitions/decorators/asset_decorator.py | 5 +- .../_core/execution/plan/execute_step.py | 2 +- .../test_blocking_asset_checks.py | 49 +++++++++++++++ 5 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 81d03dd1ffb49..22a0b3dca860c 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -1,4 +1,15 @@ -from typing import Any, Dict, Iterable, Iterator, Mapping, NamedTuple, Optional, Sequence, Set +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + Mapping, + NamedTuple, + Optional, + Sequence, + Set, +) from dagster import _check as check from dagster._annotations import experimental, public @@ -12,6 +23,10 @@ ResourceRequirement, merge_resource_defs, ) +from dagster._core.errors import DagsterAssetCheckFailedError + +if TYPE_CHECKING: + from dagster._core.definitions.assets import AssetsDefinition @experimental @@ -127,3 +142,47 @@ def get_attributes_dict(self) -> Dict[str, Any]: specs=self._specs, input_output_props=self._input_output_props, ) + + +@experimental +def build_blocking_asset_check( + asset_def: "AssetsDefinition", + checks: Sequence[AssetChecksDefinition], +) -> "AssetsDefinition": + from dagster import In, Output, graph_asset, op + + + check_specs = [] + for c in checks: + check_specs.extend(c.specs) + + check_output_names = [c.get_python_identifier() for c in check_specs] + + @op(ins={"materialization": In(Any), "check_evaluations": In(Any)}) + def fan_in_checks_and_materialization(context, materialization, check_evaluations): + yield Output(materialization) + + for result in check_evaluations: + if not result.success: + raise DagsterAssetCheckFailedError() + + @graph_asset( + name=asset_def.key.path[-1], + key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, + check_specs=check_specs, + # if we don't rename the graph, it will conflict with asset_def's Op + _graph_name=asset_def.key.to_python_identifier() + "_blocking_asset_check", + ) + def blocking_asset(): + asset_result = asset_def.op() + check_evaluations = [check.node_def(asset_result) for check in checks] + + return { + "result": fan_in_checks_and_materialization(asset_result, check_evaluations), + **{ + check_output_name: check_result + for check_output_name, check_result in zip(check_output_names, check_evaluations) + }, + } + + return blocking_asset diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 390b47bf7a8fa..827e627577c72 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1262,6 +1262,7 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As ) return self.__class__(**attributes_dict) + def get_attributes_dict(self) -> Dict[str, Any]: return dict( keys_by_input_name=self._keys_by_input_name, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 613dfc586025a..c70d23303e773 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -962,6 +962,7 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, + _graph_name: Optional[str] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -982,6 +983,7 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, + _graph_name: Optional[str] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. @@ -1059,6 +1061,7 @@ def slack_files_table(): resource_defs=resource_defs, check_specs=check_specs, key=key, + _graph_name=_graph_name, ) else: ins = ins or {} @@ -1093,7 +1096,7 @@ def slack_files_table(): } op_graph = graph( - name=out_asset_key.to_python_identifier(), + name=_graph_name or out_asset_key.to_python_identifier(), description=description, config=config, ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 26cfc3a7dba7f..1ecc1ced9658f 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -131,7 +131,7 @@ def _process_user_event( output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( asset_check_evaluation.asset_check_key ) - output = Output(value=None, output_name=output_name) + output = Output(value=asset_check_evaluation, output_name=output_name) yield asset_check_evaluation diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py new file mode 100644 index 0000000000000..4d40a139fe7be --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -0,0 +1,49 @@ +from dagster import AssetKey, Definitions, ExecuteInProcessResult, asset, asset_check +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_checks import build_blocking_asset_check + + +def execute_assets_and_checks( + assets=None, asset_checks=None, raise_on_error: bool = True, resources=None, instance=None +) -> ExecuteInProcessResult: + defs = Definitions(assets=assets, asset_checks=asset_checks, resources=resources) + job_def = defs.get_implicit_global_asset_job_def() + return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance) + + +def test_blocking(): + @asset + def my_asset(): + pass + + @asset_check(asset="my_asset") + def pass_check(): + return AssetCheckResult(success=True, check_name="pass_check") + + @asset_check(asset="my_asset") + def fail_check(): + return AssetCheckResult(success=False, check_name="fail_check") + + blocking_asset = build_blocking_asset_check(asset_def=my_asset, checks=[pass_check, fail_check]) + + @asset(deps=[blocking_asset]) + def downstream_asset(): + pass + + result = execute_assets_and_checks( + assets=[blocking_asset, downstream_asset], raise_on_error=False + ) + assert not result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset"]) + assert not check_evals_by_name["fail_check"].success + assert check_evals_by_name["fail_check"].asset_key == AssetKey(["my_asset"]) + + # downstream asset should not have been materialized + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 1 + assert materialization_events[0].asset_key == AssetKey(["my_asset"]) From 342c40690a1dba338ddb864f920658106a5c7d84 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Wed, 20 Sep 2023 15:56:39 -0400 Subject: [PATCH 2/5] feedback --- .../dagster/_core/definitions/asset_checks.py | 32 ++-- .../dagster/_core/definitions/assets.py | 1 - .../definitions/decorators/asset_decorator.py | 2 +- .../_core/execution/plan/execute_step.py | 2 +- .../test_blocking_asset_checks.py | 159 +++++++++++++++--- 5 files changed, 160 insertions(+), 36 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 22a0b3dca860c..176560332d1f1 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -24,6 +24,7 @@ merge_resource_defs, ) from dagster._core.errors import DagsterAssetCheckFailedError +from dagster._core.types.dagster_type import Nothing if TYPE_CHECKING: from dagster._core.definitions.assets import AssetsDefinition @@ -149,8 +150,8 @@ def build_blocking_asset_check( asset_def: "AssetsDefinition", checks: Sequence[AssetChecksDefinition], ) -> "AssetsDefinition": - from dagster import In, Output, graph_asset, op - + from dagster import AssetIn, In, OpExecutionContext, Output, graph_asset, op + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus check_specs = [] for c in checks: @@ -158,23 +159,34 @@ def build_blocking_asset_check( check_output_names = [c.get_python_identifier() for c in check_specs] - @op(ins={"materialization": In(Any), "check_evaluations": In(Any)}) - def fan_in_checks_and_materialization(context, materialization, check_evaluations): + @op(ins={"materialization": In(Any), "check_evaluations": In(Nothing)}) + def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): yield Output(materialization) - for result in check_evaluations: - if not result.success: + for check_spec in check_specs: + executions = context.instance.event_log_storage.get_asset_check_executions( + asset_key=asset_def.key, check_name=check_spec.name, limit=1 + ) + check.invariant( + len(executions) == 1, "Expected asset check {check_spec.name} to execute" + ) + execution = executions[0] + check.invariant( + execution.run_id == context.run_id, + "Expected asset check {check_spec.name} to execute in the current run", + ) + if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED: raise DagsterAssetCheckFailedError() @graph_asset( name=asset_def.key.path[-1], key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, check_specs=check_specs, - # if we don't rename the graph, it will conflict with asset_def's Op - _graph_name=asset_def.key.to_python_identifier() + "_blocking_asset_check", + description=asset_def.descriptions_by_key.get(asset_def.key), + ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()} ) - def blocking_asset(): - asset_result = asset_def.op() + def blocking_asset(**kwargs): + asset_result = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs) check_evaluations = [check.node_def(asset_result) for check in checks] return { diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 827e627577c72..390b47bf7a8fa 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1262,7 +1262,6 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As ) return self.__class__(**attributes_dict) - def get_attributes_dict(self) -> Dict[str, Any]: return dict( keys_by_input_name=self._keys_by_input_name, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index c70d23303e773..28ef00c1af402 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -1096,7 +1096,7 @@ def slack_files_table(): } op_graph = graph( - name=_graph_name or out_asset_key.to_python_identifier(), + name=out_asset_key.to_python_identifier(), description=description, config=config, ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 1ecc1ced9658f..26cfc3a7dba7f 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -131,7 +131,7 @@ def _process_user_event( output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( asset_check_evaluation.asset_check_key ) - output = Output(value=asset_check_evaluation, output_name=output_name) + output = Output(value=None, output_name=output_name) yield asset_check_evaluation diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py index 4d40a139fe7be..2b9fedca1846c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -1,37 +1,82 @@ -from dagster import AssetKey, Definitions, ExecuteInProcessResult, asset, asset_check -from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster import ( + AssetCheckResult, + AssetKey, + Definitions, + ExecuteInProcessResult, + asset, + asset_check, +) from dagster._core.definitions.asset_checks import build_blocking_asset_check +from dagster._core.definitions.asset_in import AssetIn def execute_assets_and_checks( - assets=None, asset_checks=None, raise_on_error: bool = True, resources=None, instance=None + assets=None, + asset_checks=None, + raise_on_error: bool = True, + resources=None, + instance=None, + tags=None, ) -> ExecuteInProcessResult: defs = Definitions(assets=assets, asset_checks=asset_checks, resources=resources) job_def = defs.get_implicit_global_asset_job_def() - return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance) + return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance, tags=tags) +@asset +def upstream_asset(): + pass -def test_blocking(): - @asset - def my_asset(): - pass +@asset(deps=[upstream_asset]) +def my_asset(): + pass - @asset_check(asset="my_asset") - def pass_check(): - return AssetCheckResult(success=True, check_name="pass_check") - @asset_check(asset="my_asset") - def fail_check(): - return AssetCheckResult(success=False, check_name="fail_check") +@asset_check(asset="my_asset") +def pass_check(): + return AssetCheckResult(success=True, check_name="pass_check") - blocking_asset = build_blocking_asset_check(asset_def=my_asset, checks=[pass_check, fail_check]) - @asset(deps=[blocking_asset]) - def downstream_asset(): - pass +@asset_check(asset="my_asset") +def fail_check_if_tagged(context): + return AssetCheckResult( + success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged" + ) + + +blocking_asset = build_blocking_asset_check( + asset_def=my_asset, checks=[pass_check, fail_check_if_tagged] +) + + +@asset(deps=[blocking_asset]) +def downstream_asset(): + pass + +def test_check_pass(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset, downstream_asset], raise_on_error=False + ) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset"]) + assert check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset"]) + + # downstream asset materializes + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 3 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset"]) + assert materialization_events[2].asset_key == AssetKey(["downstream_asset"]) + +def test_check_fail_and_block(): result = execute_assets_and_checks( - assets=[blocking_asset, downstream_asset], raise_on_error=False + assets=[upstream_asset, blocking_asset, downstream_asset], raise_on_error=False, tags={"fail_check": "true"} ) assert not result.success @@ -40,10 +85,78 @@ def downstream_asset(): check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} assert check_evals_by_name["pass_check"].success assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset"]) - assert not check_evals_by_name["fail_check"].success - assert check_evals_by_name["fail_check"].asset_key == AssetKey(["my_asset"]) + assert not check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset"]) + + # downstream asset should not have been materialized + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 2 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset"]) + + + +@asset +def my_asset_with_managed_input(upstream_asset): + pass + + + +@asset_check(asset="my_asset_with_managed_input") +def fail_check_if_tagged_2(context): + return AssetCheckResult( + success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged" + ) + + +blocking_asset_with_managed_input = build_blocking_asset_check( + asset_def=my_asset_with_managed_input, checks=[fail_check_if_tagged_2] +) + +@asset(ins={"input_asset": AssetIn(blocking_asset_with_managed_input.key)}) +def downstream_asset_2(input_asset): + pass + +def test_check_pass_with_inputs(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], raise_on_error=False + ) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"]) + + # downstream asset materializes + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 3 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert materialization_events[2].asset_key == AssetKey(["downstream_asset"]) + + +def test_check_fail_and_block_with_inputs(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], raise_on_error=False, tags={"fail_check": "true"} + ) + assert not result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert not check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"]) # downstream asset should not have been materialized materialization_events = result.get_asset_materialization_events() - assert len(materialization_events) == 1 - assert materialization_events[0].asset_key == AssetKey(["my_asset"]) + assert len(materialization_events) == 2 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"]) + + From eb1f7f04383ed2420990f3bed1bbe5e367022d8d Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Wed, 20 Sep 2023 18:11:02 -0400 Subject: [PATCH 3/5] fix inputs --- .../dagster/_core/definitions/asset_checks.py | 14 ++++- .../test_blocking_asset_checks.py | 54 +++++++++++-------- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 176560332d1f1..d0bd4ae9da4aa 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -159,7 +159,10 @@ def build_blocking_asset_check( check_output_names = [c.get_python_identifier() for c in check_specs] - @op(ins={"materialization": In(Any), "check_evaluations": In(Nothing)}) + check.invariant(len(asset_def.op.output_defs) == 1) + asset_out_type = asset_def.op.output_defs[0].dagster_type + + @op(ins={"materialization": In(asset_out_type), "check_evaluations": In(Nothing)}) def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): yield Output(materialization) @@ -181,9 +184,16 @@ def fan_in_checks_and_materialization(context: OpExecutionContext, materializati @graph_asset( name=asset_def.key.path[-1], key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, + group_name=asset_def.group_names_by_key.get(asset_def.key), + partitions_def=asset_def.partitions_def, check_specs=check_specs, description=asset_def.descriptions_by_key.get(asset_def.key), - ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()} + ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()}, + resource_defs=asset_def.resource_defs, + metadata=asset_def.metadata_by_key.get(asset_def.key), + freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), + auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), + backfill_policy=asset_def.backfill_policy, ) def blocking_asset(**kwargs): asset_result = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py index 2b9fedca1846c..0ef0e27ad1b91 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -22,9 +22,11 @@ def execute_assets_and_checks( job_def = defs.get_implicit_global_asset_job_def() return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance, tags=tags) + @asset def upstream_asset(): - pass + return "foo" + @asset(deps=[upstream_asset]) def my_asset(): @@ -52,6 +54,7 @@ def fail_check_if_tagged(context): def downstream_asset(): pass + def test_check_pass(): result = execute_assets_and_checks( assets=[upstream_asset, blocking_asset, downstream_asset], raise_on_error=False @@ -76,7 +79,9 @@ def test_check_pass(): def test_check_fail_and_block(): result = execute_assets_and_checks( - assets=[upstream_asset, blocking_asset, downstream_asset], raise_on_error=False, tags={"fail_check": "true"} + assets=[upstream_asset, blocking_asset, downstream_asset], + raise_on_error=False, + tags={"fail_check": "true"}, ) assert not result.success @@ -95,17 +100,17 @@ def test_check_fail_and_block(): assert materialization_events[1].asset_key == AssetKey(["my_asset"]) - @asset def my_asset_with_managed_input(upstream_asset): - pass - + assert upstream_asset == "foo" + return "bar" @asset_check(asset="my_asset_with_managed_input") -def fail_check_if_tagged_2(context): +def fail_check_if_tagged_2(context, my_asset_with_managed_input): + assert my_asset_with_managed_input == "bar" return AssetCheckResult( - success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged" + success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged_2" ) @@ -113,50 +118,53 @@ def fail_check_if_tagged_2(context): asset_def=my_asset_with_managed_input, checks=[fail_check_if_tagged_2] ) + @asset(ins={"input_asset": AssetIn(blocking_asset_with_managed_input.key)}) def downstream_asset_2(input_asset): - pass + assert input_asset == "bar" + def test_check_pass_with_inputs(): result = execute_assets_and_checks( - assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], raise_on_error=False + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], + raise_on_error=False, ) assert result.success check_evals = result.get_asset_check_evaluations() - assert len(check_evals) == 2 + assert len(check_evals) == 1 check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} - assert check_evals_by_name["pass_check"].success - assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"]) - assert check_evals_by_name["fail_check_if_tagged"].success - assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert check_evals_by_name["fail_check_if_tagged_2"].success + assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey( + ["my_asset_with_managed_input"] + ) # downstream asset materializes materialization_events = result.get_asset_materialization_events() assert len(materialization_events) == 3 assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"]) - assert materialization_events[2].asset_key == AssetKey(["downstream_asset"]) + assert materialization_events[2].asset_key == AssetKey(["downstream_asset_2"]) def test_check_fail_and_block_with_inputs(): result = execute_assets_and_checks( - assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], raise_on_error=False, tags={"fail_check": "true"} + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], + raise_on_error=False, + tags={"fail_check": "true"}, ) assert not result.success check_evals = result.get_asset_check_evaluations() - assert len(check_evals) == 2 + assert len(check_evals) == 1 check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} - assert check_evals_by_name["pass_check"].success - assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"]) - assert not check_evals_by_name["fail_check_if_tagged"].success - assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert not check_evals_by_name["fail_check_if_tagged_2"].success + assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey( + ["my_asset_with_managed_input"] + ) # downstream asset should not have been materialized materialization_events = result.get_asset_materialization_events() assert len(materialization_events) == 2 assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"]) - - From 2fec27eaf7a746456557077712ec984c58526ade Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Sat, 23 Sep 2023 22:19:30 -0400 Subject: [PATCH 4/5] graph_asset_no_defaults --- .../dagster/_core/definitions/asset_checks.py | 45 +++++---- .../definitions/decorators/asset_decorator.py | 98 +++++++++++++++---- .../test_blocking_asset_checks.py | 6 +- 3 files changed, 108 insertions(+), 41 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index d0bd4ae9da4aa..21fbddd85eb0f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -146,11 +146,12 @@ def get_attributes_dict(self) -> Dict[str, Any]: @experimental -def build_blocking_asset_check( +def build_asset_with_blocking_check( asset_def: "AssetsDefinition", checks: Sequence[AssetChecksDefinition], ) -> "AssetsDefinition": - from dagster import AssetIn, In, OpExecutionContext, Output, graph_asset, op + from dagster import AssetIn, In, OpExecutionContext, Output, op + from dagster._core.definitions.decorators.asset_decorator import graph_asset_no_defaults from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus check_specs = [] @@ -162,9 +163,13 @@ def build_blocking_asset_check( check.invariant(len(asset_def.op.output_defs) == 1) asset_out_type = asset_def.op.output_defs[0].dagster_type - @op(ins={"materialization": In(asset_out_type), "check_evaluations": In(Nothing)}) - def fan_in_checks_and_materialization(context: OpExecutionContext, materialization): - yield Output(materialization) + @op(ins={"asset_return_value": In(asset_out_type), "check_evaluations": In(Nothing)}) + def fan_in_checks_and_asset_return_value(context: OpExecutionContext, asset_return_value: Any): + # we pass the asset_return_value through and store it again so that downstream assets can load it. + # This is a little silly- we only do this because this op has the asset key in its StepOutputProperties + # so the output is written to the right path. We could probably get the asset_def.op to write to the + # asset path (and make sure we don't override it here) to avoid the double write. + yield Output(asset_return_value) for check_spec in check_specs: executions = context.instance.event_log_storage.get_asset_check_executions( @@ -181,7 +186,21 @@ def fan_in_checks_and_materialization(context: OpExecutionContext, materializati if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED: raise DagsterAssetCheckFailedError() - @graph_asset( + # kwargs are the inputs to the asset_def.op that we are wrapping + def blocking_asset(**kwargs): + asset_return_value = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs) + check_evaluations = [check.node_def(asset_return_value) for check in checks] + + return { + "result": fan_in_checks_and_asset_return_value(asset_return_value, check_evaluations), + **{ + check_output_name: check_result + for check_output_name, check_result in zip(check_output_names, check_evaluations) + }, + } + + return graph_asset_no_defaults( + compose_fn=blocking_asset, name=asset_def.key.path[-1], key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, group_name=asset_def.group_names_by_key.get(asset_def.key), @@ -194,17 +213,5 @@ def fan_in_checks_and_materialization(context: OpExecutionContext, materializati freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), backfill_policy=asset_def.backfill_policy, + config=None, # gets config from asset_def.op ) - def blocking_asset(**kwargs): - asset_result = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs) - check_evaluations = [check.node_def(asset_result) for check in checks] - - return { - "result": fan_in_checks_and_materialization(asset_result, check_evaluations), - **{ - check_output_name: check_result - for check_output_name, check_result in zip(check_output_names, check_evaluations) - }, - } - - return blocking_asset diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 28ef00c1af402..3dd3b072a0a02 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -1095,34 +1095,94 @@ def slack_files_table(): **check_outs_by_output_name, } - op_graph = graph( - name=out_asset_key.to_python_identifier(), + return graph_asset_no_defaults( + compose_fn=compose_fn, + name=name, description=description, + ins=ins, config=config, - ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, - out=combined_outs_by_output_name, - )(compose_fn) - return AssetsDefinition.from_graph( - op_graph, - keys_by_input_name=keys_by_input_name, - keys_by_output_name={"result": out_asset_key}, - partitions_def=partitions_def, - partition_mappings=partition_mappings if partition_mappings else None, + key_prefix=key_prefix, group_name=group_name, - metadata_by_output_name={"result": metadata} if metadata else None, - freshness_policies_by_output_name=( - {"result": freshness_policy} if freshness_policy else None - ), - auto_materialize_policies_by_output_name=( - {"result": auto_materialize_policy} if auto_materialize_policy else None - ), + partitions_def=partitions_def, + metadata=metadata, + freshness_policy=freshness_policy, + auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, - descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, check_specs=check_specs, ) +def graph_asset_no_defaults( + *, + compose_fn: Callable, + name: Optional[str], + description: Optional[str], + ins: Optional[Mapping[str, AssetIn]], + config: Optional[Union[ConfigMapping, Mapping[str, Any]]], + key_prefix: Optional[CoercibleToAssetKeyPrefix], + group_name: Optional[str], + partitions_def: Optional[PartitionsDefinition], + metadata: Optional[MetadataUserInput], + freshness_policy: Optional[FreshnessPolicy], + auto_materialize_policy: Optional[AutoMaterializePolicy], + backfill_policy: Optional[BackfillPolicy], + resource_defs: Optional[Mapping[str, ResourceDefinition]], + check_specs: Optional[Sequence[AssetCheckSpec]], +) -> AssetsDefinition: + key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix + ins = ins or {} + asset_name = name or compose_fn.__name__ + asset_ins = build_asset_ins(compose_fn, ins or {}, set()) + out_asset_key = AssetKey(list(filter(None, [*(key_prefix or []), asset_name]))) + + keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()} + partition_mappings = { + input_name: asset_in.partition_mapping + for input_name, asset_in in ins.items() + if asset_in.partition_mapping + } + + check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( + check_specs, [out_asset_key] + ) + check_outs_by_output_name: Mapping[str, GraphOut] = { + output_name: GraphOut() for output_name in check_specs_by_output_name.keys() + } + + combined_outs_by_output_name: Mapping = { + "result": GraphOut(), + **check_outs_by_output_name, + } + + op_graph = graph( + name=out_asset_key.to_python_identifier(), + description=description, + config=config, + ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, + out=combined_outs_by_output_name, + )(compose_fn) + return AssetsDefinition.from_graph( + op_graph, + keys_by_input_name=keys_by_input_name, + keys_by_output_name={"result": out_asset_key}, + partitions_def=partitions_def, + partition_mappings=partition_mappings if partition_mappings else None, + group_name=group_name, + metadata_by_output_name={"result": metadata} if metadata else None, + freshness_policies_by_output_name=( + {"result": freshness_policy} if freshness_policy else None + ), + auto_materialize_policies_by_output_name=( + {"result": auto_materialize_policy} if auto_materialize_policy else None + ), + backfill_policy=backfill_policy, + descriptions_by_output_name={"result": description} if description else None, + resource_defs=resource_defs, + check_specs=check_specs, + ) + + def graph_multi_asset( *, outs: Mapping[str, AssetOut], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py index 0ef0e27ad1b91..9fab533a6fb44 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -6,7 +6,7 @@ asset, asset_check, ) -from dagster._core.definitions.asset_checks import build_blocking_asset_check +from dagster._core.definitions.asset_checks import build_asset_with_blocking_check from dagster._core.definitions.asset_in import AssetIn @@ -45,7 +45,7 @@ def fail_check_if_tagged(context): ) -blocking_asset = build_blocking_asset_check( +blocking_asset = build_asset_with_blocking_check( asset_def=my_asset, checks=[pass_check, fail_check_if_tagged] ) @@ -114,7 +114,7 @@ def fail_check_if_tagged_2(context, my_asset_with_managed_input): ) -blocking_asset_with_managed_input = build_blocking_asset_check( +blocking_asset_with_managed_input = build_asset_with_blocking_check( asset_def=my_asset_with_managed_input, checks=[fail_check_if_tagged_2] ) From ad85f4a593be25dd001f969122b8346e0868fc4f Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Tue, 26 Sep 2023 11:50:03 -0400 Subject: [PATCH 5/5] rebase on asset decs --- .../dagster/_core/definitions/asset_checks.py | 5 +- .../definitions/decorators/asset_decorator.py | 46 ++++--------------- 2 files changed, 12 insertions(+), 39 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 21fbddd85eb0f..20d28ad947b71 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -201,8 +201,9 @@ def blocking_asset(**kwargs): return graph_asset_no_defaults( compose_fn=blocking_asset, - name=asset_def.key.path[-1], - key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, + name=None, + key_prefix=None, + key=asset_def.key, group_name=asset_def.group_names_by_key.get(asset_def.key), partitions_def=asset_def.partitions_def, check_specs=check_specs, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 3dd3b072a0a02..d509b8b60983e 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -962,7 +962,6 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, - _graph_name: Optional[str] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -983,7 +982,6 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, - _graph_name: Optional[str] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. @@ -1061,40 +1059,8 @@ def slack_files_table(): resource_defs=resource_defs, check_specs=check_specs, key=key, - _graph_name=_graph_name, ) else: - ins = ins or {} - asset_ins = build_asset_ins(compose_fn, ins or {}, set()) - out_asset_key, _asset_name = _resolve_key_and_name( - key=key, - key_prefix=key_prefix, - name=name, - decorator="@graph_asset", - fn=compose_fn, - ) - - keys_by_input_name = { - input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() - } - partition_mappings = { - input_name: asset_in.partition_mapping - for input_name, asset_in in ins.items() - if asset_in.partition_mapping - } - - check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( - check_specs, [out_asset_key] - ) - check_outs_by_output_name: Mapping[str, GraphOut] = { - output_name: GraphOut() for output_name in check_specs_by_output_name.keys() - } - - combined_outs_by_output_name: Mapping = { - "result": GraphOut(), - **check_outs_by_output_name, - } - return graph_asset_no_defaults( compose_fn=compose_fn, name=name, @@ -1110,6 +1076,7 @@ def slack_files_table(): backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, + key=key, ) @@ -1129,12 +1096,17 @@ def graph_asset_no_defaults( backfill_policy: Optional[BackfillPolicy], resource_defs: Optional[Mapping[str, ResourceDefinition]], check_specs: Optional[Sequence[AssetCheckSpec]], + key: Optional[CoercibleToAssetKey], ) -> AssetsDefinition: - key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix ins = ins or {} - asset_name = name or compose_fn.__name__ asset_ins = build_asset_ins(compose_fn, ins or {}, set()) - out_asset_key = AssetKey(list(filter(None, [*(key_prefix or []), asset_name]))) + out_asset_key, _asset_name = _resolve_key_and_name( + key=key, + key_prefix=key_prefix, + name=name, + decorator="@graph_asset", + fn=compose_fn, + ) keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()} partition_mappings = {