Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

blocking check factory method #16612

Merged
merged 5 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_checks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
from typing import Any, Dict, Iterable, Iterator, Mapping, NamedTuple, Optional, Sequence, Set
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)

from dagster import _check as check
from dagster._annotations import experimental, public
Expand All @@ -12,6 +23,11 @@
ResourceRequirement,
merge_resource_defs,
)
from dagster._core.errors import DagsterAssetCheckFailedError
from dagster._core.types.dagster_type import Nothing

if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition


@experimental
Expand Down Expand Up @@ -127,3 +143,76 @@ def get_attributes_dict(self) -> Dict[str, Any]:
specs=self._specs,
input_output_props=self._input_output_props,
)


@experimental
def build_asset_with_blocking_check(
    asset_def: "AssetsDefinition",
    checks: Sequence[AssetChecksDefinition],
) -> "AssetsDefinition":
    """Wrap ``asset_def`` in a graph-backed asset whose checks block downstream loads.

    The returned asset materializes ``asset_def``, runs every check in ``checks``,
    and only then re-emits the asset value through a fan-in op. If any check did not
    succeed in the current run, the fan-in op raises ``DagsterAssetCheckFailedError``,
    so downstream assets never observe a value that failed its checks.

    Args:
        asset_def: A single-output asset definition to wrap.
        checks: Check definitions to execute against the asset.

    Returns:
        An ``AssetsDefinition`` with the same key, group, partitions, description,
        ins, resources, metadata, and policies as ``asset_def``.
    """
    from dagster import AssetIn, In, OpExecutionContext, Output, op
    from dagster._core.definitions.decorators.asset_decorator import graph_asset_no_defaults
    from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus

    # Flatten the specs of every checks definition; each spec becomes a graph output.
    check_specs = []
    for checks_def in checks:
        check_specs.extend(checks_def.specs)

    check_output_names = [spec.get_python_identifier() for spec in check_specs]

    # Only single-output assets are supported: exactly one value is threaded through.
    check.invariant(len(asset_def.op.output_defs) == 1)
    asset_out_type = asset_def.op.output_defs[0].dagster_type

    @op(ins={"asset_return_value": In(asset_out_type), "check_evaluations": In(Nothing)})
    def fan_in_checks_and_asset_return_value(context: OpExecutionContext, asset_return_value: Any):
        # We pass the asset_return_value through and store it again so that downstream
        # assets can load it. This is a little silly - we only do this because this op
        # has the asset key in its StepOutputProperties so the output is written to the
        # right path. We could probably get the asset_def.op to write to the asset path
        # (and make sure we don't override it here) to avoid the double write.
        yield Output(asset_return_value)

        for check_spec in check_specs:
            executions = context.instance.event_log_storage.get_asset_check_executions(
                asset_key=asset_def.key, check_name=check_spec.name, limit=1
            )
            # f-strings here: the original plain literals never interpolated check_spec.name.
            check.invariant(
                len(executions) == 1, f"Expected asset check {check_spec.name} to execute"
            )
            execution = executions[0]
            check.invariant(
                execution.run_id == context.run_id,
                f"Expected asset check {check_spec.name} to execute in the current run",
            )
            if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED:
                raise DagsterAssetCheckFailedError()

    # kwargs are the inputs to the asset_def.op that we are wrapping.
    def blocking_asset(**kwargs):
        asset_return_value = asset_def.op.with_replaced_properties(name="asset_op")(**kwargs)
        # Named `checks_def` rather than `check` to avoid shadowing the
        # `dagster._check as check` module alias used above.
        check_evaluations = [checks_def.node_def(asset_return_value) for checks_def in checks]

        return {
            "result": fan_in_checks_and_asset_return_value(asset_return_value, check_evaluations),
            **dict(zip(check_output_names, check_evaluations)),
        }

    return graph_asset_no_defaults(
        compose_fn=blocking_asset,
        name=None,
        key_prefix=None,
        key=asset_def.key,
        group_name=asset_def.group_names_by_key.get(asset_def.key),
        partitions_def=asset_def.partitions_def,
        check_specs=check_specs,
        description=asset_def.descriptions_by_key.get(asset_def.key),
        ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()},
        resource_defs=asset_def.resource_defs,
        metadata=asset_def.metadata_by_key.get(asset_def.key),
        freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key),
        auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
        backfill_policy=asset_def.backfill_policy,
        config=None,  # gets config from asset_def.op
    )
