diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 640cbe9439c5a..2b10f8fd8a4d3 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -18,7 +18,6 @@ DagsterInvariantViolationError, DagsterTypeCheckDidNotPass, ) -from dagster._core.execution.context.invocation import UnboundAssetExecutionContext from .events import ( AssetMaterialization, @@ -30,7 +29,7 @@ from .output import DynamicOutputDefinition if TYPE_CHECKING: - from ..execution.context.invocation import BoundOpExecutionContext + from ..execution.context.invocation import BoundAssetExecutionContext, BoundOpExecutionContext from .assets import AssetsDefinition from .composition import PendingNodeInvocation from .decorators.op_decorator import DecoratedOpFunction @@ -108,6 +107,7 @@ def direct_invocation_result( ) -> Any: from dagster._config.pythonic_config import Config from dagster._core.execution.context.invocation import ( + UnboundAssetExecutionContext, UnboundOpExecutionContext, build_op_context, ) @@ -155,7 +155,10 @@ def direct_invocation_result( f"Decorated function '{compute_fn.name}' has context argument, " "but no context was provided when invoking." ) - context = cast(UnboundOpExecutionContext, args[0]) + if isinstance(args[0], UnboundAssetExecutionContext): + context = cast(UnboundAssetExecutionContext, args[0]) + else: + context = cast(UnboundOpExecutionContext, args[0]) # update args to omit context args = args[1:] else: # context argument is provided under kwargs @@ -166,14 +169,22 @@ def direct_invocation_result( f"'{context_param_name}', but no value for '{context_param_name}' was " f"found when invoking. Provided kwargs: {kwargs}" ) - context = cast(UnboundOpExecutionContext, kwargs[context_param_name]) + if isinstance(kwargs[context_param_name], UnboundAssetExecutionContext): + context = cast(UnboundAssetExecutionContext, kwargs[context_param_name]) + else: + context = cast(UnboundOpExecutionContext, kwargs[context_param_name]) # update kwargs to remove context kwargs = { kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name } # allow passing context, even if the function doesn't have an arg for it - elif len(args) > 0 and isinstance(args[0], UnboundOpExecutionContext): - context = cast(UnboundOpExecutionContext, args[0]) + elif len(args) > 0 and isinstance( + args[0], (UnboundOpExecutionContext, UnboundAssetExecutionContext) + ): + if isinstance(args[0], UnboundAssetExecutionContext): + context = cast(UnboundAssetExecutionContext, args[0]) + else: + context = cast(UnboundOpExecutionContext, args[0]) args = args[1:] resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()} @@ -224,7 +235,10 @@ def direct_invocation_result( def _resolve_inputs( - op_def: "OpDefinition", args, kwargs, context: "BoundOpExecutionContext" + op_def: "OpDefinition", + args, + kwargs, + context: Union["BoundOpExecutionContext", "BoundAssetExecutionContext"], ) -> Mapping[str, Any]: from dagster._core.execution.plan.execute_step import do_type_check @@ -307,7 +321,7 @@ def _resolve_inputs( input_dict[k] = v # Type check inputs - op_label = context.describe_op() + step_label = context.describe_step() for input_name, val in input_dict.items(): input_def = input_defs_by_name[input_name] @@ -316,7 +330,7 @@ def _resolve_inputs( if not type_check.success: raise DagsterTypeCheckDidNotPass( description=( - f'Type check failed for {op_label} input "{input_def.name}" - ' + f'Type check failed for {step_label} input "{input_def.name}" - ' f'expected type "{dagster_type.display_name}". ' f"Description: {type_check.description}" ), @@ -328,7 +342,9 @@ def _resolve_inputs( def _type_check_output_wrapper( - op_def: "OpDefinition", result: Any, context: "BoundOpExecutionContext" + op_def: "OpDefinition", + result: Any, + context: Union["BoundOpExecutionContext", "BoundAssetExecutionContext"], ) -> Any: """Type checks and returns the result of a op. @@ -436,7 +452,9 @@ def type_check_gen(gen): def _type_check_function_output( - op_def: "OpDefinition", result: T, context: "BoundOpExecutionContext" + op_def: "OpDefinition", + result: T, + context: Union["BoundOpExecutionContext", "BoundAssetExecutionContext"], ) -> T: from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator @@ -447,7 +465,9 @@ def _type_check_function_output( def _type_check_output( - output_def: "OutputDefinition", output: T, context: "BoundOpExecutionContext" + output_def: "OutputDefinition", + output: T, + context: Union["BoundOpExecutionContext", "BoundAssetExecutionContext"], ) -> T: """Validates and performs core type check on a provided output. @@ -459,7 +479,7 @@ def _type_check_output( """ from ..execution.plan.execute_step import do_type_check - op_label = context.describe_op() + op_label = context.describe_step() if isinstance(output, (Output, DynamicOutput)): dagster_type = output_def.dagster_type diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index d9b0c88334db3..5d3d3ba7a3c5d 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -565,6 +565,9 @@ def get_mapping_key(self) -> Optional[str]: return self._mapping_key def describe_op(self) -> str: + return self.describe_step() + + def describe_step(self) -> str: if isinstance(self.op_def, OpDefinition): return f'op "{self.op_def.name}"' @@ -797,7 +800,6 @@ def __init__( instance: Optional[DagsterInstance], partition_key: Optional[str], partition_key_range: Optional[PartitionKeyRange], - mapping_key: Optional[str], assets_def: Optional[AssetsDefinition], ): self._op_execution_context = build_op_context( @@ -807,7 +809,7 @@ def __init__( instance=instance, partition_key_range=partition_key_range, partition_key=partition_key, - mapping_key=mapping_key, + mapping_key=None, _assets_def=assets_def, ) @@ -865,7 +867,7 @@ def test_my_op(): expectation_results = [event for event in all_user_events if isinstance(event, ExpectationResult)] ... """ - return self._op_execution_context._user_events + return self._op_execution_context._user_events # noqa: SLF001 def get_output_metadata( self, output_name: str, mapping_key: Optional[str] = None @@ -881,13 +883,13 @@ def get_output_metadata( Returns: Optional[Mapping[str, Any]]: The metadata values present for the output_name/mapping_key combination, if present. """ - metadata = self._op_execution_context._output_metadata.get(output_name) + metadata = self._op_execution_context._output_metadata.get(output_name) # noqa: SLF001 if mapping_key and metadata: return metadata.get(mapping_key) return metadata def get_mapping_key(self) -> Optional[str]: - return self._op_execution_context._mapping_key + return self._op_execution_context._mapping_key # noqa: SLF001 class BoundAssetExecutionContext(AssetExecutionContext): @@ -902,13 +904,16 @@ def __init__(self, bound_op_execution_context: BoundOpExecutionContext): @property def alias(self) -> str: - return self._op_execution_context._alias + return self._op_execution_context._alias # noqa: SLF001 + + def describe_step(self) -> str: + return f" asset '{self._op_execution_context.op_def.name}'" def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: return self._op_execution_context.for_type(dagster_type=dagster_type) def get_mapping_key(self) -> Optional[str]: - return self._op_execution_context._mapping_key + return self._op_execution_context._mapping_key # noqa: SLF001 def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: self._op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key) @@ -928,7 +933,6 @@ def build_asset_context( partition_key_range: Optional[PartitionKeyRange] = None, # TODO - the below params were not originally params for this function, but are used by `build_op_context` # figure out what they are for anf if we need them - mapping_key: Optional[str] = None, _assets_def: Optional[AssetsDefinition] = None, ): """Builds asset execution context from provided parameters. @@ -968,6 +972,5 @@ def build_asset_context( partition_key_range, "partition_key_range", PartitionKeyRange ), instance=check.opt_inst_param(instance, "instance", DagsterInstance), - mapping_key=check.opt_str_param(mapping_key, "mapping_key"), assets_def=check.opt_inst_param(_assets_def, "_assets_def", AssetsDefinition), ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 075fc884243fe..bd7b7d52b2c59 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -242,17 +242,22 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[OpExecutionContext, AssetExecutionContext], + output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if isinstance(context, AssetExecutionContext): + step_description = f" asset '{context.op_execution_context.op_def.name}'" context = context.op_execution_context + else: + step_description = context.describe_op() if inspect.isgenerator(result): # this happens when a user explicitly returns a generator in the op for event in result: yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: If you are " + f"Error in {step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " f"{context.op_def.node_type_str} you must yield them " @@ -265,7 +270,7 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: Unexpectedly returned output of type" + f"Error in {step_description}: Unexpectedly returned output of type" f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" " return no results." ) @@ -277,7 +282,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -285,7 +290,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -307,7 +312,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: received Output object for" + f"Error with output for {step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -325,7 +330,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: output " + f"Error with output for {step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." @@ -333,7 +338,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.describe_op()}. ' + f'"{output_def.name}" of {step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: " diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py index 8839f0c7dd438..e3ebb34c7206d 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py @@ -1,5 +1,5 @@ import pytest -from dagster import AssetExecutionContext, ConfigurableResource, asset, op +from dagster import AssetExecutionContext, ConfigurableResource, OpExecutionContext, asset, op from dagster._core.errors import ( DagsterInvalidInvocationError, ) @@ -11,7 +11,7 @@ class MyResource(ConfigurableResource): a_str: str @op - def my_op(context, my_resource: MyResource) -> str: + def my_op(context: OpExecutionContext, my_resource: MyResource) -> str: assert my_resource.a_str == "foo" return my_resource.a_str @@ -57,7 +57,9 @@ class MyResource(ConfigurableResource): a_str: str @op - def my_op(context, my_resource: MyResource, my_other_resource: MyResource) -> str: + def my_op( + context: OpExecutionContext, my_resource: MyResource, my_other_resource: MyResource + ) -> str: assert my_resource.a_str == "foo" assert my_other_resource.a_str == "bar" return my_resource.a_str @@ -118,7 +120,9 @@ class MyResource(ConfigurableResource): z: int @op - def my_wacky_addition_op(context, x: int, y: int, my_resource: MyResource) -> int: + def my_wacky_addition_op( + context: OpExecutionContext, x: int, y: int, my_resource: MyResource + ) -> int: return x + y + my_resource.z # Just providing context is ok, we'll use the resource from the context @@ -236,28 +240,36 @@ class MyResource(ConfigurableResource): z: int @asset - def my_wacky_addition_asset(context, x: int, y: int, my_resource: MyResource) -> int: + def my_wacky_addition_asset( + context: AssetExecutionContext, x: int, y: int, my_resource: MyResource + ) -> int: return x + y + my_resource.z # Just providing context is ok, we'll use the resource from the context # We are successfully able to input x and y as args assert ( - my_wacky_addition_asset(build_op_context(resources={"my_resource": MyResource(z=2)}), 4, 5) + my_wacky_addition_asset( + build_asset_context(resources={"my_resource": MyResource(z=2)}), 4, 5 + ) == 11 ) # We can also input x and y as kwargs assert ( my_wacky_addition_asset( - build_op_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2 + build_asset_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2 ) == 6 ) # Providing resource only as kwarg is ok, we'll use that (we still need a context though) # We can input x and y as args - assert my_wacky_addition_asset(build_op_context(), 10, 20, my_resource=MyResource(z=30)) == 60 + assert ( + my_wacky_addition_asset(build_asset_context(), 10, 20, my_resource=MyResource(z=30)) == 60 + ) # We can also input x and y as kwargs in this case - assert my_wacky_addition_asset(build_op_context(), y=1, x=2, my_resource=MyResource(z=3)) == 6 + assert ( + my_wacky_addition_asset(build_asset_context(), y=1, x=2, my_resource=MyResource(z=3)) == 6 + ) @asset def my_wacky_addition_asset_no_context(x: int, y: int, my_resource: MyResource) -> int: @@ -267,14 +279,14 @@ def my_wacky_addition_asset_no_context(x: int, y: int, my_resource: MyResource) # We can input x and y as args assert ( my_wacky_addition_asset_no_context( - build_op_context(resources={"my_resource": MyResource(z=2)}), 4, 5 + build_asset_context(resources={"my_resource": MyResource(z=2)}), 4, 5 ) == 11 ) # We can also input x and y as kwargs assert ( my_wacky_addition_asset_no_context( - build_op_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2 + build_asset_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2 ) == 6 ) @@ -385,19 +397,21 @@ class NumResource(ConfigurableResource): executed = {} @asset - def an_asset(context, my_resource: NumResource, my_other_resource: NumResource) -> None: + def an_asset( + context: AssetExecutionContext, my_resource: NumResource, my_other_resource: NumResource + ) -> None: assert context.resources.my_resource.num == 1 assert context.resources.my_other_resource.num == 2 assert my_resource.num == 1 assert my_other_resource.num == 2 executed["yes"] = True - an_asset(build_op_context(), NumResource(num=1), NumResource(num=2)) + an_asset(build_asset_context(), NumResource(num=1), NumResource(num=2)) assert executed["yes"] executed.clear() an_asset( - build_op_context(), my_resource=NumResource(num=1), my_other_resource=NumResource(num=2) + build_asset_context(), my_resource=NumResource(num=1), my_other_resource=NumResource(num=2) ) assert executed["yes"] executed.clear() @@ -405,7 +419,7 @@ def an_asset(context, my_resource: NumResource, my_other_resource: NumResource) an_asset( my_other_resource=NumResource(num=2), my_resource=NumResource(num=1), - context=build_op_context(), + context=build_asset_context(), ) assert executed["yes"] executed.clear()