diff --git a/python_modules/dagster/dagster/_core/definitions/asset_check_result.py b/python_modules/dagster/dagster/_core/definitions/asset_check_result.py index 4d4dc26f0f585..1fe0e97b25b62 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_check_result.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_check_result.py @@ -144,10 +144,14 @@ def to_asset_check_evaluation( severity=self.severity, ) - def get_spec_python_identifier(self, asset_key: Optional[AssetKey]) -> str: + def get_spec_python_identifier( + self, *, asset_key: Optional[AssetKey] = None, check_name: Optional[str] = None + ) -> str: """Returns a string uniquely identifying the asset check spec associated with this result. This is used for the output name associated with an `AssetCheckResult`. """ asset_key = asset_key or self.asset_key + check_name = check_name or self.check_name + assert asset_key is not None, "Asset key must be provided if not set on spec" assert asset_key is not None, "Asset key must be provided if not set on spec" return f"{asset_key.to_python_identifier()}_{self.check_name}" diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index aa0578ee7cb3f..fcb3a875a676f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -27,7 +27,10 @@ ) from dagster._core.selector.subset_selector import AssetSelectionData -from ..errors import DagsterInvalidSubsetError +from ..errors import ( + DagsterInvalidSubsetError, + DagsterInvariantViolationError, +) from .config import ConfigMapping from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle from .events import AssetKey @@ -606,6 +609,15 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle: def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]: return self.assets_defs_by_node_handle.get(node_handle) + def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey: + assets_def = self.assets_def_for_node(node_handle) + if not assets_def or len(assets_def.keys_by_output_name.keys()) > 1: + raise DagsterInvariantViolationError( + "Cannot call `asset_key_for_node` in a multi_asset with more than one asset." + " Multiple asset keys defined." + ) + return next(iter(assets_def.keys_by_output_name.values())) + def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]: assets_def_for_node = self.assets_def_for_node(node_handle) checks_def_for_node = self.asset_checks_def_for_node(node_handle) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d76c79afe3a8d..77ba9ef72825e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1222,6 +1222,19 @@ def asset_check_spec(self) -> AssetCheckSpec: ) return asset_checks_def.spec + # In this mode no conversion is done on returned values and missing but expected outputs are not + # allowed. + @property + def requires_typed_event_stream(self) -> bool: + return self._step_execution_context.requires_typed_event_stream + + @property + def typed_event_stream_error_message(self) -> Optional[str]: + return self._step_execution_context.typed_event_stream_error_message + + def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None: + self._step_execution_context.set_requires_typed_event_stream(error_message=error_message) + # actually forking the object type for assets is tricky for users in the cases of: # * manually constructing ops to make AssetsDefinitions diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 46e0363f31b9d..82f51e4d9e65c 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -447,6 +447,8 @@ def __init__( self._partition_key = partition_key self._partition_key_range = partition_key_range self._assets_def = assets_def + self._requires_typed_event_stream = False + self._typed_event_stream_error_message = None @property def op_config(self) -> Any: @@ -714,6 +716,20 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]: else: self._output_metadata[output_name] = metadata + # In this mode no conversion is done on returned values and missing but expected outputs are not + # allowed. + @property + def requires_typed_event_stream(self) -> bool: + return self._requires_typed_event_stream + + @property + def typed_event_stream_error_message(self) -> Optional[str]: + return self._typed_event_stream_error_message + + def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> None: + self._requires_typed_event_stream = True + self._typed_event_stream_error_message = error_message + def build_op_context( resources: Optional[Mapping[str, Any]] = None, diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index f0f11c3ee22af..a0902b3a81aba 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -575,6 +575,24 @@ def __init__( self._is_external_input_asset_version_info_loaded = False self._data_version_cache: Dict[AssetKey, "DataVersion"] = {} + self._requires_typed_event_stream = False + self._typed_event_stream_error_message = None + + # In this mode no conversion is done on returned values and missing but expected outputs are not + # allowed. + @property + def requires_typed_event_stream(self) -> bool: + return self._requires_typed_event_stream + + @property + def typed_event_stream_error_message(self) -> Optional[str]: + return self._typed_event_stream_error_message + + # Error message will be appended to the default error message. + def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None): + self._requires_typed_event_stream = True + self._typed_event_stream_error_message = error_message + @property def step(self) -> ExecutionStep: return self._step diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 549c091f655a2..7117bc4aae36e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -27,6 +27,7 @@ NodeHandle, Output, ) +from dagster._core.definitions.asset_check_spec import AssetCheckHandle from dagster._core.definitions.asset_layer import AssetLayer from dagster._core.definitions.op_definition import OpComputeFunction from dagster._core.definitions.result import MaterializeResult @@ -204,13 +205,50 @@ def execute_core_compute( yield step_output if isinstance(step_output, (DynamicOutput, Output)): emitted_result_names.add(step_output.output_name) + elif isinstance(step_output, MaterializeResult): + asset_key = ( + step_output.asset_key + or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle) + ) + emitted_result_names.add( + step_context.job_def.asset_layer.node_output_handle_for_asset(asset_key).output_name + ) + # Check results embedded in MaterializeResult are counted + for check_result in step_output.check_results or []: + handle = check_result.to_asset_check_evaluation(step_context).asset_check_handle + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( + handle + ) + emitted_result_names.add(output_name) + elif isinstance(step_output, AssetCheckEvaluation): + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( + step_output.asset_check_handle + ) + emitted_result_names.add(output_name) + elif isinstance(step_output, AssetCheckResult): + if step_output.asset_key and step_output.check_name: + handle = AssetCheckHandle(step_output.asset_key, step_output.check_name) + else: + handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle) + emitted_result_names.add(output_name) expected_op_output_names = { - output.name for output in step.step_outputs if not output.properties.asset_check_handle + output.name + for output in step.step_outputs + # checks are required if we're in requires_typed_event_stream mode + if step_context.requires_typed_event_stream or output.properties.asset_check_handle } omitted_outputs = expected_op_output_names.difference(emitted_result_names) if omitted_outputs: - step_context.log.info( - f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire " - f"outputs {omitted_outputs!r}" + message = ( + f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return " + f"expected outputs {omitted_outputs!r}." ) + + if step_context.requires_typed_event_stream: + if step_context.typed_event_stream_error_message: + message += " " + step_context.typed_event_stream_error_message + raise DagsterInvariantViolationError(message) + else: + step_context.log.info(message) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 360f50f62b851..b750914aee15d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -175,7 +175,7 @@ def _filter_expected_output_defs( ) materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)] remove_outputs = [ - r.get_spec_python_identifier(x.asset_key or context.asset_key) + r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key) for x in materialize_results for r in x.check_results or [] ] @@ -267,6 +267,20 @@ def validate_and_coerce_op_result_to_iterator( f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" " return no results." ) + # `requires_typed_event_stream` is a mode where we require users to return/yield exactly the + # results that will be registered in the instance, without additional fancy inference (like + # wrapping `None` in an `Output`). We therefore skip any return-specific validation for this + # mode and treat returned values as if they were yielded. + elif output_defs and context.requires_typed_event_stream: + # If nothing was returned, treat it as an empty tuple instead of a `(None,)`. + # This is important for delivering the correct error message when an output is missing. + if result is None: + result_tuple = tuple() + elif not isinstance(result, tuple) or is_named_tuple_instance(result): + result_tuple = (result,) + else: + result_tuple = result + yield from result_tuple elif output_defs: for position, output_def, element in _zip_and_iterate_op_result( result, context, output_defs diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py b/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py new file mode 100644 index 0000000000000..978c909664ba1 --- /dev/null +++ b/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py @@ -0,0 +1,206 @@ +from contextlib import contextmanager +from typing import Iterator + +import pytest +from dagster import OpExecutionContext, Out, asset, multi_asset, op +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.events import AssetKey, Output +from dagster._core.definitions.materialize import materialize +from dagster._core.definitions.result import MaterializeResult +from dagster._core.errors import DagsterInvariantViolationError, DagsterStepOutputNotFoundError +from dagster._utils.test import wrap_op_in_graph_and_execute + +EXTRA_ERROR_MESSAGE = "Hello" + + +@contextmanager +def raises_missing_output_error() -> Iterator[None]: + with pytest.raises( + DagsterInvariantViolationError, + match=f"did not yield or return expected outputs.*{EXTRA_ERROR_MESSAGE}$", + ): + yield + + +@contextmanager +def raises_missing_check_output_error() -> Iterator[None]: + with pytest.raises( + DagsterStepOutputNotFoundError, + match="did not return an output for non-optional output", + ): + yield + + +def test_requires_typed_event_stream_op(): + @op + def op_fails(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + + with raises_missing_output_error(): + wrap_op_in_graph_and_execute(op_fails) + + @op(out={"a": Out(int), "b": Out(int)}) + def op_fails_partial_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(1, output_name="a") + + with raises_missing_output_error(): + wrap_op_in_graph_and_execute(op_fails_partial_yield) + + @op(out={"a": Out(int), "b": Out(int)}) + def op_fails_partial_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(1, output_name="a") + + with raises_missing_output_error(): + wrap_op_in_graph_and_execute(op_fails_partial_return) + + @op(out={"a": Out(int), "b": Out(int)}) + def op_succeeds_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(1, output_name="a") + yield Output(2, output_name="b") + + assert wrap_op_in_graph_and_execute(op_succeeds_yield) + + @op(out={"a": Out(int), "b": Out(int)}) + def op_succeeds_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return Output(1, output_name="a"), Output(2, output_name="b") + + assert wrap_op_in_graph_and_execute(op_succeeds_return) + + +def test_requires_typed_event_stream_asset(): + @asset + def asset_fails(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + + with raises_missing_output_error(): + materialize([asset_fails]) + + @asset + def asset_succeeds_output_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(1) + + assert materialize([asset_succeeds_output_yield]) + + @asset + def asset_succeeds_output_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return Output(1) + + assert materialize([asset_succeeds_output_return]) + + @asset + def asset_succeeds_materialize_result_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield MaterializeResult() + + assert materialize([asset_succeeds_materialize_result_yield]) + + @asset + def asset_succeeds_materialize_result_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return MaterializeResult() + + assert materialize([asset_succeeds_materialize_result_return]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_succeeds_check_separate_yield"])) + ] + ) + def asset_succeeds_check_separate_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield MaterializeResult() + yield AssetCheckResult(success=True) + + assert materialize([asset_succeeds_check_separate_yield]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_succeeds_check_separate_return"])) + ] + ) + def asset_succeeds_check_separate_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return MaterializeResult(), AssetCheckResult(success=True) + + assert materialize([asset_succeeds_check_separate_return]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_succeeds_check_embedded_yield"])) + ] + ) + def asset_succeeds_check_embedded_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield MaterializeResult(check_results=[AssetCheckResult(success=True)]) + + assert materialize([asset_succeeds_check_embedded_yield]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_succeeds_check_embedded_return"])) + ] + ) + def asset_succeeds_check_embedded_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return MaterializeResult(check_results=[AssetCheckResult(success=True)]) + + assert materialize([asset_succeeds_check_embedded_return]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_fails_missing_check_yield"])) + ] + ) + def asset_fails_missing_check_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield MaterializeResult() + + # with raises_missing_check_output_error(): + with raises_missing_output_error(): + materialize([asset_fails_missing_check_yield]) + + @asset( + check_specs=[ + AssetCheckSpec(name="foo", asset=AssetKey(["asset_fails_missing_check_return"])) + ] + ) + def asset_fails_missing_check_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return MaterializeResult() + + # with raises_missing_check_output_error(): + with raises_missing_output_error(): + materialize([asset_fails_missing_check_return]) + + +def test_requires_typed_event_stream_multi_asset(): + @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) + def asset_fails_multi_asset(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(None, output_name="foo") + + with raises_missing_output_error(): + materialize([asset_fails_multi_asset]) + + @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) + def asset_succeeds_multi_asset_yield(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + yield Output(None, output_name="foo") + yield Output(None, output_name="bar") + + assert materialize([asset_succeeds_multi_asset_yield]) + + @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) + def asset_succeeds_multi_asset_return(context: OpExecutionContext): + context.set_requires_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE) + return Output(None, output_name="foo"), Output(None, output_name="bar") + + assert materialize([asset_succeeds_multi_asset_return])