Skip to content

Commit

Permalink
fix inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 24, 2023
1 parent ace4b46 commit 01d7720
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,17 @@ def fan_in_checks_and_materialization(context: OpExecutionContext, materializati
@graph_asset(
name=asset_def.key.path[-1],
key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None,
group_name=asset_def.group_names_by_key.get(asset_def.key),
partitions_def=asset_def.partitions_def,
check_specs=check_specs,
description=asset_def.descriptions_by_key.get(asset_def.key),
ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()}
ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()},
resource_defs=asset_def.resource_defs,
metadata=asset_def.metadata_by_key.get(asset_def.key),
freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key),
auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
backfill_policy=asset_def.backfill_policy,
)
def blocking_asset(**kwargs):
asset_result = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def execute_assets_and_checks(

@asset
def upstream_asset():
pass
return 'foo'

@asset(deps=[upstream_asset])
def my_asset():
Expand Down Expand Up @@ -98,14 +98,16 @@ def test_check_fail_and_block():

@asset
def my_asset_with_managed_input(upstream_asset):
pass
assert upstream_asset == "foo"
return "bar"



@asset_check(asset="my_asset_with_managed_input")
def fail_check_if_tagged_2(context):
def fail_check_if_tagged_2(context, my_asset_with_managed_input):
assert my_asset_with_managed_input == "bar"
return AssetCheckResult(
success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged"
success=not context.has_tag("fail_check"), check_name="fail_check_if_tagged_2"
)


Expand All @@ -115,7 +117,7 @@ def fail_check_if_tagged_2(context):

@asset(ins={"input_asset": AssetIn(blocking_asset_with_managed_input.key)})
def downstream_asset_2(input_asset):
pass
assert input_asset == "bar"

def test_check_pass_with_inputs():
result = execute_assets_and_checks(
Expand All @@ -124,19 +126,17 @@ def test_check_pass_with_inputs():
assert result.success

check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 2
assert len(check_evals) == 1
check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals}
assert check_evals_by_name["pass_check"].success
assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"])
assert check_evals_by_name["fail_check_if_tagged"].success
assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"])
assert check_evals_by_name["fail_check_if_tagged_2"].success
assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey(["my_asset_with_managed_input"])

# downstream asset materializes
materialization_events = result.get_asset_materialization_events()
assert len(materialization_events) == 3
assert materialization_events[0].asset_key == AssetKey(["upstream_asset"])
assert materialization_events[1].asset_key == AssetKey(["my_asset_with_managed_input"])
assert materialization_events[2].asset_key == AssetKey(["downstream_asset"])
assert materialization_events[2].asset_key == AssetKey(["downstream_asset_2"])


def test_check_fail_and_block_with_inputs():
Expand All @@ -146,12 +146,10 @@ def test_check_fail_and_block_with_inputs():
assert not result.success

check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 2
assert len(check_evals) == 1
check_evals_by_name = {check_eval.check_name: check_eval for check_eval in check_evals}
assert check_evals_by_name["pass_check"].success
assert check_evals_by_name["pass_check"].asset_key == AssetKey(["my_asset_with_managed_input"])
assert not check_evals_by_name["fail_check_if_tagged"].success
assert check_evals_by_name["fail_check_if_tagged"].asset_key == AssetKey(["my_asset_with_managed_input"])
assert not check_evals_by_name["fail_check_if_tagged_2"].success
assert check_evals_by_name["fail_check_if_tagged_2"].asset_key == AssetKey(["my_asset_with_managed_input"])

# downstream asset should not have been materialized
materialization_events = result.get_asset_materialization_events()
Expand Down

0 comments on commit 01d7720

Please sign in to comment.