From a46586fa18b4b3f22f1b870f1f97f37c9a41e737 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Mon, 18 Sep 2023 23:47:39 -0400 Subject: [PATCH] blocking check factory method --- .../dagster/_core/definitions/asset_checks.py | 59 ++++++++++++++++++- .../_core/execution/plan/execute_step.py | 11 ++-- .../test_blocking_asset_checks.py | 49 +++++++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 517f28864bf35..ca5f53a3313ab 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -1,4 +1,15 @@ -from typing import Any, Dict, Iterable, Iterator, Mapping, NamedTuple, Optional, Sequence, Set +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + Mapping, + NamedTuple, + Optional, + Sequence, + Set, +) from dagster import _check as check from dagster._annotations import experimental, public @@ -13,6 +24,9 @@ merge_resource_defs, ) +if TYPE_CHECKING: + from dagster._core.definitions.assets import AssetsDefinition + @experimental class AssetChecksDefinitionInputOutputProps(NamedTuple): @@ -127,3 +141,46 @@ def get_attributes_dict(self) -> Dict[str, Any]: specs=self._specs, input_output_props=self._input_output_props, ) + + +@experimental +def build_blocking_asset_check( + asset: "AssetsDefinition", + checks: Sequence[AssetChecksDefinition], +): + from dagster import In, graph_asset, op, Output + + # key_prefix=asset.key[0:-1] if len(asset.key) > 1 else None, + asset_key = "blocking_" + asset.key.path[-1] + + check_specs = [] + for c in checks: + check_specs.extend(c.specs) + + check_output_names = [c.get_python_identifier() for c in check_specs] + + @op(ins={"materialization": In(Any), "check_evaluations": In(Any)}) + def fan_in_checks_and_materialization(context, materialization, check_evaluations): + yield Output(materialization) + + for result in check_evaluations: + if not result.success: + raise Exception("Check failed") + + @graph_asset( + name=asset_key, + check_specs=check_specs, + ) + def blocking_asset(): + asset_result = asset.op() + check_evaluations = [check.node_def(asset_result) for check in checks] + + return { + "result": fan_in_checks_and_materialization(asset_result, check_evaluations), + **{ + check_output_name: check_result + for check_output_name, check_result in zip(check_output_names, check_evaluations) + }, + } + + return blocking_asset diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 2552d5f61b8d1..a01ef33c2c98e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -131,7 +131,7 @@ def _process_user_event( output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( asset_check_evaluation.asset_check_handle ) - output = Output(value=None, output_name=output_name) + output = Output(value=asset_check_evaluation, output_name=output_name) yield asset_check_evaluation @@ -696,9 +696,12 @@ def _store_output( manager_metadata: Dict[str, MetadataValue] = {} # don't store asset check outputs - if step_context.step.step_output_named( - step_output_handle.output_name - ).properties.asset_check_handle: + if ( + step_context.step.step_output_named( + step_output_handle.output_name + ).properties.asset_check_handle + and False + ): # disable def _no_op() -> Iterator[DagsterEvent]: yield from () diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py new file mode 100644 index 0000000000000..5b5fb4b0cea8f --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -0,0 +1,49 @@ +from dagster import AssetKey, Definitions, ExecuteInProcessResult, asset, asset_check +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_checks import build_blocking_asset_check + + +def execute_assets_and_checks( + assets=None, asset_checks=None, raise_on_error: bool = True, resources=None, instance=None +) -> ExecuteInProcessResult: + defs = Definitions(assets=assets, asset_checks=asset_checks, resources=resources) + job_def = defs.get_implicit_global_asset_job_def() + return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance) + + +def test_blocking(): + @asset + def my_asset(): + pass + + @asset_check(asset="blocking_my_asset") + def pass_check(): + return AssetCheckResult(success=True, check_name="pass_check") + + @asset_check(asset="blocking_my_asset") + def fail_check(): + return AssetCheckResult(success=False, check_name="fail_check") + + blocking_asset = build_blocking_asset_check(asset=my_asset, checks=[pass_check, fail_check]) + + @asset(deps=[blocking_asset]) + def downstream_asset(): + pass + + result = execute_assets_and_checks( + assets=[blocking_asset, downstream_asset], raise_on_error=False + ) + assert not result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["blocking_my_asset"]) + assert not check_evals_by_name["fail_check"].success + assert check_evals_by_name["fail_check"].asset_key == AssetKey(["blocking_my_asset"]) + + # downstream asset should not have been materialized + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 1 + assert materialization_events[0].asset_key == AssetKey(["blocking_my_asset"])