diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 1f0051a1835b9..44fab07575c8d 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -123,6 +123,7 @@ def __init__( self._alias = None self._hook_defs = None self._tags = {} + self._seen_outputs = {} # Indicates whether the context has been bound to a particular invocation of an op # @op @@ -147,6 +148,105 @@ def _check_bound(self, fn_name: str, fn_type: str): if not self._bound: raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type)) + def bind( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ) -> "DirectInvocationOpExecutionContext": + from dagster._core.definitions.resource_invocation import resolve_bound_config + + if self._bound: + raise DagsterInvalidInvocationError( + "Cannot call bind() on a DirectInvocationOpExecutionContext that has already been bound." + ) + + # make a copy of the current context that will be bound to a particular op execution + bound_ctx = DirectInvocationOpExecutionContext( + op_config=self._op_config, + resources_dict=self._resource_defs, + instance=self._instance, + resources_config=self._resources_config, + partition_key=self._partition_key, + partition_key_range=self._partition_key_range, + mapping_key=self._mapping_key, + assets_def=self._assets_def, + ) + bound_ctx._user_events = self._user_events # noqa: SLF001 + bound_ctx._output_metadata = self._output_metadata # noqa: SLF001 + + # update the bound context with properties relevant to the execution of the op + bound_ctx._op_def = op_def # noqa: SLF001 + + invocation_tags = ( + pending_invocation.tags + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + bound_ctx._tags = ( # noqa: SLF001 + merge_dicts(bound_ctx._op_def.tags, invocation_tags) # noqa: SLF001 + if invocation_tags + else bound_ctx._op_def.tags # noqa: SLF001 + ) + + bound_ctx._hook_defs = ( # noqa: SLF001 + pending_invocation.hook_defs + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + invocation_alias = ( + pending_invocation.given_alias + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + bound_ctx._alias = invocation_alias if invocation_alias else bound_ctx._op_def.name # noqa: SLF001 + + bound_ctx._assets_def = assets_def # noqa: SLF001 + + if resources_from_args: + if self._resource_defs: + raise DagsterInvalidInvocationError( + "Cannot provide resources in both context and kwargs" + ) + resource_defs = wrap_resources_for_execution(resources_from_args) + # add new resources context to the stack to be cleared on exit + bound_ctx._resources = self._exit_stack.enter_context( # noqa: SLF001 + build_resources(resource_defs, self.instance) + ) + elif assets_def and assets_def.resource_defs: + for key in sorted(list(assets_def.resource_defs.keys())): + if key in self._resource_defs: + raise DagsterInvalidInvocationError( + f"Error when invoking {assets_def!s} resource '{key}' " + "provided on both the definition and invocation context. Please " + "provide on only one or the other." + ) + resource_defs = wrap_resources_for_execution( + {**self._resource_defs, **assets_def.resource_defs} + ) + # add new resources context to the stack to be cleared on exit + bound_ctx._resources = self._exit_stack.enter_context( # noqa: SLF001 + build_resources(resource_defs, self.instance, self._resources_config) + ) + else: + bound_ctx._resources = self.resources # noqa: SLF001 + resource_defs = self._resource_defs + + _validate_resource_requirements(resource_defs, op_def) + + if self.op_config and config_from_args: + raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") + bound_ctx._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) # noqa: SLF001 + + bound_ctx._seen_outputs = {} # noqa: SLF001 + bound_ctx._requires_typed_event_stream = False # noqa: SLF001 + bound_ctx._typed_event_stream_error_message = None # noqa: SLF001 + bound_ctx._bound = True # noqa: SLF001 + + return bound_ctx + @property def op_config(self) -> Any: return self._op_config @@ -298,85 +398,6 @@ def alias(self) -> str: def get_step_execution_context(self) -> StepExecutionContext: raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "method")) - def bind( - self, - op_def: OpDefinition, - pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], - assets_def: Optional[AssetsDefinition], - config_from_args: Optional[Mapping[str, Any]], - resources_from_args: Optional[Mapping[str, Any]], - ) -> "DirectInvocationOpExecutionContext": - from dagster._core.definitions.resource_invocation import resolve_bound_config - - self._op_def = op_def - - invocation_tags = ( - pending_invocation.tags - if isinstance(pending_invocation, PendingNodeInvocation) - else None - ) - self._tags = ( - merge_dicts(self._op_def.tags, invocation_tags) - if invocation_tags - else self._op_def.tags - ) - - self._hook_defs = ( - pending_invocation.hook_defs - if isinstance(pending_invocation, PendingNodeInvocation) - else None - ) - invocation_alias = ( - pending_invocation.given_alias - if isinstance(pending_invocation, PendingNodeInvocation) - else None - ) - self._alias = invocation_alias if invocation_alias else self._op_def.name - - self._assets_def = assets_def - - if resources_from_args: - if self._resource_defs: - raise DagsterInvalidInvocationError( - "Cannot provide resources in both context and kwargs" - ) - resource_defs = wrap_resources_for_execution(resources_from_args) - # add new resources context to the stack to be cleared on exit - self._resources = self._exit_stack.enter_context( - build_resources(resource_defs, self.instance) - ) - elif assets_def and assets_def.resource_defs: - for key in sorted(list(assets_def.resource_defs.keys())): - if key in self._resource_defs: - raise DagsterInvalidInvocationError( - f"Error when invoking {assets_def!s} resource '{key}' " - "provided on both the definition and invocation context. Please " - "provide on only one or the other." - ) - resource_defs = wrap_resources_for_execution( - {**self._resource_defs, **assets_def.resource_defs} - ) - # add new resources context to the stack to be cleared on exit - self._resources = self._exit_stack.enter_context( - build_resources(resource_defs, self.instance, self._resources_config) - ) - else: - self._resources = self.resources - resource_defs = self._resource_defs - - _validate_resource_requirements(resource_defs, op_def) - - if self.op_config and config_from_args: - raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") - self._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) - - self._seen_outputs = {} - self._requires_typed_event_stream = False - self._typed_event_stream_error_message = None - self._bound = True - - return self - def get_events(self) -> Sequence[UserEvent]: """Retrieve the list of user-generated events that were logged via the context. diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py index 134ea16aa2edb..7dbb680cdc556 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_direct_invocation.py @@ -440,4 +440,4 @@ def my_asset(context): ctx = build_op_context() assert not ctx._bound # noqa: SLF001 my_asset(ctx) - assert not ctx._bound # noqa: SLF001gi + assert not ctx._bound # noqa: SLF001