Skip to content

Commit

Permalink
blocking check factory method
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 19, 2023
1 parent 373510e commit 58aa295
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 5 deletions.
59 changes: 58 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_checks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
from typing import Any, Dict, Iterable, Iterator, Mapping, NamedTuple, Optional, Sequence, Set
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)

from dagster import _check as check
from dagster._annotations import experimental, public
Expand All @@ -13,6 +24,9 @@
merge_resource_defs,
)

if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition


@experimental
class AssetChecksDefinitionInputOutputProps(NamedTuple):
Expand Down Expand Up @@ -127,3 +141,46 @@ def get_attributes_dict(self) -> Dict[str, Any]:
specs=self._specs,
input_output_props=self._input_output_props,
)


@experimental
def build_blocking_asset_check(
asset: "AssetsDefinition",
checks: Sequence[AssetChecksDefinition],
):
from dagster import In, graph_asset, op, Output

# key_prefix=asset.key[0:-1] if len(asset.key) > 1 else None,
asset_key = "blocking_" + asset.key.path[-1]

check_specs = []
for c in checks:
check_specs.extend(c.specs)

check_output_names = [c.get_python_identifier() for c in check_specs]

@op(ins={"materialization": In(Any), "check_evaluations": In(Any)})
def fan_in_checks_and_materialization(context, materialization, check_evaluations):
yield Output(materialization)

for result in check_evaluations:
if not result.success:
raise Exception("Check failed")

@graph_asset(
name=asset_key,
check_specs=check_specs,
)
def blocking_asset():
asset_result = asset.op()
check_evaluations = [check.node_def(asset_result) for check in checks]

return {
"result": fan_in_checks_and_materialization(asset_result, check_evaluations),
**{
check_output_name: check_result
for check_output_name, check_result in zip(check_output_names, check_evaluations)
},
}

return blocking_asset
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _process_user_event(
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
asset_check_evaluation.asset_check_handle
)
output = Output(value=None, output_name=output_name)
output = Output(value=asset_check_evaluation, output_name=output_name)

yield asset_check_evaluation

Expand Down Expand Up @@ -696,9 +696,12 @@ def _store_output(
manager_metadata: Dict[str, MetadataValue] = {}

# don't store asset check outputs
if step_context.step.step_output_named(
step_output_handle.output_name
).properties.asset_check_handle:
if (
step_context.step.step_output_named(
step_output_handle.output_name
).properties.asset_check_handle
and False
): # disable

def _no_op() -> Iterator[DagsterEvent]:
yield from ()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from dagster import AssetKey, Definitions, ExecuteInProcessResult, asset, asset_check
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_checks import build_blocking_asset_check


def execute_assets_and_checks(
assets=None, asset_checks=None, raise_on_error: bool = True, resources=None, instance=None
) -> ExecuteInProcessResult:
defs = Definitions(assets=assets, asset_checks=asset_checks, resources=resources)
job_def = defs.get_implicit_global_asset_job_def()
return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance)


def test_blocking():
@asset
def my_asset():
pass

@asset_check(asset="blocking_my_asset")
def pass_check():
return AssetCheckResult(success=True, check_name="pass_check")

@asset_check(asset="blocking_my_asset")
def fail_check():
return AssetCheckResult(success=False, check_name="fail_check")

blocking_asset = build_blocking_asset_check(asset=my_asset, checks=[pass_check, fail_check])

@asset(deps=[blocking_asset])
def downstream_asset():
pass

result = execute_assets_and_checks(
assets=[blocking_asset, downstream_asset], raise_on_error=False
)
assert not result.success

check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 2
check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals}
assert check_evals_by_name["pass_check"].success
assert check_evals_by_name["pass_check"].asset_key == AssetKey(["blocking_my_asset"])
assert not check_evals_by_name["fail_check"].success
assert check_evals_by_name["fail_check"].asset_key == AssetKey(["blocking_my_asset"])

# downstream asset should not have been materialized
materialization_events = result.get_asset_materialization_events()
assert len(materialization_events) == 1
assert materialization_events[0].asset_key == AssetKey(["blocking_my_asset"])

0 comments on commit 58aa295

Please sign in to comment.