From 7124a5437769835e50652b0eea0b5ce2a322ff14 Mon Sep 17 00:00:00 2001
From: Sean Mackesey
Date: Thu, 21 Sep 2023 18:16:03 -0400
Subject: [PATCH] Add explicit_mode to compute contexts

Add an opt-in "require typed event stream" mode to the compute
contexts. A compute function enables it by calling
`context.set_require_typed_event_stream(error_message=...)`.

When the mode is enabled:
  * values returned from the compute function are yielded as-is, with
    no return-specific validation or conversion;
  * an expected output that is neither yielded nor returned raises
    `DagsterInvariantViolationError` instead of only being logged, and
    the optional `error_message` is appended to the default message.

To decide which outputs were emitted, `execute_core_compute` now also
maps `MaterializeResult`, `AssetCheckEvaluation` and `AssetCheckResult`
events back to their output names. `AssetLayer.asset_key_for_node` is
added to resolve the asset key of a `MaterializeResult` that does not
carry one; it reports a missing assets definition and a multi-asset
definition with separate error messages.
---
 .../dagster/_core/definitions/asset_layer.py  | 19 ++++++-
 .../_core/execution/context/compute.py        | 13 +++++
 .../_core/execution/context/invocation.py     | 16 ++++++
 .../dagster/_core/execution/context/system.py | 18 +++++++
 .../dagster/_core/execution/plan/compute.py   | 38 ++++++++++++--
 .../_core/execution/plan/compute_generator.py | 11 ++++
 .../test_require_typed_event_stream.py        | 49 ++++++++++++++++++
 7 files changed, 159 insertions(+), 5 deletions(-)
 create mode 100644 python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py

diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py
index aa0578ee7cb3f..fcb3a875a676f 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py
@@ -27,7 +27,10 @@
 )
 from dagster._core.selector.subset_selector import AssetSelectionData
 
-from ..errors import DagsterInvalidSubsetError
+from ..errors import (
+    DagsterInvalidSubsetError,
+    DagsterInvariantViolationError,
+)
 from .config import ConfigMapping
 from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle
 from .events import AssetKey
@@ -606,6 +609,20 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle:
     def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]:
         return self.assets_defs_by_node_handle.get(node_handle)
 
+    def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey:
+        """Return the asset key for a node backed by a single-asset definition."""
+        assets_def = self.assets_def_for_node(node_handle)
+        if not assets_def:
+            raise DagsterInvariantViolationError(
+                f"Cannot call `asset_key_for_node` on node '{node_handle}': no assets definition"
+                " is associated with this node."
+            )
+        if len(assets_def.keys_by_output_name) > 1:
+            raise DagsterInvariantViolationError(
+                "Cannot call `asset_key_for_node` in a multi_asset with more than one asset."
+                " Multiple asset keys defined."
+            )
+        return next(iter(assets_def.keys_by_output_name.values()))
+
     def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]:
         assets_def_for_node = self.assets_def_for_node(node_handle)
         checks_def_for_node = self.asset_checks_def_for_node(node_handle)
diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py
index d76c79afe3a8d..5dc6602060651 100644
--- a/python_modules/dagster/dagster/_core/execution/context/compute.py
+++ b/python_modules/dagster/dagster/_core/execution/context/compute.py
@@ -1222,6 +1222,19 @@ def asset_check_spec(self) -> AssetCheckSpec:
         )
         return asset_checks_def.spec
 
+    # In this mode no conversion is done on returned values and missing but expected outputs are not
+    # allowed.
+    @property
+    def has_require_typed_event_stream(self) -> bool:
+        return self._step_execution_context.has_require_typed_event_stream
+
+    @property
+    def typed_event_stream_error_message(self) -> Optional[str]:
+        return self._step_execution_context.typed_event_stream_error_message
+
+    def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
+        self._step_execution_context.set_require_typed_event_stream(error_message=error_message)
+
 
 # actually forking the object type for assets is tricky for users in the cases of:
 # * manually constructing ops to make AssetsDefinitions
diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py
index 46e0363f31b9d..87103deeb646e 100644
--- a/python_modules/dagster/dagster/_core/execution/context/invocation.py
+++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py
@@ -447,6 +447,8 @@ def __init__(
         self._partition_key = partition_key
         self._partition_key_range = partition_key_range
         self._assets_def = assets_def
+        self._require_typed_event_stream = False
+        self._typed_event_stream_error_message: Optional[str] = None
 
     @property
     def op_config(self) -> Any:
@@ -714,6 +716,20 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]:
         else:
             self._output_metadata[output_name] = metadata
 