Original file line number Diff line number Diff line change
Expand Up @@ -1061,65 +1061,100 @@ def slack_files_table():
key=key,
)
else:
ins = ins or {}
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
out_asset_key, _asset_name = _resolve_key_and_name(
key=key,
key_prefix=key_prefix,
return graph_asset_no_defaults(
compose_fn=compose_fn,
name=name,
decorator="@graph_asset",
fn=compose_fn,
)

keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
partition_mappings = {
input_name: asset_in.partition_mapping
for input_name, asset_in in ins.items()
if asset_in.partition_mapping
}

check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs(
check_specs, [out_asset_key]
)
check_outs_by_output_name: Mapping[str, GraphOut] = {
output_name: GraphOut() for output_name in check_specs_by_output_name.keys()
}

combined_outs_by_output_name: Mapping = {
"result": GraphOut(),
**check_outs_by_output_name,
}

op_graph = graph(
name=out_asset_key.to_python_identifier(),
description=description,
ins=ins,
config=config,
ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()},
out=combined_outs_by_output_name,
)(compose_fn)
return AssetsDefinition.from_graph(
op_graph,
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
key_prefix=key_prefix,
group_name=group_name,
metadata_by_output_name={"result": metadata} if metadata else None,
freshness_policies_by_output_name=(
{"result": freshness_policy} if freshness_policy else None
),
auto_materialize_policies_by_output_name=(
{"result": auto_materialize_policy} if auto_materialize_policy else None
),
partitions_def=partitions_def,
metadata=metadata,
freshness_policy=freshness_policy,
auto_materialize_policy=auto_materialize_policy,
backfill_policy=backfill_policy,
descriptions_by_output_name={"result": description} if description else None,
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
)


def graph_asset_no_defaults(
    *,
    compose_fn: Callable,
    name: Optional[str],
    description: Optional[str],
    ins: Optional[Mapping[str, AssetIn]],
    config: Optional[Union[ConfigMapping, Mapping[str, Any]]],
    key_prefix: Optional[CoercibleToAssetKeyPrefix],
    group_name: Optional[str],
    partitions_def: Optional[PartitionsDefinition],
    metadata: Optional[MetadataUserInput],
    freshness_policy: Optional[FreshnessPolicy],
    auto_materialize_policy: Optional[AutoMaterializePolicy],
    backfill_policy: Optional[BackfillPolicy],
    resource_defs: Optional[Mapping[str, ResourceDefinition]],
    check_specs: Optional[Sequence[AssetCheckSpec]],
    key: Optional[CoercibleToAssetKey],
) -> AssetsDefinition:
    """Core implementation of ``@graph_asset`` with every parameter keyword-only and
    without defaults.

    Keeping all parameters mandatory means programmatic callers (e.g.
    ``build_asset_with_blocking_check``) fail loudly when a new ``graph_asset``
    parameter is added, instead of silently dropping it.

    Returns:
        AssetsDefinition: A graph-backed asset built from ``compose_fn``, with any
        check specs exposed as additional graph outputs.
    """
    ins = ins or {}
    # `ins` was normalized above; pass it directly (previously `ins or {}` redundantly).
    asset_ins = build_asset_ins(compose_fn, ins, set())
    out_asset_key, _asset_name = _resolve_key_and_name(
        key=key,
        key_prefix=key_prefix,
        name=name,
        decorator="@graph_asset",
        fn=compose_fn,
    )

    keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()}
    partition_mappings = {
        input_name: asset_in.partition_mapping
        for input_name, asset_in in ins.items()
        if asset_in.partition_mapping
    }

    # Each check spec gets its own named graph output alongside the asset's "result".
    check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs(
        check_specs, [out_asset_key]
    )
    check_outs_by_output_name: Mapping[str, GraphOut] = {
        output_name: GraphOut() for output_name in check_specs_by_output_name.keys()
    }

    combined_outs_by_output_name: Mapping = {
        "result": GraphOut(),
        **check_outs_by_output_name,
    }

    op_graph = graph(
        name=out_asset_key.to_python_identifier(),
        description=description,
        config=config,
        ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()},
        out=combined_outs_by_output_name,
    )(compose_fn)
    return AssetsDefinition.from_graph(
        op_graph,
        keys_by_input_name=keys_by_input_name,
        keys_by_output_name={"result": out_asset_key},
        partitions_def=partitions_def,
        partition_mappings=partition_mappings if partition_mappings else None,
        group_name=group_name,
        metadata_by_output_name={"result": metadata} if metadata else None,
        freshness_policies_by_output_name=(
            {"result": freshness_policy} if freshness_policy else None
        ),
        auto_materialize_policies_by_output_name=(
            {"result": auto_materialize_policy} if auto_materialize_policy else None
        ),
        backfill_policy=backfill_policy,
        descriptions_by_output_name={"result": description} if description else None,
        resource_defs=resource_defs,
        check_specs=check_specs,
    )


def graph_multi_asset(
*,
outs: Mapping[str, AssetOut],
Expand Down
Loading