From 0b0e1fc1f2709218dab9f5a262721e9badd422ed Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Fri, 15 Sep 2023 08:36:54 -0400 Subject: [PATCH] Add asset checks, data version to MaterializeResult (#16514) ## Summary & Motivation Add `check_results` and `data_version` to `MaterializeResult`. This supports streaming asset check results back from an ext process added upstack in #16466. ## How I Tested These Changes New unit test. --- .../_core/definitions/asset_check_result.py | 8 +++ .../dagster/_core/definitions/result.py | 19 ++++- .../_core/execution/plan/compute_generator.py | 38 +++++++++- .../_core/execution/plan/execute_step.py | 72 ++++++++++--------- .../asset_defs_tests/test_assets.py | 24 +++++++ 5 files changed, 124 insertions(+), 37 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_check_result.py b/python_modules/dagster/dagster/_core/definitions/asset_check_result.py index 411fe5d688285..4d4dc26f0f585 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_check_result.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_check_result.py @@ -143,3 +143,11 @@ def to_asset_check_evaluation( target_materialization_data=target_materialization_data, severity=self.severity, ) + + def get_spec_python_identifier(self, asset_key: Optional[AssetKey]) -> str: + """Returns a string uniquely identifying the asset check spec associated with this result. + This is used for the output name associated with an `AssetCheckResult`. + """ + asset_key = asset_key or self.asset_key + assert asset_key is not None, "Asset key must be provided if not set on spec" + return f"{asset_key.to_python_identifier()}_{self.check_name}" diff --git a/python_modules/dagster/dagster/_core/definitions/result.py b/python_modules/dagster/dagster/_core/definitions/result.py index 216502d509d56..61c50036285e7 100644 --- a/python_modules/dagster/dagster/_core/definitions/result.py +++ b/python_modules/dagster/dagster/_core/definitions/result.py @@ -1,6 +1,9 @@ -from typing import NamedTuple, Optional +from typing import NamedTuple, Optional, Sequence +import dagster._check as check from dagster._annotations import PublicAttr, experimental +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.data_version import DataVersion from .events import ( AssetKey, @@ -16,6 +19,8 @@ class MaterializeResult( [ ("asset_key", PublicAttr[Optional[AssetKey]]), ("metadata", PublicAttr[Optional[MetadataUserInput]]), + ("check_results", PublicAttr[Optional[Sequence[AssetCheckResult]]]), + ("data_version", PublicAttr[Optional[DataVersion]]), ], ) ): @@ -33,11 +38,21 @@ def __new__( *, # enforce kwargs asset_key: Optional[CoercibleToAssetKey] = None, metadata: Optional[MetadataUserInput] = None, + check_results: Optional[Sequence[AssetCheckResult]] = None, + data_version: Optional[DataVersion] = None, ): asset_key = AssetKey.from_coercible(asset_key) if asset_key else None return super().__new__( cls, asset_key=asset_key, - metadata=metadata, # check? + metadata=check.opt_nullable_mapping_param( + metadata, + "metadata", + key_type=str, + ), + check_results=check.opt_nullable_sequence_param( + check_results, "check_results", of_type=AssetCheckResult + ), + data_version=check.opt_inst_param(data_version, "data_version", DataVersion), ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index dce65ea88fdc9..0e09cacf2e347 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -31,6 +31,7 @@ from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterInvariantViolationError from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation +from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings from ..context.compute import OpExecutionContext @@ -133,14 +134,45 @@ def _coerce_op_compute_fn_to_iterator( def _zip_and_iterate_op_result( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Tuple[int, Any, OutputDefinition]]: - if len(output_defs) > 1: - result = _validate_multi_return(context, result, output_defs) - for position, (output_def, element) in enumerate(zip(output_defs, result)): + # Filtering the expected output defs here is an unfortunate temporary solution to deal with the + # change in expected outputs that occurs as a result of putting `AssetCheckResults` onto + # `MaterializeResults`. Prior to this, `AssetCheckResults` were yielded/returned directly, and + # thus were expected to always be included in the result tuple. Thus we need to remove them from + # the expected output defs if they have been included indirectly via embedding in a + # `MaterializeResult`. + # + # A better solution is surely possible here in a future refactor. The major complicating element + # is the conversion of MaterializeResult into Output, which currently happens in + # execute_step.py and leverages job_def.asset_layer. There is difficulty in moving that logic up + # to here because we can't rely on the presence of asset layer here, since the present code path + # is used by direct invocation. Probably the solution is to expose an asset layer on the + # invocation context. + expected_return_outputs = _filter_expected_output_defs(result, context, output_defs) + if len(expected_return_outputs) > 1: + result = _validate_multi_return(context, result, expected_return_outputs) + for position, (output_def, element) in enumerate(zip(expected_return_outputs, result)): yield position, output_def, element else: yield 0, output_defs[0], result +# Filter out output_defs corresponding to asset check results that already exist on a +# MaterializeResult. +def _filter_expected_output_defs( + result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] +) -> Sequence[OutputDefinition]: + result_tuple = ( + (result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result + ) + materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)] + remove_outputs = [ + r.get_spec_python_identifier(x.asset_key or context.asset_key) + for x in materialize_results + for r in x.check_results or [] + ] + return [out for out in output_defs if out.name not in remove_outputs] + + def _validate_multi_return( context: OpExecutionContext, result: Any, diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 1ffa93dd6a731..c1141d9dba0d2 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -91,45 +91,53 @@ def _process_asset_results_to_events( to create a full picture of the asset check's evaluation. """ for user_event in user_event_sequence: - if isinstance(user_event, MaterializeResult): - assets_def = step_context.job_def.asset_layer.assets_def_for_node( - step_context.node_handle + yield from _process_user_event(step_context, user_event) + + +def _process_user_event( + step_context: StepExecutionContext, user_event: OpOutputUnion +) -> Iterator[OpOutputUnion]: + if isinstance(user_event, MaterializeResult): + assets_def = step_context.job_def.asset_layer.assets_def_for_node(step_context.node_handle) + if not assets_def: + raise DagsterInvariantViolationError( + "MaterializeResult is only valid within asset computations, no backing" + " AssetsDefinition found." ) - if not assets_def: + if user_event.asset_key: + asset_key = user_event.asset_key + else: + if len(assets_def.keys) != 1: raise DagsterInvariantViolationError( - "MaterializeResult is only valid within asset computations, no backing" - " AssetsDefinition found." + "MaterializeResult did not include asset_key and it can not be inferred." + f" Specify which asset_key, options are: {assets_def.keys}." ) - if user_event.asset_key: - asset_key = user_event.asset_key - else: - if len(assets_def.keys) != 1: - raise DagsterInvariantViolationError( - "MaterializeResult did not include asset_key and it can not be inferred." - f" Specify which asset_key, options are: {assets_def.keys}." - ) - asset_key = assets_def.key + asset_key = assets_def.key - output_name = assets_def.get_output_name_for_asset_key(asset_key) - output = Output( - value=None, - output_name=output_name, - metadata=user_event.metadata, - ) - yield output - elif isinstance(user_event, AssetCheckResult): - asset_check_evaluation = user_event.to_asset_check_evaluation(step_context) + output_name = assets_def.get_output_name_for_asset_key(asset_key) - output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( - asset_check_evaluation.asset_check_handle - ) - output = Output(value=None, output_name=output_name) + for check_result in user_event.check_results or []: + yield from _process_user_event(step_context, check_result) - yield asset_check_evaluation + yield Output( + value=None, + output_name=output_name, + metadata=user_event.metadata, + data_version=user_event.data_version, + ) + elif isinstance(user_event, AssetCheckResult): + asset_check_evaluation = user_event.to_asset_check_evaluation(step_context) - yield output - else: - yield user_event + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( + asset_check_evaluation.asset_check_handle + ) + output = Output(value=None, output_name=output_name) + + yield asset_check_evaluation + + yield output + else: + yield user_event def _step_output_error_checked_user_event_sequence( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 2ac29d9fedf56..7478e7d12745f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -36,6 +36,8 @@ ) from dagster._check import CheckError from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy @@ -49,6 +51,7 @@ DagsterStepOutputNotFoundError, ) from dagster._core.instance import DagsterInstance +from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus from dagster._core.storage.mem_io_manager import InMemoryIOManager from dagster._core.test_utils import instance_for_test from dagster._core.types.dagster_type import Nothing @@ -1707,6 +1710,27 @@ def ret_mismatch(context: AssetExecutionContext): mats = _exec_asset(ret_mismatch) +def test_return_materialization_with_asset_checks(): + with instance_for_test() as instance: + + @asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey("ret_checks"))]) + def ret_checks(context: AssetExecutionContext): + return MaterializeResult( + check_results=[ + AssetCheckResult(check_name="foo_check", metadata={"one": 1}, success=True) + ] + ) + + materialize([ret_checks], instance=instance) + asset_check_executions = instance.event_log_storage.get_asset_check_executions( + asset_key=ret_checks.key, + check_name="foo_check", + limit=1, + ) + assert len(asset_check_executions) == 1 + assert asset_check_executions[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED + + def test_return_materialization_multi_asset(): # # yield successful