Skip to content

Commit

Permalink
move relevant updates from another branch
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 21, 2023
1 parent ab0adc8 commit ecb5178
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,22 @@ def _check_output_object_name(


def validate_and_coerce_op_result_to_iterator(
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
result: Any,
context: Union[OpExecutionContext, AssetExecutionContext],
output_defs: Sequence[OutputDefinition],
) -> Iterator[Any]:
if isinstance(context, AssetExecutionContext):
step_description = f" asset '{context.op_execution_context.op_def.name}'"
context = context.op_execution_context
else:
step_description = context.describe_op()
if inspect.isgenerator(result):
# this happens when a user explicitly returns a generator in the op
for event in result:
yield event
elif isinstance(result, (AssetMaterialization, ExpectationResult)):
raise DagsterInvariantViolationError(
f"Error in {context.describe_op()}: If you are "
f"Error in {step_description}: If you are "
"returning an AssetMaterialization "
"or an ExpectationResult from "
f"{context.op_def.node_type_str} you must yield them "
Expand All @@ -265,7 +270,7 @@ def validate_and_coerce_op_result_to_iterator(
yield result
elif result is not None and not output_defs:
raise DagsterInvariantViolationError(
f"Error in {context.describe_op()}: Unexpectedly returned output of type"
f"Error in {step_description}: Unexpectedly returned output of type"
f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
" return no results."
)
Expand All @@ -277,15 +282,15 @@ def validate_and_coerce_op_result_to_iterator(
if output_def.is_dynamic:
if not isinstance(element, list):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: "
f"Error with output for {step_description}: "
f"dynamic output '{output_def.name}' expected a list of "
"DynamicOutput objects, but instead received instead an "
f"object of type {type(element)}."
)
for item in element:
if not isinstance(item, DynamicOutput):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: "
f"Error with output for {step_description}: "
f"dynamic output '{output_def.name}' at position {position} expected a "
"list of DynamicOutput objects, but received an "
f"item with type {type(item)}."
Expand All @@ -307,7 +312,7 @@ def validate_and_coerce_op_result_to_iterator(
annotation
):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: received Output object for"
f"Error with output for {step_description}: received Output object for"
f" output '{output_def.name}' which does not have an Output annotation."
f" Annotation has type {annotation}."
)
Expand All @@ -325,15 +330,15 @@ def validate_and_coerce_op_result_to_iterator(
# output object was not received, throw an error.
if is_generic_output_annotation(annotation):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: output "
f"Error with output for {step_description}: output "
f"'{output_def.name}' has generic output annotation, "
"but did not receive an Output object for this output. "
f"Received instead an object of type {type(element)}."
)
if result is None and output_def.is_required is False:
context.log.warning(
'Value "None" returned for non-required output '
f'"{output_def.name}" of {context.describe_op()}. '
f'"{output_def.name}" of {step_description}. '
"This value will be passed to downstream "
f"{context.op_def.node_type_str}s. For conditional "
"execution, results must be yielded: "
Expand Down

0 comments on commit ecb5178

Please sign in to comment.