diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index bcd860235bf7f..3ee1f74dc8118 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -55,7 +55,7 @@ from dagster._utils.forked_pdb import ForkedPdb from dagster._utils.merger import merge_dicts -from .compute import ExecutionProperties, OpExecutionContext +from .compute import AssetExecutionContext, ExecutionProperties, OpExecutionContext, RunProperties from .system import StepExecutionContext, TypeCheckContext @@ -632,6 +632,105 @@ def _validate_resource_requirements( ensure_requirements_satisfied(resource_defs, [requirement]) +class DirectInvocationAssetExecutionContext(AssetExecutionContext): + """The ``context`` object available as the first argument to an op's compute function when + being invoked directly. Can also be used as a context manager. + """ + + def __init__(self, op_execution_context: DirectInvocationOpExecutionContext): + self._op_execution_context = op_execution_context + + self._execution_props = None + self._run_props = None + + # Indicates whether the context has been bound to a particular invocation of an asset + # @asset + # def my_asset(context): + # # context._bound is True + # ... + # ctx = build_asset_context() # ctx._bound is False + # my_asset(ctx) + # ctx._bound is True, must call ctx.unbind() to unbind + self._bound = False + + def _check_bound(self, fn_name: str, fn_type: str): + if not self._bound: + raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type)) + + def bind( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ) -> "DirectInvocationAssetExecutionContext": + if assets_def is None: + raise DagsterInvariantViolationError( + "DirectInvocationAssetExecutionContext can only being used to invoke an asset." + ) + if self._bound: + warnings.warn( + f"This context was already used to execute {self.execution_properties.step_description}. The information about" + f" {self.execution_properties.step_description} will be cleared, including user events and output metadata." + " If you would like to keep this information, you can create a new context" + " using build_op_context() to invoke other ops. You can also manually clear the" + f" information about {self.execution_properties.step_description} using the unbind() method." + ) + self.unbind() + + self._op_execution_context = self._op_execution_context.bind( + op_def=op_def, + pending_invocation=pending_invocation, + assets_def=assets_def, + config_from_args=config_from_args, + resources_from_args=resources_from_args, + ) + + # update the bound context with properties relevant to the execution of the op + + self._execution_props = ExecutionProperties( + step_description=f'asset "{assets_def.node_def.name}"', + op_execution_context=self._op_execution_context, + ) + + self._bound = True + + return self + + def unbind(self): + self._op_execution_context = self._op_execution_context.unbind() + self._execution_props = None + + self._bound = False + + @property + def op_execution_context(self) -> OpExecutionContext: + return self._op_execution_context + + @property + def run_properties(self) -> RunProperties: + self._check_bound(fn_name="run_properties", fn_type="property") + if self._run_props is None: + self._run_props = RunProperties( + run_id=self.op_execution_context.run_id, + run_config=self.op_execution_context.run_config, + dagster_run=self.op_execution_context.run, + retry_number=self.op_execution_context.retry_number, + ) + return self._run_props + + @property + def execution_properties(self) -> ExecutionProperties: + self._check_bound(fn_name="run_properties", fn_type="property") + if self._execution_props is None: + self._execution_props = ExecutionProperties( + step_description=f"asset {self.op_execution_context.assets_def.node_def.name}", + op_execution_context=self.op_execution_context, + ) + return self._execution_props + + def build_op_context( resources: Optional[Mapping[str, Any]] = None, op_config: Any = None, @@ -730,7 +829,7 @@ def build_asset_context( with build_asset_context(resources={"foo": context_manager_resource}) as context: asset_to_invoke(context) """ - return build_op_context( + op_context = build_op_context( op_config=asset_config, resources=resources, resources_config=resources_config, @@ -738,3 +837,5 @@ def build_asset_context( partition_key_range=partition_key_range, instance=instance, ) + + return AssetExecutionContext(op_execution_context=op_context)