From ecb517853001c54c62f2f61f6eb8cb966a5e267e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 16:11:18 -0400 Subject: [PATCH] move relevant updates from another branch --- .../_core/execution/plan/compute_generator.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 075fc884243fe..1f76644415de1 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -242,17 +242,22 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[OpExecutionContext, AssetExecutionContext], + output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if isinstance(context, AssetExecutionContext): + step_description = f" asset '{context.op_execution_context.op_def.name}'" context = context.op_execution_context + else: + step_description = context.describe_op() if inspect.isgenerator(result): # this happens when a user explicitly returns a generator in the op for event in result: yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: If you are " + f"Error in {step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " f"{context.op_def.node_type_str} you must yield them " @@ -265,7 +270,7 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: Unexpectedly returned output of type" + f"Error in {step_description}: Unexpectedly returned output of type" f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" " return no results." ) @@ -277,7 +282,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -285,7 +290,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -307,7 +312,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: received Output object for" + f"Error with output for {step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -325,7 +330,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: output " + f"Error with output for {step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." @@ -333,7 +338,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.describe_op()}. ' + f'"{output_def.name}" of {step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: "