From ea85cfbfb8499de162ab57dd20a0853289dfee3d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Dec 2023 17:00:57 -0500 Subject: [PATCH 01/27] add DI asset context --- .../_core/execution/context/invocation.py | 83 ++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 02038f15127ed..f697fd02b56d2 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,11 +56,12 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import OpExecutionContext +from .compute import AssetExecutionContext, OpExecutionContext, RunProperties from .system import StepExecutionContext, TypeCheckContext def _property_msg(prop_name: str, method_name: str) -> str: + # TODO - update to handle assets too return ( f"The {prop_name} {method_name} is not set on the context when an op is directly invoked." ) @@ -706,6 +707,82 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No self._execution_properties.typed_event_stream_error_message = error_message +class RunlessAssetExecutionContext(AssetExecutionContext): + """The ``context`` object available as the first argument to an asset's compute function when + being invoked directly. Can also be used as a context manager. + """ + + def __init__(self, op_execution_context: RunlessOpExecutionContext): + self._op_execution_context = op_execution_context + + self._run_props = None + + def _check_bound(self, fn_name: str, fn_type: str): + if not self._op_execution_context._bound_properties: # noqa: SLF001 + raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type)) + + def bind( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ) -> "RunlessAssetExecutionContext": + if assets_def is None: + raise DagsterInvariantViolationError( + "RunlessAssetExecutionContext can only being used to invoke an asset." + ) + if self._op_execution_context._bound_properties is not None: # noqa: SLF001 + raise DagsterInvalidInvocationError( + f"This context is currently being used to execute {self.op_execution_context.alias}." + " The context cannot be used to execute another asset until" + f" {self.op_execution_context.alias} has finished executing." + ) + + self._op_execution_context = self._op_execution_context.bind( + op_def=op_def, + pending_invocation=pending_invocation, + assets_def=assets_def, + config_from_args=config_from_args, + resources_from_args=resources_from_args, + ) + + return self + + def unbind(self): + self._op_execution_context = self._op_execution_context.unbind() + + @property + def op_execution_context(self) -> RunlessOpExecutionContext: + return self._op_execution_context + + def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: + self._check_bound(fn_name="for_type", fn_type="method") + resources = cast(NamedTuple, self.resources) + return TypeCheckContext( + self.run_id, + self.log, + ScopedResourcesBuilder(resources._asdict()), + dagster_type, + ) + + def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: + self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key) + + @property + def run_properties(self) -> RunProperties: + self._check_bound(fn_name="run_properties", fn_type="property") + if self._run_props is None: + self._run_props = RunProperties( + run_id=self.op_execution_context.run_id, + run_config=self.op_execution_context.run_config, + dagster_run=self.op_execution_context.run, + retry_number=self.op_execution_context.retry_number, + ) + return self._run_props + + def _validate_resource_requirements( resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition ) -> None: @@ -823,7 +900,7 @@ def build_asset_context( with build_asset_context(resources={"foo": context_manager_resource}) as context: asset_to_invoke(context) """ - return build_op_context( + op_context = build_op_context( op_config=asset_config, resources=resources, resources_config=resources_config, @@ -831,3 +908,5 @@ def build_asset_context( partition_key_range=partition_key_range, instance=instance, ) + + return RunlessAssetExecutionContext(op_execution_context=op_context) From 45bca56174724a5c767a609632df0f9f7f8f42db Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Dec 2023 18:20:11 -0500 Subject: [PATCH 02/27] add parent class so typing can start to work --- .../_core/definitions/op_invocation.py | 42 +++++++++------ .../_core/execution/context/invocation.py | 53 ++++++++++++++++++- 2 files changed, 77 insertions(+), 18 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 3dc8e5efc1a67..eecb9f31ac803 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -19,6 +19,7 @@ DagsterInvariantViolationError, DagsterTypeCheckDidNotPass, ) +from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext from .events import ( AssetKey, @@ -32,7 +33,7 @@ from .result import MaterializeResult if TYPE_CHECKING: - from ..execution.context.invocation import DirectOpExecutionContext + from ..execution.context.invocation import BaseRunlessContext from .assets import AssetsDefinition from .composition import PendingNodeInvocation from .decorators.op_decorator import DecoratedOpFunction @@ -100,6 +101,14 @@ def _separate_args_and_kwargs( ) +def _get_op_context( + context: Union[OpExecutionContext, AssetExecutionContext] +) -> OpExecutionContext: + if isinstance(context, AssetExecutionContext): + return context.op_execution_context + return context + + def direct_invocation_result( def_or_invocation: Union[ "OpDefinition", "PendingNodeInvocation[OpDefinition]", "AssetsDefinition" @@ -109,7 +118,7 @@ def direct_invocation_result( ) -> Any: from dagster._config.pythonic_config import Config from dagster._core.execution.context.invocation import ( - DirectOpExecutionContext, + BaseRunlessContext, build_op_context, ) @@ -149,12 +158,12 @@ def direct_invocation_result( " no context was provided when invoking." ) if len(args) > 0: - if args[0] is not None and not isinstance(args[0], DirectOpExecutionContext): + if args[0] is not None and not isinstance(args[0], BaseRunlessContext): raise DagsterInvalidInvocationError( f"Decorated function '{compute_fn.name}' has context argument, " "but no context was provided when invoking." ) - context = cast(DirectOpExecutionContext, args[0]) + context = args[0] # update args to omit context args = args[1:] else: # context argument is provided under kwargs @@ -165,14 +174,14 @@ def direct_invocation_result( f"'{context_param_name}', but no value for '{context_param_name}' was " f"found when invoking. Provided kwargs: {kwargs}" ) - context = cast(DirectOpExecutionContext, kwargs[context_param_name]) + context = kwargs[context_param_name] # update kwargs to remove context kwargs = { kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name } # allow passing context, even if the function doesn't have an arg for it - elif len(args) > 0 and isinstance(args[0], DirectOpExecutionContext): - context = cast(DirectOpExecutionContext, args[0]) + elif len(args) > 0 and isinstance(args[0], BaseRunlessContext): + context = args[0] args = args[1:] resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()} @@ -230,7 +239,7 @@ def direct_invocation_result( def _resolve_inputs( - op_def: "OpDefinition", args, kwargs, context: "DirectOpExecutionContext" + op_def: "OpDefinition", args, kwargs, context: "BaseRunlessContext" ) -> Mapping[str, Any]: from dagster._core.execution.plan.execute_step import do_type_check @@ -333,7 +342,7 @@ def _resolve_inputs( return input_dict -def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContext") -> AssetKey: +def _key_for_result(result: MaterializeResult, context: "BaseRunlessContext") -> AssetKey: if not context.per_invocation_properties.assets_def: raise DagsterInvariantViolationError( f"Op {context.per_invocation_properties.alias} does not have an assets definition." @@ -355,7 +364,7 @@ def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContex def _output_name_for_result_obj( event: MaterializeResult, - context: "DirectOpExecutionContext", + context: "BaseRunlessContext", ): if not context.per_invocation_properties.assets_def: raise DagsterInvariantViolationError( @@ -368,7 +377,7 @@ def _output_name_for_result_obj( def _handle_gen_event( event: T, op_def: "OpDefinition", - context: "DirectOpExecutionContext", + context: "BaseRunlessContext", output_defs: Mapping[str, OutputDefinition], outputs_seen: Set[str], ) -> T: @@ -402,7 +411,7 @@ def _handle_gen_event( def _type_check_output_wrapper( - op_def: "OpDefinition", result: Any, context: "DirectOpExecutionContext" + op_def: "OpDefinition", result: Any, context: "BaseRunlessContext" ) -> Any: """Type checks and returns the result of a op. @@ -496,12 +505,13 @@ def type_check_gen(gen): def _type_check_function_output( - op_def: "OpDefinition", result: T, context: "DirectOpExecutionContext" + op_def: "OpDefinition", result: T, context: "BaseRunlessContext" ) -> T: from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator output_defs_by_name = {output_def.name: output_def for output_def in op_def.output_defs} - for event in validate_and_coerce_op_result_to_iterator(result, context, op_def.output_defs): + op_context = _get_op_context(context) + for event in validate_and_coerce_op_result_to_iterator(result, op_context, op_def.output_defs): if isinstance(event, (Output, DynamicOutput)): _type_check_output(output_defs_by_name[event.output_name], event, context) elif isinstance(event, (MaterializeResult)): @@ -515,14 +525,14 @@ def _type_check_function_output( def _type_check_output( output_def: "OutputDefinition", output: Union[Output, DynamicOutput], - context: "DirectOpExecutionContext", + context: "BaseRunlessContext", ) -> None: """Validates and performs core type check on a provided output. Args: output_def (OutputDefinition): The output definition to validate against. output (Any): The output to validate. - context (DirectOpExecutionContext): Context containing resources to be used for type + context (BaseRunlessContext): Context containing resources to be used for type check. """ from ..execution.plan.execute_step import do_type_check diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index f697fd02b56d2..8c4d54f996058 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -1,3 +1,4 @@ +from abc import abstractmethod from contextlib import ExitStack from typing import ( AbstractSet, @@ -66,6 +67,36 @@ def _property_msg(prop_name: str, method_name: str) -> str: f"The {prop_name} {method_name} is not set on the context when an op is directly invoked." ) +class BaseRunlessContext: + @abstractmethod + def bind( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ): + """Instances of BsaeRunlessContest must implement bind.""" + + @abstractmethod + def unbind(self): + """Instances of BsaeRunlessContest must implement unbind.""" + + @property + @abstractmethod + def bound_properties(self) -> "BoundProperties": + """Instances of BaseRunlessContext must contain a BoundProperties object.""" + + @property + @abstractmethod + def execution_properties(self) -> "RunlessExecutionProperties": + """Instances of BaseRunlessContext must contain a RunlessExecutionProperties object.""" + + @abstractmethod + def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: + pass + class PerInvocationProperties( NamedTuple( @@ -128,7 +159,7 @@ def __init__(self): self.typed_event_stream_error_message: Optional[str] = None -class DirectOpExecutionContext(OpExecutionContext): +class DirectOpExecutionContext(OpExecutionContext, BaseRunlessContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. """ @@ -707,7 +738,7 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No self._execution_properties.typed_event_stream_error_message = error_message -class RunlessAssetExecutionContext(AssetExecutionContext): +class RunlessAssetExecutionContext(AssetExecutionContext, BaseRunlessContext): """The ``context`` object available as the first argument to an asset's compute function when being invoked directly. Can also be used as a context manager. """ @@ -717,6 +748,16 @@ def __init__(self, op_execution_context: RunlessOpExecutionContext): self._run_props = None + def __enter__(self): + self.op_execution_context._cm_scope_entered = True # noqa: SLF001 + return self + + def __exit__(self, *exc): + self.op_execution_context._exit_stack.close() # noqa: SLF001 + + def __del__(self): + self.op_execution_context._exit_stack.close() # noqa: SLF001 + def _check_bound(self, fn_name: str, fn_type: str): if not self._op_execution_context._bound_properties: # noqa: SLF001 raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type)) @@ -753,6 +794,14 @@ def bind( def unbind(self): self._op_execution_context = self._op_execution_context.unbind() + @property + def bound_properties(self) -> BoundProperties: + return self.op_execution_context.bound_properties + + @property + def execution_properties(self) -> RunlessExecutionProperties: + return self.op_execution_context.execution_properties + @property def op_execution_context(self) -> RunlessOpExecutionContext: return self._op_execution_context From 8a5b90e16c46159cd0f73e8b0e13b6078ee2b6fb Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Dec 2023 16:34:26 -0500 Subject: [PATCH 03/27] fix circular import --- .../dagster/dagster/_core/definitions/op_invocation.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index eecb9f31ac803..b055c19d2bd46 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -19,7 +19,6 @@ DagsterInvariantViolationError, DagsterTypeCheckDidNotPass, ) -from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext from .events import ( AssetKey, @@ -33,6 +32,8 @@ from .result import MaterializeResult if TYPE_CHECKING: + from dagster._core.execution.context.compute import OpExecutionContext + from ..execution.context.invocation import BaseRunlessContext from .assets import AssetsDefinition from .composition import PendingNodeInvocation @@ -102,8 +103,10 @@ def _separate_args_and_kwargs( def _get_op_context( - context: Union[OpExecutionContext, AssetExecutionContext] -) -> OpExecutionContext: + context, # TODO - type hint +) -> "OpExecutionContext": + from dagster._core.execution.context.compute import AssetExecutionContext + if isinstance(context, AssetExecutionContext): return context.op_execution_context return context From 007fc0ee417ad6f19c1068e52a9881fa4f6cbacf Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Dec 2023 16:35:11 -0500 Subject: [PATCH 04/27] fix circular import --- .../dagster/dagster/_core/definitions/op_invocation.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index b055c19d2bd46..2970a5cea9270 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -32,8 +32,7 @@ from .result import MaterializeResult if TYPE_CHECKING: - from dagster._core.execution.context.compute import OpExecutionContext - + from ..execution.context.compute import OpExecutionContext from ..execution.context.invocation import BaseRunlessContext from .assets import AssetsDefinition from .composition import PendingNodeInvocation From 26a4ea1f7b23d35fbd9f660ad93a2f2121413a52 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 8 Dec 2023 11:48:42 -0500 Subject: [PATCH 05/27] fixt two tests --- .../dagster/dagster/_core/execution/context/invocation.py | 2 +- .../dagster/dagster_tests/core_tests/test_op_invocation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 8c4d54f996058..3e5ec0ab0352f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -792,7 +792,7 @@ def bind( return self def unbind(self): - self._op_execution_context = self._op_execution_context.unbind() + self._op_execution_context.unbind() @property def bound_properties(self) -> BoundProperties: diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index a0deafa4c784f..7ceb3bf2ea220 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1383,7 +1383,7 @@ async def main(): with pytest.raises( DagsterInvalidInvocationError, match=r"This context is currently being used to execute .* The context" - r" cannot be used to execute another op until .* has finished executing", + r" cannot be used to execute another asset until .* has finished executing", ): asyncio.run(main()) From daed438fe9f5fbc46926114d5dd70ad5c69107b5 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 8 Dec 2023 12:17:57 -0500 Subject: [PATCH 06/27] add is_bound --- .../dagster/dagster/_core/execution/context/invocation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 3e5ec0ab0352f..5517fb4078ee6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -798,6 +798,10 @@ def unbind(self): def bound_properties(self) -> BoundProperties: return self.op_execution_context.bound_properties + @property + def is_bound(self) -> bool: + return self.op_execution_context.is_bound + @property def execution_properties(self) -> RunlessExecutionProperties: return self.op_execution_context.execution_properties From 6d710dc3311fc582bc663af5adfa3bf3ee01c54e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 8 Dec 2023 14:31:13 -0500 Subject: [PATCH 07/27] add run prop test --- .../dagster/_core/execution/context/invocation.py | 15 +++++++++++---- .../core_tests/test_op_invocation.py | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 5517fb4078ee6..2b8df761518d6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -159,6 +159,12 @@ def __init__(self): self.typed_event_stream_error_message: Optional[str] = None +class RunlessRunProperties(RunProperties): + @property + def dagster_run(self): + raise DagsterInvalidPropertyError(_property_msg("dagster_run", "property")) + + class DirectOpExecutionContext(OpExecutionContext, BaseRunlessContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. @@ -814,7 +820,7 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: self._check_bound(fn_name="for_type", fn_type="method") resources = cast(NamedTuple, self.resources) return TypeCheckContext( - self.run_id, + self.run_properties.run_id, self.log, ScopedResourcesBuilder(resources._asdict()), dagster_type, @@ -827,11 +833,12 @@ def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> def run_properties(self) -> RunProperties: self._check_bound(fn_name="run_properties", fn_type="property") if self._run_props is None: - self._run_props = RunProperties( + self._run_props = RunlessRunProperties( run_id=self.op_execution_context.run_id, run_config=self.op_execution_context.run_config, - dagster_run=self.op_execution_context.run, - retry_number=self.op_execution_context.retry_number, + # pass None for dagster_run, since RunlessRunProperties raises an exception for this attr + dagster_run=None, # type: ignore + retry_number=0, ) return self._run_props diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index 7ceb3bf2ea220..b0e4830d91f3a 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1589,3 +1589,17 @@ async def get_results(): asyncio.run(get_results()) assert_context_unbound(ctx) + + +def test_run_properties_access(): + @asset + def access_run_properties(context: AssetExecutionContext): + assert context.run_properties.run_id == "EPHEMERAL" + assert context.run_properties.retry_number == 0 + + with pytest.raises(DagsterInvalidPropertyError): + context.run_properties.dagster_run # noqa:B018 + + ctx = build_asset_context() + + access_run_properties(ctx) From 260fa035720a79a68fb8f18b60c0f3fe4c43dc55 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 8 Dec 2023 15:30:59 -0500 Subject: [PATCH 08/27] pipes --- python_modules/dagster/dagster/_core/pipes/context.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index 5b61647bbf43b..173a69e03dd17 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -39,7 +39,11 @@ from dagster._core.errors import DagsterPipesExecutionError from dagster._core.events import EngineEventData from dagster._core.execution.context.compute import OpExecutionContext +<<<<<<< HEAD from dagster._core.execution.context.invocation import DirectOpExecutionContext +======= +from dagster._core.execution.context.invocation import BaseRunlessContext +>>>>>>> c4d31671bc (pipes) from dagster._utils.error import ( ExceptionInfo, SerializableErrorInfo, @@ -406,8 +410,8 @@ def build_external_execution_context_data( _convert_time_window(partition_time_window) if partition_time_window else None ), run_id=context.run_id, - job_name=None if isinstance(context, DirectOpExecutionContext) else context.job_name, - retry_number=0 if isinstance(context, DirectOpExecutionContext) else context.retry_number, + job_name=None if isinstance(context, BaseRunlessContext) else context.job_name, + retry_number=0 if isinstance(context, BaseRunlessContext) else context.retry_number, extras=extras or {}, ) From 2f31b871e94b3f0b9447f7e73fde384e86ab8841 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 28 Dec 2023 15:08:08 -0800 Subject: [PATCH 09/27] update for dowstream changes --- .../_core/execution/context/invocation.py | 66 ++++++------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 2b8df761518d6..daa6ca1d7b275 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -57,17 +57,15 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import AssetExecutionContext, OpExecutionContext, RunProperties +from .compute import AssetExecutionContext, OpExecutionContext from .system import StepExecutionContext, TypeCheckContext def _property_msg(prop_name: str, method_name: str) -> str: - # TODO - update to handle assets too - return ( - f"The {prop_name} {method_name} is not set on the context when an op is directly invoked." - ) + return f"The {prop_name} {method_name} is not set on the context when an asset or op is directly invoked." + -class BaseRunlessContext: +class BasDirectExecutionContext: @abstractmethod def bind( self, @@ -77,21 +75,21 @@ def bind( config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], ): - """Instances of BsaeRunlessContest must implement bind.""" + """Instances of BasDirectExecutionContext must implement bind.""" @abstractmethod def unbind(self): - """Instances of BsaeRunlessContest must implement unbind.""" + """Instances of BasDirectExecutionContext must implement unbind.""" @property @abstractmethod def bound_properties(self) -> "BoundProperties": - """Instances of BaseRunlessContext must contain a BoundProperties object.""" + """Instances of BasDirectExecutionContext must contain a BoundProperties object.""" @property @abstractmethod - def execution_properties(self) -> "RunlessExecutionProperties": - """Instances of BaseRunlessContext must contain a RunlessExecutionProperties object.""" + def execution_properties(self) -> "DirectExecutionProperties": + """Instances of BasDirectExecutionContext must contain a DirectExecutionProperties object.""" @abstractmethod def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: @@ -159,13 +157,7 @@ def __init__(self): self.typed_event_stream_error_message: Optional[str] = None -class RunlessRunProperties(RunProperties): - @property - def dagster_run(self): - raise DagsterInvalidPropertyError(_property_msg("dagster_run", "property")) - - -class DirectOpExecutionContext(OpExecutionContext, BaseRunlessContext): +class DirectOpExecutionContext(OpExecutionContext, BasDirectExecutionContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. """ @@ -744,12 +736,12 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No self._execution_properties.typed_event_stream_error_message = error_message -class RunlessAssetExecutionContext(AssetExecutionContext, BaseRunlessContext): +class DirectAssetExecutionContext(AssetExecutionContext, BasDirectExecutionContext): """The ``context`` object available as the first argument to an asset's compute function when being invoked directly. Can also be used as a context manager. """ - def __init__(self, op_execution_context: RunlessOpExecutionContext): + def __init__(self, op_execution_context: DirectOpExecutionContext): self._op_execution_context = op_execution_context self._run_props = None @@ -775,10 +767,10 @@ def bind( assets_def: Optional[AssetsDefinition], config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], - ) -> "RunlessAssetExecutionContext": + ) -> "DirectAssetExecutionContext": if assets_def is None: raise DagsterInvariantViolationError( - "RunlessAssetExecutionContext can only being used to invoke an asset." + "DirectAssetExecutionContext can only being used to invoke an asset." ) if self._op_execution_context._bound_properties is not None: # noqa: SLF001 raise DagsterInvalidInvocationError( @@ -809,39 +801,19 @@ def is_bound(self) -> bool: return self.op_execution_context.is_bound @property - def execution_properties(self) -> RunlessExecutionProperties: + def execution_properties(self) -> DirectExecutionProperties: return self.op_execution_context.execution_properties @property - def op_execution_context(self) -> RunlessOpExecutionContext: + def op_execution_context(self) -> DirectOpExecutionContext: return self._op_execution_context def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: - self._check_bound(fn_name="for_type", fn_type="method") - resources = cast(NamedTuple, self.resources) - return TypeCheckContext( - self.run_properties.run_id, - self.log, - ScopedResourcesBuilder(resources._asdict()), - dagster_type, - ) + return self.op_execution_context.for_type(dagster_type) def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key) - @property - def run_properties(self) -> RunProperties: - self._check_bound(fn_name="run_properties", fn_type="property") - if self._run_props is None: - self._run_props = RunlessRunProperties( - run_id=self.op_execution_context.run_id, - run_config=self.op_execution_context.run_config, - # pass None for dagster_run, since RunlessRunProperties raises an exception for this attr - dagster_run=None, # type: ignore - retry_number=0, - ) - return self._run_props - def _validate_resource_requirements( resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition @@ -933,7 +905,7 @@ def build_asset_context( instance: Optional[DagsterInstance] = None, partition_key: Optional[str] = None, partition_key_range: Optional[PartitionKeyRange] = None, -): +) -> DirectAssetExecutionContext: """Builds asset execution context from provided parameters. ``build_asset_context`` can be used as either a function or context manager. If there is a @@ -969,4 +941,4 @@ def build_asset_context( instance=instance, ) - return RunlessAssetExecutionContext(op_execution_context=op_context) + return DirectAssetExecutionContext(op_execution_context=op_context) From bf0f24dcacd11f8f02c98e0c1ec0489cfce6a8b4 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 28 Dec 2023 15:27:21 -0800 Subject: [PATCH 10/27] update naming --- .../_core/definitions/op_invocation.py | 24 +++++++++---------- .../_core/execution/context/invocation.py | 14 +++++------ .../dagster/dagster/_core/pipes/context.py | 10 +++----- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 2970a5cea9270..5b03d0509e9db 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -33,7 +33,7 @@ if TYPE_CHECKING: from ..execution.context.compute import OpExecutionContext - from ..execution.context.invocation import BaseRunlessContext + from ..execution.context.invocation import BaseDirectExecutionContext from .assets import AssetsDefinition from .composition import PendingNodeInvocation from .decorators.op_decorator import DecoratedOpFunction @@ -120,7 +120,7 @@ def direct_invocation_result( ) -> Any: from dagster._config.pythonic_config import Config from dagster._core.execution.context.invocation import ( - BaseRunlessContext, + BaseDirectExecutionContext, build_op_context, ) @@ -160,7 +160,7 @@ def direct_invocation_result( " no context was provided when invoking." ) if len(args) > 0: - if args[0] is not None and not isinstance(args[0], BaseRunlessContext): + if args[0] is not None and not isinstance(args[0], BaseDirectExecutionContext): raise DagsterInvalidInvocationError( f"Decorated function '{compute_fn.name}' has context argument, " "but no context was provided when invoking." @@ -182,7 +182,7 @@ def direct_invocation_result( kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name } # allow passing context, even if the function doesn't have an arg for it - elif len(args) > 0 and isinstance(args[0], BaseRunlessContext): + elif len(args) > 0 and isinstance(args[0], BaseDirectExecutionContext): context = args[0] args = args[1:] @@ -241,7 +241,7 @@ def direct_invocation_result( def _resolve_inputs( - op_def: "OpDefinition", args, kwargs, context: "BaseRunlessContext" + op_def: "OpDefinition", args, kwargs, context: "BaseDirectExecutionContext" ) -> Mapping[str, Any]: from dagster._core.execution.plan.execute_step import do_type_check @@ -344,7 +344,7 @@ def _resolve_inputs( return input_dict -def _key_for_result(result: MaterializeResult, context: "BaseRunlessContext") -> AssetKey: +def _key_for_result(result: MaterializeResult, context: "BaseDirectExecutionContext") -> AssetKey: if not context.per_invocation_properties.assets_def: raise DagsterInvariantViolationError( f"Op {context.per_invocation_properties.alias} does not have an assets definition." @@ -366,7 +366,7 @@ def _key_for_result(result: MaterializeResult, context: "BaseRunlessContext") -> def _output_name_for_result_obj( event: MaterializeResult, - context: "BaseRunlessContext", + context: "BaseDirectExecutionContext", ): if not context.per_invocation_properties.assets_def: raise DagsterInvariantViolationError( @@ -379,7 +379,7 @@ def _output_name_for_result_obj( def _handle_gen_event( event: T, op_def: "OpDefinition", - context: "BaseRunlessContext", + context: "BaseDirectExecutionContext", output_defs: Mapping[str, OutputDefinition], outputs_seen: Set[str], ) -> T: @@ -413,7 +413,7 @@ def _handle_gen_event( def _type_check_output_wrapper( - op_def: "OpDefinition", result: Any, context: "BaseRunlessContext" + op_def: "OpDefinition", result: Any, context: "BaseDirectExecutionContext" ) -> Any: """Type checks and returns the result of a op. @@ -507,7 +507,7 @@ def type_check_gen(gen): def _type_check_function_output( - op_def: "OpDefinition", result: T, context: "BaseRunlessContext" + op_def: "OpDefinition", result: T, context: "BaseDirectExecutionContext" ) -> T: from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator @@ -527,14 +527,14 @@ def _type_check_function_output( def _type_check_output( output_def: "OutputDefinition", output: Union[Output, DynamicOutput], - context: "BaseRunlessContext", + context: "BaseDirectExecutionContext", ) -> None: """Validates and performs core type check on a provided output. Args: output_def (OutputDefinition): The output definition to validate against. output (Any): The output to validate. - context (BaseRunlessContext): Context containing resources to be used for type + context (BaseDirectExecutionContext): Context containing resources to be used for type check. """ from ..execution.plan.execute_step import do_type_check diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index daa6ca1d7b275..8f534566c1fbe 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -65,7 +65,7 @@ def _property_msg(prop_name: str, method_name: str) -> str: return f"The {prop_name} {method_name} is not set on the context when an asset or op is directly invoked." -class BasDirectExecutionContext: +class BaseDirectExecutionContext: @abstractmethod def bind( self, @@ -75,21 +75,21 @@ def bind( config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], ): - """Instances of BasDirectExecutionContext must implement bind.""" + """Instances of BaseDirectExecutionContext must implement bind.""" @abstractmethod def unbind(self): - """Instances of BasDirectExecutionContext must implement unbind.""" + """Instances of BaseDirectExecutionContext must implement unbind.""" @property @abstractmethod def bound_properties(self) -> "BoundProperties": - """Instances of BasDirectExecutionContext must contain a BoundProperties object.""" + """Instances of BaseDirectExecutionContext must contain a BoundProperties object.""" @property @abstractmethod def execution_properties(self) -> "DirectExecutionProperties": - """Instances of BasDirectExecutionContext must contain a DirectExecutionProperties object.""" + """Instances of BaseDirectExecutionContext must contain a DirectExecutionProperties object.""" @abstractmethod def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: @@ -157,7 +157,7 @@ def __init__(self): self.typed_event_stream_error_message: Optional[str] = None -class DirectOpExecutionContext(OpExecutionContext, BasDirectExecutionContext): +class DirectOpExecutionContext(OpExecutionContext, BaseDirectExecutionContext): """The ``context`` object available as the first argument to an op's compute function when being invoked directly. Can also be used as a context manager. """ @@ -736,7 +736,7 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No self._execution_properties.typed_event_stream_error_message = error_message -class DirectAssetExecutionContext(AssetExecutionContext, BasDirectExecutionContext): +class DirectAssetExecutionContext(AssetExecutionContext, BaseDirectExecutionContext): """The ``context`` object available as the first argument to an asset's compute function when being invoked directly. Can also be used as a context manager. """ diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index 173a69e03dd17..e902f335d87c1 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -39,11 +39,7 @@ from dagster._core.errors import DagsterPipesExecutionError from dagster._core.events import EngineEventData from dagster._core.execution.context.compute import OpExecutionContext -<<<<<<< HEAD -from dagster._core.execution.context.invocation import DirectOpExecutionContext -======= -from dagster._core.execution.context.invocation import BaseRunlessContext ->>>>>>> c4d31671bc (pipes) +from dagster._core.execution.context.invocation import BaseDirectExecutionContext from dagster._utils.error import ( ExceptionInfo, SerializableErrorInfo, @@ -410,8 +406,8 @@ def build_external_execution_context_data( _convert_time_window(partition_time_window) if partition_time_window else None ), run_id=context.run_id, - job_name=None if isinstance(context, BaseRunlessContext) else context.job_name, - retry_number=0 if isinstance(context, BaseRunlessContext) else context.retry_number, + job_name=None if isinstance(context, BaseDirectExecutionContext) else context.job_name, + retry_number=0 if isinstance(context, BaseDirectExecutionContext) else context.retry_number, extras=extras or {}, ) From 7c4481e1e49a983a07d17500b45ed04127ee562b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 2 Jan 2024 10:28:00 -0800 Subject: [PATCH 11/27] remove unneeded test --- .../dagster_tests/core_tests/test_op_invocation.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index b0e4830d91f3a..7ceb3bf2ea220 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1589,17 +1589,3 @@ async def get_results(): asyncio.run(get_results()) assert_context_unbound(ctx) - - -def test_run_properties_access(): - @asset - def access_run_properties(context: AssetExecutionContext): - assert context.run_properties.run_id == "EPHEMERAL" - assert context.run_properties.retry_number == 0 - - with pytest.raises(DagsterInvalidPropertyError): - context.run_properties.dagster_run # noqa:B018 - - ctx = build_asset_context() - - access_run_properties(ctx) From 7913807f21a52830448d20acea0d05380af64fac Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 2 Jan 2024 13:50:59 -0800 Subject: [PATCH 12/27] pyright fix --- .../dagster/_core/execution/context/invocation.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 8f534566c1fbe..75ac196f52e01 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -75,24 +75,30 @@ def bind( config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], ): - """Instances of BaseDirectExecutionContext must implement bind.""" + """Subclasses of BaseDirectExecutionContext must implement bind.""" @abstractmethod def unbind(self): - """Instances of BaseDirectExecutionContext must implement unbind.""" + """Subclasses of BaseDirectExecutionContext must implement unbind.""" @property @abstractmethod def bound_properties(self) -> "BoundProperties": - """Instances of BaseDirectExecutionContext must contain a BoundProperties object.""" + """Subclasses of BaseDirectExecutionContext must contain a BoundProperties object.""" @property @abstractmethod def execution_properties(self) -> "DirectExecutionProperties": - """Instances of BaseDirectExecutionContext must contain a DirectExecutionProperties object.""" + """Subclasses of BaseDirectExecutionContext must contain a DirectExecutionProperties object.""" @abstractmethod def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: + """Subclasses of BaseDirectExecutionContext must implement for_type.""" + pass + + @abstractmethod + def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: + """Subclasses of BaseDirectExecutionContext must implement observe_output.""" pass From ac881a4bff7a6207b36e03faf99f95f120bb25f1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 26 Jan 2024 14:10:22 -0500 Subject: [PATCH 13/27] small cleanup --- .../dagster/dagster/_core/definitions/op_invocation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 5b03d0509e9db..d77dd6eb81d3a 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -102,7 +102,7 @@ def _separate_args_and_kwargs( def _get_op_context( - context, # TODO - type hint + context, ) -> "OpExecutionContext": from dagster._core.execution.context.compute import AssetExecutionContext From b00ca5dac2b3d8491f9ead63c4d30a46bcbba1d5 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 26 Jan 2024 15:05:10 -0500 Subject: [PATCH 14/27] add time window to DI contexts --- .../dagster/dagster/_core/execution/context/invocation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 75ac196f52e01..c479ae751fff3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -635,6 +635,10 @@ def asset_partitions_time_window_for_output(self, output_name: str = "result") - Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def ).time_window_for_partition_key(self.partition_key) + @property + def partition_time_window(self) -> TimeWindow: + return self.asset_partitions_time_window_for_output() + def add_output_metadata( self, metadata: Mapping[str, Any], From 45b652eb63cf615090f8e77f20bbc47c9a9719ce Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 29 Jan 2024 12:25:30 -0500 Subject: [PATCH 15/27] update for rename --- .../_core/execution/context/invocation.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index c479ae751fff3..9254cf5196642 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -83,8 +83,8 @@ def unbind(self): @property @abstractmethod - def bound_properties(self) -> "BoundProperties": - """Subclasses of BaseDirectExecutionContext must contain a BoundProperties object.""" + def per_invocation_properties(self) -> "PerInvocationProperties": + """Subclasses of BaseDirectExecutionContext must contain a PerInvocationProperties object.""" @property @abstractmethod @@ -754,8 +754,6 @@ class DirectAssetExecutionContext(AssetExecutionContext, BaseDirectExecutionCont def __init__(self, op_execution_context: DirectOpExecutionContext): self._op_execution_context = op_execution_context - self._run_props = None - def __enter__(self): self.op_execution_context._cm_scope_entered = True # noqa: SLF001 return self @@ -766,8 +764,8 @@ def __exit__(self, *exc): def __del__(self): self.op_execution_context._exit_stack.close() # noqa: SLF001 - def _check_bound(self, fn_name: str, fn_type: str): - if not self._op_execution_context._bound_properties: # noqa: SLF001 + def _check_bound_to_invocation(self, fn_name: str, fn_type: str): + if not self._op_execution_context._per_invocation_properties: # noqa: SLF001 raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type)) def bind( @@ -782,7 +780,7 @@ def bind( raise DagsterInvariantViolationError( "DirectAssetExecutionContext can only being used to invoke an asset." ) - if self._op_execution_context._bound_properties is not None: # noqa: SLF001 + if self._op_execution_context._per_invocation_properties is not None: # noqa: SLF001 raise DagsterInvalidInvocationError( f"This context is currently being used to execute {self.op_execution_context.alias}." " The context cannot be used to execute another asset until" @@ -803,8 +801,8 @@ def unbind(self): self._op_execution_context.unbind() @property - def bound_properties(self) -> BoundProperties: - return self.op_execution_context.bound_properties + def per_invocation_properties(self) -> PerInvocationProperties: + return self.op_execution_context.per_invocation_properties @property def is_bound(self) -> bool: From 6732f464cb55da9d1572e39bfea9a6a1d3490d84 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 30 Jan 2024 10:25:07 -0500 Subject: [PATCH 16/27] remove thing that needs to be in other branch --- .../dagster/dagster/_core/execution/context/invocation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 9254cf5196642..3e27abb7dcbe9 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -635,10 +635,6 @@ def asset_partitions_time_window_for_output(self, output_name: str = "result") - Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def ).time_window_for_partition_key(self.partition_key) - @property - def partition_time_window(self) -> TimeWindow: - return self.asset_partitions_time_window_for_output() - def add_output_metadata( self, metadata: Mapping[str, Any], From 63e5bc64c4ce3545c613866231cd3296e1f994ae Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 30 Jan 2024 12:22:19 -0500 Subject: [PATCH 17/27] comment --- .../dagster/_core/execution/context/invocation.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 3e27abb7dcbe9..ca3b80c1e630e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -66,6 +66,17 @@ def _property_msg(prop_name: str, method_name: str) -> str: class BaseDirectExecutionContext: + """Base class for any direct invocation execution contexts. Each type of execution context + (ex. OpExecutionContext, AssetExecutionContext) needs to have a variant for direct invocation. + Those direct invocation contexts have some methods that are not available until the context + is bound to a particular op/asset. The "bound" properties are held in PerInvocationProperties. + There are also some properties that are specific to a particular execution of an op/asset, these + properties are held in DirectExecutionProperties. Direct invocation contexts must + be able to be bound and unbound from a particular op/asset. Additionally, there are some methods + that all direct invocation contexts must implement so that the will be usable in the execution + code path. + """ + @abstractmethod def bind( self, From 303086ec398dcd2fb1fe239fefba926bf4241649 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 2 Jan 2024 10:33:08 -0800 Subject: [PATCH 18/27] small --- .../dagster/dagster/_core/execution/context/compute.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index fb859cb42cee5..b75e2a2b8a286 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1896,3 +1896,6 @@ def enter_execution_context( _current_asset_execution_context: ContextVar[Optional[AssetExecutionContext]] = ContextVar( "_current_asset_execution_context", default=None ) + + +# TODO - remove From dd1e5db161e51f618dea02c6d8e7524781dbe183 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 2 Jan 2024 10:41:36 -0800 Subject: [PATCH 19/27] asset materialization event --- .../_core/execution/context/compute.py | 12 +++++-- .../dagster/_core/execution/context/system.py | 21 ++++++++++--- .../_core/execution/plan/execute_step.py | 2 +- .../execution_tests/test_context.py | 31 +++++++++++++++++++ 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index b75e2a2b8a286..2808254e92a18 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -38,6 +38,7 @@ AssetKey, AssetMaterialization, AssetObservation, + CoercibleToAssetKey, ExpectationResult, UserEvent, ) @@ -1459,6 +1460,14 @@ def job_def(self) -> JobDefinition: """ return self.op_execution_context.job_def + @public + def latest_materialization_event( + self, key: CoercibleToAssetKey + ) -> Optional[AssetMaterialization]: + return self._step_execution_context.latest_materialization_event.get( + AssetKey.from_coercible(key) + ) + ######## Deprecated methods @deprecated(**_get_deprecation_kwargs("dagster_run")) @@ -1896,6 +1905,3 @@ def enter_execution_context( _current_asset_execution_context: ContextVar[Optional[AssetExecutionContext]] = ContextVar( "_current_asset_execution_context", default=None ) - - -# TODO - remove diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 25acd4c272b72..d8117166c9b52 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -30,7 +30,7 @@ extract_data_version_from_entry, ) from dagster._core.definitions.dependency import OpNode -from dagster._core.definitions.events import AssetKey, AssetLineageInfo +from dagster._core.definitions.events import AssetKey, AssetLineageInfo, AssetMaterialization from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.job_base import IJob from dagster._core.definitions.job_definition import JobDefinition @@ -571,6 +571,8 @@ def __init__( self._output_metadata: Dict[str, Any] = {} self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} + self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {} + self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {} self._is_external_input_asset_version_info_loaded = False self._data_version_cache: Dict[AssetKey, "DataVersion"] = {} @@ -955,11 +957,16 @@ def is_external_input_asset_version_info_loaded(self) -> bool: def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]: if key not in self._input_asset_version_info: - self._fetch_input_asset_version_info(key) + self._fetch_input_asset_materialization_and_version_info(key) return self._input_asset_version_info[key] # "external" refers to records for inputs generated outside of this step - def fetch_external_input_asset_version_info(self) -> None: + def fetch_external_input_asset_materialization_and_version_info(self) -> None: + """Fetches the latest observation or materialization for each upstream dependency + in order to determine the version info. As a side effect we create a dictionary + of the materialization events so that the AssetContext can access the latest materialization + event. + """ output_keys = self.get_output_asset_keys() all_dep_keys: List[AssetKey] = [] @@ -973,10 +980,10 @@ def fetch_external_input_asset_version_info(self) -> None: self._input_asset_version_info = {} for key in all_dep_keys: - self._fetch_input_asset_version_info(key) + self._fetch_input_asset_materialization_and_version_info(key) self._is_external_input_asset_version_info_loaded = True - def _fetch_input_asset_version_info(self, key: AssetKey) -> None: + def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None: from dagster._core.definitions.data_version import ( extract_data_version_from_entry, ) @@ -984,7 +991,11 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: event = self._get_input_asset_event(key) if event is None: self._input_asset_version_info[key] = None + self.latest_materialization_event[key] = None else: + self.latest_materialization_event[key] = ( + event.asset_materialization if event.asset_materialization else None + ) storage_id = event.storage_id # Input name will be none if this is an internal dep input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key) diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 2fbd81041bff9..b403b7e56a113 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -440,7 +440,7 @@ def core_dagster_event_sequence_for_step( inputs = {} if step_context.is_sda_step: - step_context.fetch_external_input_asset_version_info() + step_context.fetch_external_input_asset_materialization_and_version_info() for step_input in step_context.step.step_inputs: input_def = step_context.op_def.input_def_named(step_input.name) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 1fd029361981b..171f109af1427 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -9,6 +9,7 @@ DagsterInstance, Definitions, GraphDefinition, + MaterializeResult, OpExecutionContext, Output, asset, @@ -426,3 +427,33 @@ def a(context: AssetExecutionContext): assert context == AssetExecutionContext.get() assert materialize([a]).success + + +def test_upstream_metadata(): + # with output metadata + @asset + def upstream(context: AssetExecutionContext): + context.add_output_metadata({"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_event("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream]) + + +def test_upstream_metadata_materialize_result(): + # with asset materialization + @asset + def upstream(): + return MaterializeResult(metadata={"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_event("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream]) From 7da9f4bda4c03a0206f2cafaa952bb063f3e209d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 2 Jan 2024 10:47:45 -0800 Subject: [PATCH 20/27] better naming --- .../dagster/_core/execution/context/compute.py | 8 +++++++- .../dagster/_core/execution/context/system.py | 14 +++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 2808254e92a18..7784083fe08f5 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1464,7 +1464,13 @@ def job_def(self) -> JobDefinition: def latest_materialization_event( self, key: CoercibleToAssetKey ) -> Optional[AssetMaterialization]: - return self._step_execution_context.latest_materialization_event.get( + """Get the most recent AssetMaterialization event for the key. Information like metadata and tags + can be found on the AssetMaterialization. If the key is not an upstream asset of the currently + materializing asset, None will be returned. + + Returns: Optional[AssetMaterialization] + """ + return self._step_execution_context.upstream_asset_materialization_events.get( AssetKey.from_coercible(key) ) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index d8117166c9b52..df5c1d385dd17 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -571,7 +571,9 @@ def __init__( self._output_metadata: Dict[str, Any] = {} self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} - self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {} + self._upstream_asset_materialization_events: Dict[ + AssetKey, Optional[AssetMaterialization] + ] = {} self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {} self._is_external_input_asset_version_info_loaded = False @@ -951,6 +953,12 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion": def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]: return self._input_asset_version_info + @property + def upstream_asset_materialization_events( + self, + ) -> Dict[AssetKey, Optional[AssetMaterialization]]: + return self._upstream_asset_materialization_events + @property def is_external_input_asset_version_info_loaded(self) -> bool: return self._is_external_input_asset_version_info_loaded @@ -991,9 +999,9 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> event = self._get_input_asset_event(key) if event is None: self._input_asset_version_info[key] = None - self.latest_materialization_event[key] = None + self._upstream_asset_materialization_events[key] = None else: - self.latest_materialization_event[key] = ( + self._upstream_asset_materialization_events[key] = ( event.asset_materialization if event.asset_materialization else None ) storage_id = event.storage_id From 32ab2f64a0b04c81da6da2243ea72b710ae40d26 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 4 Jan 2024 11:05:53 -0800 Subject: [PATCH 21/27] via op context --- .../dagster/dagster/_core/execution/context/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 7784083fe08f5..272aacb831790 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1470,7 +1470,7 @@ def latest_materialization_event( Returns: Optional[AssetMaterialization] """ - return self._step_execution_context.upstream_asset_materialization_events.get( + return self.op_execution_context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 AssetKey.from_coercible(key) ) From 6e74a25865f33224e9197c504d3b1ef98bd5d88b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 16 Jan 2024 12:36:55 -0500 Subject: [PATCH 22/27] rename and add exception --- .../_core/execution/context/compute.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 272aacb831790..90e33e6e5c316 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1461,17 +1461,26 @@ def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @public - def latest_materialization_event( + def latest_materialization_for_upstream_asset( self, key: CoercibleToAssetKey ) -> Optional[AssetMaterialization]: - """Get the most recent AssetMaterialization event for the key. Information like metadata and tags - can be found on the AssetMaterialization. If the key is not an upstream asset of the currently - materializing asset, None will be returned. + """Get the most recent AssetMaterialization event for the key. The key must be an upstream + asset for the currently materializing asset. Information like metadata and tags can be found + on the AssetMaterialization. If the key is not an upstream asset of the currently + materializing asset, an error will be raised. If no AssetMaterialization exists for key, None + will be returned. Returns: Optional[AssetMaterialization] """ - return self.op_execution_context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 - AssetKey.from_coercible(key) + materialization_events = ( + self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 + ) + if AssetKey.from_coercible(key) in materialization_events.keys(): + return materialization_events.get(AssetKey.from_coercible(key)) + + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" + "in order to call latest_materialization_for_upstream_asset." ) ######## Deprecated methods From fc5c91a55d8c20f84bf7bb00f75c3ec9098bcd20 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 16 Jan 2024 13:13:41 -0500 Subject: [PATCH 23/27] DI context --- .../dagster/dagster/_core/execution/context/invocation.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index ca3b80c1e630e..0719115dfc583 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -22,6 +22,7 @@ from dagster._core.definitions.events import ( AssetMaterialization, AssetObservation, + CoercibleToAssetKey, ExpectationResult, UserEvent, ) @@ -829,6 +830,13 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key) + def latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey + ) -> Optional[AssetMaterialization]: + raise DagsterInvalidPropertyError( + _property_msg("latest_materialization_for_upstream_asset", "method") + ) + def _validate_resource_requirements( resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition From 686afd727a7da2a6ec5d176923b9560727e56854 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 17 Jan 2024 16:12:55 -0500 Subject: [PATCH 24/27] fix naming --- .../dagster_tests/core_tests/execution_tests/test_context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 171f109af1427..fca48fecadf55 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -437,7 +437,7 @@ def upstream(context: AssetExecutionContext): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_event("upstream") + mat = context.latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" @@ -452,7 +452,7 @@ def upstream(): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_event("upstream") + mat = context.latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" From 54cf22de89755bd598235d8edfffdefc56e3fdcc Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 19 Jan 2024 15:12:43 -0500 Subject: [PATCH 25/27] move storing AM to when it is accurate --- .../dagster/dagster/_core/execution/context/system.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index df5c1d385dd17..85b67cf0d00cf 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1001,9 +1001,6 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> self._input_asset_version_info[key] = None self._upstream_asset_materialization_events[key] = None else: - self._upstream_asset_materialization_events[key] = ( - event.asset_materialization if event.asset_materialization else None - ) storage_id = event.storage_id # Input name will be none if this is an internal dep input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key) @@ -1030,6 +1027,12 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> data_version = extract_data_version_from_entry(event.event_log_entry) else: data_version = extract_data_version_from_entry(event.event_log_entry) + # the AssetMaterialization fetched above is only accurate if the asset it not partitioned + # if the asset is partitioned, then the latest AssetMaterialization may be for a partition + # that is irrelevant to the current execution + self._upstream_asset_materialization_events[key] = ( + event.asset_materialization if event.asset_materialization else None + ) self._input_asset_version_info[key] = InputAssetVersionInfo( storage_id, data_version, event.run_id, event.timestamp ) From 5b914cf0f0e44937709bca5ef43ef9c290ba1b40 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Dec 2023 15:13:01 -0500 Subject: [PATCH 26/27] do the if else thing --- .../dagster/_core/execution/plan/compute.py | 16 +++++++++++---- .../_core/execution/plan/compute_generator.py | 20 ++++++++++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 2d67fb38bab2e..7bb4c8d677b43 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -61,6 +61,14 @@ ] +def _get_op_context( + context: Union[OpExecutionContext, AssetExecutionContext] +) -> OpExecutionContext: + if isinstance(context, AssetExecutionContext): + return context.op_execution_context + return context + + def create_step_outputs( node: Node, handle: NodeHandle, @@ -189,12 +197,12 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.has_events(): - yield from compute_context.consume_events() + if _get_op_context(compute_context).has_events(): + yield from _get_op_context(compute_context).consume_events() yield _validate_event(event, step_context) - if compute_context.has_events(): - yield from compute_context.consume_events() + if _get_op_context(compute_context).has_events(): + yield from _get_op_context(compute_context).consume_events() def execute_core_compute( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 375be39a7ea43..dd930fa0c98a6 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -36,9 +36,18 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import OpExecutionContext +from ..context.compute import AssetExecutionContext, OpExecutionContext +def _get_op_context( + context: Union[OpExecutionContext, AssetExecutionContext] +) -> OpExecutionContext: + if isinstance(context, AssetExecutionContext): + return context.op_execution_context + return context + + +# called in execute_step if the fn is not decorated def create_op_compute_wrapper( op_def: OpDefinition, ) -> Callable[[OpExecutionContext, Mapping[str, InputDefinition]], Any]: @@ -94,6 +103,7 @@ def compute( return compute +# called in this file (create_op_compute_wrapper) async def _coerce_async_op_to_async_gen( awaitable: Awaitable[Any], context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> AsyncIterator[Any]: @@ -102,6 +112,7 @@ async def _coerce_async_op_to_async_gen( yield event +# called in this file, and in op_invocation for direct invocation def invoke_compute_fn( fn: Callable, context: OpExecutionContext, @@ -125,6 +136,7 @@ def invoke_compute_fn( return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass) +# called in this file (create_op_compute_wrapper) def _coerce_op_compute_fn_to_iterator( fn, output_defs, context, context_arg_provided, kwargs, config_arg_class, resource_arg_mapping ): @@ -135,6 +147,7 @@ def _coerce_op_compute_fn_to_iterator( yield event +# called in this file (validate_and_coerce_op_result_to_iterator) def _zip_and_iterate_op_result( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Tuple[int, Any, OutputDefinition]]: @@ -162,6 +175,7 @@ def _zip_and_iterate_op_result( # Filter out output_defs corresponding to asset check results that already exist on a # MaterializeResult. +# called in this file (_zip_and_iterate_op_result) def _filter_expected_output_defs( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Sequence[OutputDefinition]: @@ -177,6 +191,7 @@ def _filter_expected_output_defs( return [out for out in output_defs if out.name not in remove_outputs] +# called in this file (_zip_and_iterate_op_result) def _validate_multi_return( context: OpExecutionContext, result: Any, @@ -212,6 +227,7 @@ def _validate_multi_return( return result +# called in this file (validate_and_coerce_op_result_to_iterator) def _get_annotation_for_output_position( position: int, op_def: OpDefinition, output_defs: Sequence[OutputDefinition] ) -> Any: @@ -226,6 +242,7 @@ def _get_annotation_for_output_position( return inspect.Parameter.empty +# called in this file (validate_and_coerce_op_result_to_iterator) def _check_output_object_name( output: Union[DynamicOutput, Output], output_def: OutputDefinition, position: int ) -> None: @@ -239,6 +256,7 @@ def _check_output_object_name( ) +# called in op_invocation and this file def validate_and_coerce_op_result_to_iterator( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Any]: From 9b0b9dbf8218f16c29baabcfe25336d2db93018e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 4 Jan 2024 11:07:20 -0800 Subject: [PATCH 27/27] format --- python_modules/dagster/dagster/_core/execution/plan/compute.py | 2 +- .../dagster/dagster/_core/execution/plan/compute_generator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 7bb4c8d677b43..d48ee9b1363b9 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -62,7 +62,7 @@ def _get_op_context( - context: Union[OpExecutionContext, AssetExecutionContext] + context: Union[OpExecutionContext, AssetExecutionContext], ) -> OpExecutionContext: if isinstance(context, AssetExecutionContext): return context.op_execution_context diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index dd930fa0c98a6..21583e20760e9 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -40,7 +40,7 @@ def _get_op_context( - context: Union[OpExecutionContext, AssetExecutionContext] + context: Union[OpExecutionContext, AssetExecutionContext], ) -> OpExecutionContext: if isinstance(context, AssetExecutionContext): return context.op_execution_context