+    # In this mode no conversion is done on returned values and missing but expected outputs are not
+    # allowed.
+    @property
+    def has_require_typed_event_stream(self) -> bool:
+        return self._require_typed_event_stream
+
+    @property
+    def typed_event_stream_error_message(self) -> Optional[str]:
+        return self._typed_event_stream_error_message
+
+    def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
+        self._require_typed_event_stream = True
+        self._typed_event_stream_error_message = error_message
+
 
 def build_op_context(
     resources: Optional[Mapping[str, Any]] = None,
diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py
index e36f176ef9b27..8c8dd4e86905f 100644
--- a/python_modules/dagster/dagster/_core/execution/context/system.py
+++ b/python_modules/dagster/dagster/_core/execution/context/system.py
@@ -559,6 +559,24 @@ def __init__(
         self._is_external_input_asset_version_info_loaded = False
         self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}
 
+        self._require_typed_event_stream = False
+        self._typed_event_stream_error_message: Optional[str] = None
+
+    # In this mode no conversion is done on returned values and missing but expected outputs are not
+    # allowed.
+    @property
+    def has_require_typed_event_stream(self) -> bool:
+        return self._require_typed_event_stream
+
+    @property
+    def typed_event_stream_error_message(self) -> Optional[str]:
+        return self._typed_event_stream_error_message
+
+    # Error message will be appended to the default error message.
+    def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
+        self._require_typed_event_stream = True
+        self._typed_event_stream_error_message = error_message
+
     @property
     def step(self) -> ExecutionStep:
         return self._step
diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py
index 549c091f655a2..104eebfa98fff 100644
--- a/python_modules/dagster/dagster/_core/execution/plan/compute.py
+++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py
@@ -27,6 +27,7 @@
     NodeHandle,
     Output,
 )
+from dagster._core.definitions.asset_check_spec import AssetCheckHandle
 from dagster._core.definitions.asset_layer import AssetLayer
 from dagster._core.definitions.op_definition import OpComputeFunction
 from dagster._core.definitions.result import MaterializeResult
@@ -203,14 +204,43 @@ def execute_core_compute(
     for step_output in _yield_compute_results(step_context, inputs, compute_fn):
         yield step_output
         if isinstance(step_output, (DynamicOutput, Output)):
-            emitted_result_names.add(step_output.output_name)
+            output_name = step_output.output_name
+        elif isinstance(step_output, MaterializeResult):
+            asset_key = (
+                step_output.asset_key
+                or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
+            )
+            output_name = step_context.job_def.asset_layer.node_output_handle_for_asset(
+                asset_key
+            ).output_name
+        elif isinstance(step_output, AssetCheckEvaluation):
+            output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
+                step_output.asset_check_handle
+            )
+        elif isinstance(step_output, AssetCheckResult):
+            if step_output.asset_key and step_output.check_name:
+                handle = AssetCheckHandle(step_output.asset_key, step_output.check_name)
+            else:
+                handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle
+            output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle)
+        else:
+            output_name = None
+        if output_name:
+            emitted_result_names.add(output_name)
 
     expected_op_output_names = {
         output.name for output in step.step_outputs if not output.properties.asset_check_handle
     }
     omitted_outputs = expected_op_output_names.difference(emitted_result_names)
     if omitted_outputs:
-        step_context.log.info(
-            f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
-            f"outputs {omitted_outputs!r}"
+        message = (
+            f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return "
+            f"expected outputs {omitted_outputs!r}."
         )
+
+        if step_context.has_require_typed_event_stream:
+            if step_context.typed_event_stream_error_message:
+                message += " " + step_context.typed_event_stream_error_message
+            raise DagsterInvariantViolationError(message)
+        else:
+            step_context.log.info(message)
diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py
index 360f50f62b851..020f22578bd5c 100644
--- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py
+++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py
@@ -267,6 +267,17 @@ def validate_and_coerce_op_result_to_iterator(
             f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
             " return no results."
         )
+    # Skip any return-specific validation and treat it like a generator op
+    elif output_defs and context.has_require_typed_event_stream:
+        # If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
+        # This is important for delivering the correct error message when an output is missing.
+        if result is None:
+            result_tuple = ()
+        elif not isinstance(result, tuple) or is_named_tuple_instance(result):
+            result_tuple = (result,)
+        else:
+            result_tuple = result
+        yield from result_tuple
     elif output_defs:
         for position, output_def, element in _zip_and_iterate_op_result(
             result, context, output_defs
diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py b/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py
new file mode 100644
index 0000000000000..745d90ab6dc6e
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/execution_tests/test_require_typed_event_stream.py
@@ -0,0 +1,49 @@
+from contextlib import contextmanager
+from typing import Iterator
+
+import pytest
+from dagster import OpExecutionContext, Out, asset, multi_asset, op
+from dagster._core.definitions.asset_spec import AssetSpec
+from dagster._core.definitions.events import Output
+from dagster._core.definitions.materialize import materialize
+from dagster._core.errors import DagsterInvariantViolationError
+from dagster._utils.test import wrap_op_in_graph_and_execute
+
+EXTRA_ERROR_MESSAGE = "Hello"
+
+
+@contextmanager
+def raises_missing_output_error() -> Iterator[None]:
+    with pytest.raises(
+        DagsterInvariantViolationError,
+        match=f"did not yield or return expected outputs.*{EXTRA_ERROR_MESSAGE}$",
+    ):
+        yield
+
+
+def test_explicit_mode_op():
+    @op(out={"a": Out(int), "b": Out(int)})
+    def explicit_mode_op(context: OpExecutionContext):
+        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)
+
+    with raises_missing_output_error():
+        wrap_op_in_graph_and_execute(explicit_mode_op)
+
+
+def test_explicit_mode_asset():
+    @asset
+    def explicit_mode_asset(context: OpExecutionContext):
+        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)
+
+    with raises_missing_output_error():
+        materialize([explicit_mode_asset])
+
+
+def test_explicit_mode_multi_asset():
+    @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
+    def explicit_mode_multi_asset(context: OpExecutionContext):
+        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)
+        yield Output(None, output_name="foo")
+
+    with raises_missing_output_error():
+        materialize([explicit_mode_multi_asset])