diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 81d03dd1ffb49..20d28ad947b71 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -1,4 +1,15 @@ -from typing import Any, Dict, Iterable, Iterator, Mapping, NamedTuple, Optional, Sequence, Set +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + Mapping, + NamedTuple, + Optional, + Sequence, + Set, +) from dagster import _check as check from dagster._annotations import experimental, public @@ -12,6 +23,11 @@ ResourceRequirement, merge_resource_defs, ) +from dagster._core.errors import DagsterAssetCheckFailedError +from dagster._core.types.dagster_type import Nothing + +if TYPE_CHECKING: + from dagster._core.definitions.assets import AssetsDefinition @experimental @@ -127,3 +143,76 @@ def get_attributes_dict(self) -> Dict[str, Any]: specs=self._specs, input_output_props=self._input_output_props, ) + + +@experimental +def build_asset_with_blocking_check( + asset_def: "AssetsDefinition", + checks: Sequence[AssetChecksDefinition], +) -> "AssetsDefinition": + from dagster import AssetIn, In, OpExecutionContext, Output, op + from dagster._core.definitions.decorators.asset_decorator import graph_asset_no_defaults + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus + + check_specs = [] + for c in checks: + check_specs.extend(c.specs) + + check_output_names = [c.get_python_identifier() for c in check_specs] + + check.invariant(len(asset_def.op.output_defs) == 1) + asset_out_type = asset_def.op.output_defs[0].dagster_type + + @op(ins={"asset_return_value": In(asset_out_type), "check_evaluations": In(Nothing)}) + def fan_in_checks_and_asset_return_value(context: OpExecutionContext, asset_return_value: Any): + # we pass the asset_return_value through and store it again so that downstream assets can load it. + # This is a little silly- we only do this because this op has the asset key in its StepOutputProperties + # so the output is written to the right path. We could probably get the asset_def.op to write to the + # asset path (and make sure we don't override it here) to avoid the double write. + yield Output(asset_return_value) + + for check_spec in check_specs: + executions = context.instance.event_log_storage.get_asset_check_executions( + asset_key=asset_def.key, check_name=check_spec.name, limit=1 + ) + check.invariant( + len(executions) == 1, "Expected asset check {check_spec.name} to execute" + ) + execution = executions[0] + check.invariant( + execution.run_id == context.run_id, + "Expected asset check {check_spec.name} to execute in the current run", + ) + if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED: + raise DagsterAssetCheckFailedError() + + # kwargs are the inputs to the asset_def.op that we are wrapping + def blocking_asset(**kwargs): + asset_return_value = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs) + check_evaluations = [check.node_def(asset_return_value) for check in checks] + + return { + "result": fan_in_checks_and_asset_return_value(asset_return_value, check_evaluations), + **{ + check_output_name: check_result + for check_output_name, check_result in zip(check_output_names, check_evaluations) + }, + } + + return graph_asset_no_defaults( + compose_fn=blocking_asset, + name=None, + key_prefix=None, + key=asset_def.key, + group_name=asset_def.group_names_by_key.get(asset_def.key), + partitions_def=asset_def.partitions_def, + check_specs=check_specs, + description=asset_def.descriptions_by_key.get(asset_def.key), + ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()}, + resource_defs=asset_def.resource_defs, + metadata=asset_def.metadata_by_key.get(asset_def.key), + freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), + auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), + backfill_policy=asset_def.backfill_policy, + config=None, # gets config from asset_def.op + ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 613dfc586025a..d509b8b60983e 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -1061,65 +1061,100 @@ def slack_files_table(): key=key, ) else: - ins = ins or {} - asset_ins = build_asset_ins(compose_fn, ins or {}, set()) - out_asset_key, _asset_name = _resolve_key_and_name( - key=key, - key_prefix=key_prefix, + return graph_asset_no_defaults( + compose_fn=compose_fn, name=name, - decorator="@graph_asset", - fn=compose_fn, - ) - - keys_by_input_name = { - input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() - } - partition_mappings = { - input_name: asset_in.partition_mapping - for input_name, asset_in in ins.items() - if asset_in.partition_mapping - } - - check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( - check_specs, [out_asset_key] - ) - check_outs_by_output_name: Mapping[str, GraphOut] = { - output_name: GraphOut() for output_name in check_specs_by_output_name.keys() - } - - combined_outs_by_output_name: Mapping = { - "result": GraphOut(), - **check_outs_by_output_name, - } - - op_graph = graph( - name=out_asset_key.to_python_identifier(), description=description, + ins=ins, config=config, - ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, - out=combined_outs_by_output_name, - )(compose_fn) - return AssetsDefinition.from_graph( - op_graph, - keys_by_input_name=keys_by_input_name, - keys_by_output_name={"result": out_asset_key}, - partitions_def=partitions_def, - partition_mappings=partition_mappings if partition_mappings else None, + key_prefix=key_prefix, group_name=group_name, - metadata_by_output_name={"result": metadata} if metadata else None, - freshness_policies_by_output_name=( - {"result": freshness_policy} if freshness_policy else None - ), - auto_materialize_policies_by_output_name=( - {"result": auto_materialize_policy} if auto_materialize_policy else None - ), + partitions_def=partitions_def, + metadata=metadata, + freshness_policy=freshness_policy, + auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, - descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, check_specs=check_specs, + key=key, ) +def graph_asset_no_defaults( + *, + compose_fn: Callable, + name: Optional[str], + description: Optional[str], + ins: Optional[Mapping[str, AssetIn]], + config: Optional[Union[ConfigMapping, Mapping[str, Any]]], + key_prefix: Optional[CoercibleToAssetKeyPrefix], + group_name: Optional[str], + partitions_def: Optional[PartitionsDefinition], + metadata: Optional[MetadataUserInput], + freshness_policy: Optional[FreshnessPolicy], + auto_materialize_policy: Optional[AutoMaterializePolicy], + backfill_policy: Optional[BackfillPolicy], + resource_defs: Optional[Mapping[str, ResourceDefinition]], + check_specs: Optional[Sequence[AssetCheckSpec]], + key: Optional[CoercibleToAssetKey], +) -> AssetsDefinition: + ins = ins or {} + asset_ins = build_asset_ins(compose_fn, ins or {}, set()) + out_asset_key, _asset_name = _resolve_key_and_name( + key=key, + key_prefix=key_prefix, + name=name, + decorator="@graph_asset", + fn=compose_fn, + ) + + keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()} + partition_mappings = { + input_name: asset_in.partition_mapping + for input_name, asset_in in ins.items() + if asset_in.partition_mapping + } + + check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( + check_specs, [out_asset_key] + ) + check_outs_by_output_name: Mapping[str, GraphOut] = { + output_name: GraphOut() for output_name in check_specs_by_output_name.keys() + } + + combined_outs_by_output_name: Mapping = { + "result": GraphOut(), + **check_outs_by_output_name, + } + + op_graph = graph( + name=out_asset_key.to_python_identifier(), + description=description, + config=config, + ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, + out=combined_outs_by_output_name, + )(compose_fn) + return AssetsDefinition.from_graph( + op_graph, + keys_by_input_name=keys_by_input_name, + keys_by_output_name={"result": out_asset_key}, + partitions_def=partitions_def, + partition_mappings=partition_mappings if partition_mappings else None, + group_name=group_name, + metadata_by_output_name={"result": metadata} if metadata else None, + freshness_policies_by_output_name=( + {"result": freshness_policy} if freshness_policy else None + ), + auto_materialize_policies_by_output_name=( + {"result": auto_materialize_policy} if auto_materialize_policy else None + ), + backfill_policy=backfill_policy, + descriptions_by_output_name={"result": description} if description else None, + resource_defs=resource_defs, + check_specs=check_specs, + ) + + def graph_multi_asset( *, outs: Mapping[str, AssetOut], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py new file mode 100644 index 0000000000000..9fab533a6fb44 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_blocking_asset_checks.py @@ -0,0 +1,170 @@ +from dagster import ( + AssetCheckResult, + AssetKey, + Definitions, + ExecuteInProcessResult, + asset, + asset_check, +) +from dagster._core.definitions.asset_checks import build_asset_with_blocking_check +from dagster._core.definitions.asset_in import AssetIn + + +def execute_assets_and_checks( + assets=None, + asset_checks=None, + raise_on_error: bool = True, + resources=None, + instance=None, + tags=None, +) -> ExecuteInProcessResult: + defs = Definitions(assets=assets, asset_checks=asset_checks, resources=resources) + job_def = defs.get_implicit_global_asset_job_def() + return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance, tags=tags) + + +@asset +def upstream_asset(): + return "foo" + + +@asset(deps=[upstream_asset]) +def my_asset(): + pass + + +@asset_check(asset="my_asset") +def pass_check(): + return AssetCheckResult(success=True, check_name="pass_check") + + +@asset_check(asset="my_asset") +def fail_check_if_tagged(context): + return AssetCheckResult( + success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged" + ) + + +blocking_asset = build_asset_with_blocking_check( + asset_def=my_asset, checks=[pass_check, fail_check_if_tagged] +) + + +@asset(deps=[blocking_asset]) +def downstream_asset(): + pass + + +def test_check_pass(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset, downstream_asset], raise_on_error=False + ) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset"]) + assert check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset"]) + + # downstream asset materializes + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 3 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset"]) + assert materialization_events[2].asset_key == AssetKey(["downstream_asset"]) + + +def test_check_fail_and_block(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset, downstream_asset], + raise_on_error=False, + tags={"fail_check": "true"}, + ) + assert not result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["pass_check"].success + assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset"]) + assert not check_evals_by_name["fail_check_if_tagged"].success + assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset"]) + + # downstream asset should not have been materialized + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 2 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset"]) + + +@asset +def my_asset_with_managed_input(upstream_asset): + assert upstream_asset == "foo" + return "bar" + + +@asset_check(asset="my_asset_with_managed_input") +def fail_check_if_tagged_2(context, my_asset_with_managed_input): + assert my_asset_with_managed_input == "bar" + return AssetCheckResult( + success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged_2" + ) + + +blocking_asset_with_managed_input = build_asset_with_blocking_check( + asset_def=my_asset_with_managed_input, checks=[fail_check_if_tagged_2] +) + + +@asset(ins={"input_asset": AssetIn(blocking_asset_with_managed_input.key)}) +def downstream_asset_2(input_asset): + assert input_asset == "bar" + + +def test_check_pass_with_inputs(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], + raise_on_error=False, + ) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 1 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert check_evals_by_name["fail_check_if_tagged_2"].success + assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey( + ["my_asset_with_managed_input"] + ) + + # downstream asset materializes + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 3 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"]) + assert materialization_events[2].asset_key == AssetKey(["downstream_asset_2"]) + + +def test_check_fail_and_block_with_inputs(): + result = execute_assets_and_checks( + assets=[upstream_asset, blocking_asset_with_managed_input, downstream_asset_2], + raise_on_error=False, + tags={"fail_check": "true"}, + ) + assert not result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 1 + check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals} + assert not check_evals_by_name["fail_check_if_tagged_2"].success + assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey( + ["my_asset_with_managed_input"] + ) + + # downstream asset should not have been materialized + materialization_events = result.get_asset_materialization_events() + assert len(materialization_events) == 2 + assert materialization_events[0].asset_key == AssetKey(["upstream_asset"]) + assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"])