diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 7b38150ad329d..afee2c768b34c 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -32,7 +32,9 @@ from .result import MaterializeResult if TYPE_CHECKING: - from ..execution.context.invocation import DirectInvocationOpExecutionContext + from ..execution.context.invocation import ( + DirectInvocationOpExecutionContext, + ) from .assets import AssetsDefinition from .composition import PendingNodeInvocation from .decorators.op_decorator import DecoratedOpFunction @@ -109,7 +111,7 @@ def direct_invocation_result( ) -> Any: from dagster._config.pythonic_config import Config from dagster._core.execution.context.invocation import ( - DirectInvocationOpExecutionContext, + BaseDirectInvocationContext, build_op_context, ) @@ -149,12 +151,12 @@ def direct_invocation_result( " no context was provided when invoking." ) if len(args) > 0: - if args[0] is not None and not isinstance(args[0], DirectInvocationOpExecutionContext): + if args[0] is not None and not isinstance(args[0], BaseDirectInvocationContext): raise DagsterInvalidInvocationError( f"Decorated function '{compute_fn.name}' has context argument, " "but no context was provided when invoking." ) - context = cast(DirectInvocationOpExecutionContext, args[0]) + context = args[0] # update args to omit context args = args[1:] else: # context argument is provided under kwargs @@ -165,14 +167,14 @@ def direct_invocation_result( f"'{context_param_name}', but no value for '{context_param_name}' was " f"found when invoking. Provided kwargs: {kwargs}" ) - context = cast(DirectInvocationOpExecutionContext, kwargs[context_param_name]) + context = kwargs[context_param_name] # update kwargs to remove context kwargs = { kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name } # allow passing context, even if the function doesn't have an arg for it - elif len(args) > 0 and isinstance(args[0], DirectInvocationOpExecutionContext): - context = cast(DirectInvocationOpExecutionContext, args[0]) + elif len(args) > 0 and isinstance(args[0], BaseDirectInvocationContext): + context = args[0] args = args[1:] resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()} diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index a218c23291b2a..7d847d2fd414b 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -63,7 +63,19 @@ def _property_msg(prop_name: str, method_name: str, step_type: str) -> str: return f"The {prop_name} {method_name} is not set on the context when an {step_type} is directly invoked." -class DirectInvocationOpExecutionContext(OpExecutionContext): +class BaseDirectInvocationContext: + def bind( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ): + pass + + +class DirectInvocationOpExecutionContext(OpExecutionContext, BaseDirectInvocationContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. """ @@ -632,7 +644,7 @@ def _validate_resource_requirements( ensure_requirements_satisfied(resource_defs, [requirement]) -class DirectInvocationAssetExecutionContext(AssetExecutionContext): +class DirectInvocationAssetExecutionContext(AssetExecutionContext, BaseDirectInvocationContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. """