Skip to content

Commit

Permalink
see how this goes
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 16, 2023
1 parent 5043c25 commit b37ac00
Showing 1 changed file with 103 additions and 2 deletions.
105 changes: 103 additions & 2 deletions python_modules/dagster/dagster/_core/execution/context/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from dagster._utils.forked_pdb import ForkedPdb
from dagster._utils.merger import merge_dicts

from .compute import ExecutionProperties, OpExecutionContext
from .compute import AssetExecutionContext, ExecutionProperties, OpExecutionContext, RunProperties
from .system import StepExecutionContext, TypeCheckContext


Expand Down Expand Up @@ -632,6 +632,105 @@ def _validate_resource_requirements(
ensure_requirements_satisfied(resource_defs, [requirement])


class DirectInvocationAssetExecutionContext(AssetExecutionContext):
"""The ``context`` object available as the first argument to an op's compute function when
being invoked directly. Can also be used as a context manager.
"""

def __init__(self, op_execution_context: DirectInvocationOpExecutionContext):
self._op_execution_context = op_execution_context

self._execution_props = None
self._run_props = None

# Indicates whether the context has been bound to a particular invocation of an asset
# @asset
# def my_asset(context):
# # context._bound is True
# ...
# ctx = build_asset_context() # ctx._bound is False
# my_asset(ctx)
# ctx._bound is True, must call ctx.unbind() to unbind
self._bound = False

def _check_bound(self, fn_name: str, fn_type: str):
if not self._bound:
raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type))

def bind(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationAssetExecutionContext":
if assets_def is None:
raise DagsterInvariantViolationError(
"DirectInvocationAssetExecutionContext can only being used to invoke an asset."
)
if self._bound:
warnings.warn(
f"This context was already used to execute {self.execution_properties.step_description}. The information about"
f" {self.execution_properties.step_description} will be cleared, including user events and output metadata."
" If you would like to keep this information, you can create a new context"
" using build_op_context() to invoke other ops. You can also manually clear the"
f" information about {self.execution_properties.step_description} using the unbind() method."
)
self.unbind()

self._op_execution_context = self._op_execution_context.bind(
op_def=op_def,
pending_invocation=pending_invocation,
assets_def=assets_def,
config_from_args=config_from_args,
resources_from_args=resources_from_args,
)

# update the bound context with properties relevant to the execution of the op

self._execution_props = ExecutionProperties(
step_description=f'asset "{assets_def.node_def.name}"',
op_execution_context=self._op_execution_context,
)

self._bound = True

return self

def unbind(self):
self._op_execution_context = self._op_execution_context.unbind()
self._execution_props = None

self._bound = False

@property
def op_execution_context(self) -> OpExecutionContext:
return self._op_execution_context

@property
def run_properties(self) -> RunProperties:
self._check_bound(fn_name="run_properties", fn_type="property")
if self._run_props is None:
self._run_props = RunProperties(
run_id=self.op_execution_context.run_id,
run_config=self.op_execution_context.run_config,
dagster_run=self.op_execution_context.run,
retry_number=self.op_execution_context.retry_number,
)
return self._run_props

@property
def execution_properties(self) -> ExecutionProperties:
self._check_bound(fn_name="run_properties", fn_type="property")
if self._execution_props is None:
self._execution_props = ExecutionProperties(
step_description=f"asset {self.op_execution_context.assets_def.node_def.name}",
op_execution_context=self.op_execution_context,
)
return self._execution_props


def build_op_context(
resources: Optional[Mapping[str, Any]] = None,
op_config: Any = None,
Expand Down Expand Up @@ -730,11 +829,13 @@ def build_asset_context(
with build_asset_context(resources={"foo": context_manager_resource}) as context:
asset_to_invoke(context)
"""
return build_op_context(
op_context = build_op_context(
op_config=asset_config,
resources=resources,
resources_config=resources_config,
partition_key=partition_key,
partition_key_range=partition_key_range,
instance=instance,
)

return AssetExecutionContext(op_execution_context=op_context)

0 comments on commit b37ac00

Please sign in to comment.