Skip to content

Commit

Permalink
make a copy of the context instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 2, 2023
1 parent 0fac095 commit c9078de
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 80 deletions.
179 changes: 100 additions & 79 deletions python_modules/dagster/dagster/_core/execution/context/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(
self._alias = None
self._hook_defs = None
self._tags = {}
self._seen_outputs = {}

# Indicates whether the context has been bound to a particular invocation of an op
# @op
Expand All @@ -147,6 +148,105 @@ def _check_bound(self, fn_name: str, fn_type: str):
if not self._bound:
raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type))

def bind(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationOpExecutionContext":
from dagster._core.definitions.resource_invocation import resolve_bound_config

if self._bound:
raise DagsterInvalidInvocationError(
"Cannot call bind() on a DirectInvocationOpExecutionContext that has already been bound."
)

# make a copy of the current context that will be bound to a particular op execution
bound_ctx = DirectInvocationOpExecutionContext(
op_config=self._op_config,
resources_dict=self._resource_defs,
instance=self._instance,
resources_config=self._resources_config,
partition_key=self._partition_key,
partition_key_range=self._partition_key_range,
mapping_key=self._mapping_key,
assets_def=self._assets_def,
)
bound_ctx._user_events = self._user_events # noqa: SLF001
bound_ctx._output_metadata = self._output_metadata # noqa: SLF001

# update the bound context with properties relevant to the execution of the op
bound_ctx._op_def = op_def # noqa: SLF001

invocation_tags = (
pending_invocation.tags
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
bound_ctx._tags = ( # noqa: SLF001
merge_dicts(bound_ctx._op_def.tags, invocation_tags) # noqa: SLF001
if invocation_tags
else bound_ctx._op_def.tags # noqa: SLF001
)

bound_ctx._hook_defs = ( # noqa: SLF001
pending_invocation.hook_defs
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
invocation_alias = (
pending_invocation.given_alias
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
bound_ctx._alias = invocation_alias if invocation_alias else bound_ctx._op_def.name # noqa: SLF001

bound_ctx._assets_def = assets_def # noqa: SLF001

if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
bound_ctx._resources = self._exit_stack.enter_context( # noqa: SLF001
build_resources(resource_defs, self.instance)
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
resource_defs = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
# add new resources context to the stack to be cleared on exit
bound_ctx._resources = self._exit_stack.enter_context( # noqa: SLF001
build_resources(resource_defs, self.instance, self._resources_config)
)
else:
bound_ctx._resources = self.resources # noqa: SLF001
resource_defs = self._resource_defs

_validate_resource_requirements(resource_defs, op_def)

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
bound_ctx._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) # noqa: SLF001

bound_ctx._seen_outputs = {} # noqa: SLF001
bound_ctx._requires_typed_event_stream = False # noqa: SLF001
bound_ctx._typed_event_stream_error_message = None # noqa: SLF001
bound_ctx._bound = True # noqa: SLF001

return bound_ctx

@property
def op_config(self) -> Any:
return self._op_config
Expand Down Expand Up @@ -298,85 +398,6 @@ def alias(self) -> str:
def get_step_execution_context(self) -> StepExecutionContext:
raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "method"))

def bind(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationOpExecutionContext":
from dagster._core.definitions.resource_invocation import resolve_bound_config

self._op_def = op_def

invocation_tags = (
pending_invocation.tags
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
self._tags = (
merge_dicts(self._op_def.tags, invocation_tags)
if invocation_tags
else self._op_def.tags
)

self._hook_defs = (
pending_invocation.hook_defs
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
invocation_alias = (
pending_invocation.given_alias
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
self._alias = invocation_alias if invocation_alias else self._op_def.name

self._assets_def = assets_def

if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
self._resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance)
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
resource_defs = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
# add new resources context to the stack to be cleared on exit
self._resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance, self._resources_config)
)
else:
self._resources = self.resources
resource_defs = self._resource_defs

_validate_resource_requirements(resource_defs, op_def)

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
self._op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

self._seen_outputs = {}
self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None
self._bound = True

return self

def get_events(self) -> Sequence[UserEvent]:
"""Retrieve the list of user-generated events that were logged via the context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,4 @@ def my_asset(context):
ctx = build_op_context()
assert not ctx._bound # noqa: SLF001
my_asset(ctx)
assert not ctx._bound # noqa: SLF001gi
assert not ctx._bound # noqa: SLF001

0 comments on commit c9078de

Please sign in to comment.