Skip to content

Commit

Permalink
Add asset checks, data version to MaterializeResult (dagster-io#16514)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Add `check_results` and `data_version` fields to `MaterializeResult`. These
fields are needed to stream asset check results back from an ext process, a
capability added upstack in dagster-io#16466.

## How I Tested These Changes

New unit test.
  • Loading branch information
smackesey authored and zyd14 committed Sep 15, 2023
1 parent ab3d3d0 commit 0b0e1fc
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,11 @@ def to_asset_check_evaluation(
target_materialization_data=target_materialization_data,
severity=self.severity,
)

def get_spec_python_identifier(self, asset_key: Optional[AssetKey]) -> str:
    """Return the identifier used as the output name for this `AssetCheckResult`.

    The identifier is the asset key's Python identifier and the check name joined by an
    underscore.

    Args:
        asset_key: Key to build the identifier from. When falsy, `self.asset_key` is used
            instead.

    Returns:
        The string `"<asset_key.to_python_identifier()>_<self.check_name>"`.

    Raises:
        AssertionError: If neither `asset_key` nor `self.asset_key` is set.
    """
    resolved_key = asset_key or self.asset_key
    # Raise explicitly instead of using an `assert` statement: asserts are stripped under
    # `python -O`, which would turn a missing key into an opaque AttributeError on None
    # below. AssertionError is kept so callers observe the same exception type and message.
    if resolved_key is None:
        raise AssertionError("Asset key must be provided if not set on spec")
    return f"{resolved_key.to_python_identifier()}_{self.check_name}"
19 changes: 17 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import NamedTuple, Optional
from typing import NamedTuple, Optional, Sequence

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.data_version import DataVersion

from .events import (
AssetKey,
Expand All @@ -16,6 +19,8 @@ class MaterializeResult(
[
("asset_key", PublicAttr[Optional[AssetKey]]),
("metadata", PublicAttr[Optional[MetadataUserInput]]),
("check_results", PublicAttr[Optional[Sequence[AssetCheckResult]]]),
("data_version", PublicAttr[Optional[DataVersion]]),
],
)
):
Expand All @@ -33,11 +38,21 @@ def __new__(
*, # enforce kwargs
asset_key: Optional[CoercibleToAssetKey] = None,
metadata: Optional[MetadataUserInput] = None,
check_results: Optional[Sequence[AssetCheckResult]] = None,
data_version: Optional[DataVersion] = None,
):
asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

return super().__new__(
cls,
asset_key=asset_key,
metadata=metadata, # check?
metadata=check.opt_nullable_mapping_param(
metadata,
"metadata",
key_type=str,
),
check_results=check.opt_nullable_sequence_param(
check_results, "check_results", of_type=AssetCheckResult
),
data_version=check.opt_inst_param(data_version, "data_version", DataVersion),
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation
from dagster._utils import is_named_tuple_instance
from dagster._utils.warnings import disable_dagster_warnings

from ..context.compute import OpExecutionContext
Expand Down Expand Up @@ -133,14 +134,45 @@ def _coerce_op_compute_fn_to_iterator(
def _zip_and_iterate_op_result(
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
) -> Iterator[Tuple[int, Any, OutputDefinition]]:
if len(output_defs) > 1:
result = _validate_multi_return(context, result, output_defs)
for position, (output_def, element) in enumerate(zip(output_defs, result)):
# Filtering the expected output defs here is an unfortunate temporary solution to deal with the
# change in expected outputs that occurs as a result of putting `AssetCheckResults` onto
# `MaterializeResults`. Prior to this, `AssetCheckResults` were yielded/returned directly, and
# thus were expected to always be included in the result tuple. Thus we need to remove them from
# the expected output defs if they have been included indirectly via embedding in a
# `MaterializeResult`.
#
# A better solution is surely possible here in a future refactor. The major complicating element
# is the conversion of MaterializeResult into Output, which currently happens in
# execute_step.py and leverages job_def.asset_layer. There is difficulty in moving that logic up
# to here because we can't rely on the presence of asset layer here, since the present code path
# is used by direct invocation. Probably the solution is to expose an asset layer on the
# invocation context.
expected_return_outputs = _filter_expected_output_defs(result, context, output_defs)
if len(expected_return_outputs) > 1:
result = _validate_multi_return(context, result, expected_return_outputs)
for position, (output_def, element) in enumerate(zip(expected_return_outputs, result)):
yield position, output_def, element
else:
yield 0, output_defs[0], result


def _filter_expected_output_defs(
    result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
) -> Sequence[OutputDefinition]:
    """Drop output defs for asset check results already carried by a MaterializeResult.

    Args:
        result: The value returned by the compute function: a single value or a tuple
            of values.
        context: Execution context; its `asset_key` is used when a `MaterializeResult`
            has no asset key of its own.
        output_defs: All output definitions declared for the op.

    Returns:
        The entries of `output_defs` whose name does not match the identifier of any
        check result embedded in a returned `MaterializeResult`, in their original order.
    """
    # Only a plain tuple is treated as several returned values; anything else, including
    # a namedtuple instance, is a single value and gets wrapped.
    if isinstance(result, tuple) and not is_named_tuple_instance(result):
        returned_values = result
    else:
        returned_values = (result,)

    # Collect the output names belonging to check results embedded in MaterializeResults.
    embedded_check_output_names = []
    for value in returned_values:
        if not isinstance(value, MaterializeResult):
            continue
        for check_result in value.check_results or []:
            embedded_check_output_names.append(
                check_result.get_spec_python_identifier(value.asset_key or context.asset_key)
            )

    return [
        output_def
        for output_def in output_defs
        if output_def.name not in embedded_check_output_names
    ]


def _validate_multi_return(
context: OpExecutionContext,
result: Any,
Expand Down
72 changes: 40 additions & 32 deletions python_modules/dagster/dagster/_core/execution/plan/execute_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,45 +91,53 @@ def _process_asset_results_to_events(
to create a full picture of the asset check's evaluation.
"""
for user_event in user_event_sequence:
if isinstance(user_event, MaterializeResult):
assets_def = step_context.job_def.asset_layer.assets_def_for_node(
step_context.node_handle
yield from _process_user_event(step_context, user_event)


def _process_user_event(
step_context: StepExecutionContext, user_event: OpOutputUnion
) -> Iterator[OpOutputUnion]:
if isinstance(user_event, MaterializeResult):
assets_def = step_context.job_def.asset_layer.assets_def_for_node(step_context.node_handle)
if not assets_def:
raise DagsterInvariantViolationError(
"MaterializeResult is only valid within asset computations, no backing"
" AssetsDefinition found."
)
if not assets_def:
if user_event.asset_key:
asset_key = user_event.asset_key
else:
if len(assets_def.keys) != 1:
raise DagsterInvariantViolationError(
"MaterializeResult is only valid within asset computations, no backing"
" AssetsDefinition found."
"MaterializeResult did not include asset_key and it can not be inferred."
f" Specify which asset_key, options are: {assets_def.keys}."
)
if user_event.asset_key:
asset_key = user_event.asset_key
else:
if len(assets_def.keys) != 1:
raise DagsterInvariantViolationError(
"MaterializeResult did not include asset_key and it can not be inferred."
f" Specify which asset_key, options are: {assets_def.keys}."
)
asset_key = assets_def.key
asset_key = assets_def.key

output_name = assets_def.get_output_name_for_asset_key(asset_key)
output = Output(
value=None,
output_name=output_name,
metadata=user_event.metadata,
)
yield output
elif isinstance(user_event, AssetCheckResult):
asset_check_evaluation = user_event.to_asset_check_evaluation(step_context)
output_name = assets_def.get_output_name_for_asset_key(asset_key)

output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
asset_check_evaluation.asset_check_handle
)
output = Output(value=None, output_name=output_name)
for check_result in user_event.check_results or []:
yield from _process_user_event(step_context, check_result)

yield asset_check_evaluation
yield Output(
value=None,
output_name=output_name,
metadata=user_event.metadata,
data_version=user_event.data_version,
)
elif isinstance(user_event, AssetCheckResult):
asset_check_evaluation = user_event.to_asset_check_evaluation(step_context)

yield output
else:
yield user_event
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
asset_check_evaluation.asset_check_handle
)
output = Output(value=None, output_name=output_name)

yield asset_check_evaluation

yield output
else:
yield user_event


def _step_output_error_checked_user_event_sequence(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
)
from dagster._check import CheckError
from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
Expand All @@ -49,6 +51,7 @@
DagsterStepOutputNotFoundError,
)
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster._core.storage.mem_io_manager import InMemoryIOManager
from dagster._core.test_utils import instance_for_test
from dagster._core.types.dagster_type import Nothing
Expand Down Expand Up @@ -1707,6 +1710,27 @@ def ret_mismatch(context: AssetExecutionContext):
mats = _exec_asset(ret_mismatch)


def test_return_materialization_with_asset_checks():
    """A check result embedded in a returned MaterializeResult is recorded as succeeded."""
    foo_check_spec = AssetCheckSpec(name="foo_check", asset=AssetKey("ret_checks"))

    # The function name must stay `ret_checks`: the check spec above targets that asset key.
    @asset(check_specs=[foo_check_spec])
    def ret_checks(context: AssetExecutionContext):
        passing_check = AssetCheckResult(
            check_name="foo_check", metadata={"one": 1}, success=True
        )
        return MaterializeResult(check_results=[passing_check])

    with instance_for_test() as instance:
        materialize([ret_checks], instance=instance)

        # Read back the recorded executions for the check from event log storage.
        recorded_executions = instance.event_log_storage.get_asset_check_executions(
            asset_key=ret_checks.key,
            check_name="foo_check",
            limit=1,
        )
        assert len(recorded_executions) == 1
        first_execution = recorded_executions[0]
        assert first_execution.status == AssetCheckExecutionRecordStatus.SUCCEEDED


def test_return_materialization_multi_asset():
#
# yield successful
Expand Down

0 comments on commit 0b0e1fc

Please sign in to comment.