From 7acc305e3984f72f19a8a6877b7a0d6a63088345 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 13 Nov 2023 15:15:09 -0500 Subject: [PATCH 01/17] update execute path for handling op and asset contexts --- .../_core/execution/context/compute.py | 76 +++++++++---------- .../dagster/_core/execution/plan/compute.py | 10 +-- .../_core/execution/plan/compute_generator.py | 45 ++++++----- 3 files changed, 66 insertions(+), 65 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index b1dfe43cce873..257ffeb87e43d 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -66,56 +66,39 @@ from .system import StepExecutionContext +class ExecutionInfo( + NamedTuple( + "_ExecutionInfo", + [ + ("step_description", PublicAttr[str]), + ("op_execution_context", PublicAttr["OpExecutionContext"]), + ], + ) +): + """Information to be used by dagster internals during execution. Contains: + * A description of the step. + """ + + def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"): + return super(ExecutionInfo, cls).__new__( + cls, step_description=step_description, op_execution_context=op_execution_context + ) + + # This metaclass has to exist for OpExecutionContext to have a metaclass class AbstractComputeMetaclass(ABCMeta): pass class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): - """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" - - @abstractmethod - def has_tag(self, key: str) -> bool: - """Implement this method to check if a logging tag is set.""" - - @abstractmethod - def get_tag(self, key: str) -> Optional[str]: - """Implement this method to get a logging tag.""" - - @property - @abstractmethod - def run_id(self) -> str: - """The run id for the context.""" - - @property - @abstractmethod - def op_def(self) -> OpDefinition: - """The op definition corresponding to the execution step being executed.""" - - @property - @abstractmethod - def job_def(self) -> JobDefinition: - """The job being executed.""" - - @property - @abstractmethod - def run(self) -> DagsterRun: - """The DagsterRun object corresponding to the execution.""" - - @property - @abstractmethod - def resources(self) -> Any: - """Resources available in the execution context.""" - - @property - @abstractmethod - def log(self) -> DagsterLogManager: - """The log manager available in the execution context.""" + """Base class for op context implemented by OpExecutionContext, AssetExecutionContext, + and DagstermillExecutionContext. 
+ """ @property @abstractmethod - def op_config(self) -> Any: - """The parsed config specific to this op.""" + def execution_info(self) -> ExecutionInfo: + """Implement this method to check if a logging tag is set.""" class OpExecutionContextMetaClass(AbstractComputeMetaclass): @@ -555,7 +538,7 @@ def retry_number(self) -> int: return self._step_execution_context.previous_attempt_count def describe_op(self) -> str: - return self._step_execution_context.describe_op() + return self.execution_info.step_description @public def get_mapping_key(self) -> Optional[str]: @@ -1420,6 +1403,11 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: retry_number=self._op_execution_context.retry_number, ) + self._execution_info = ExecutionInfo( + step_description=f"asset {self._op_execution_context.node_handle}", + op_execution_context=self._op_execution_context, + ) + @staticmethod def get() -> "AssetExecutionContext": ctx = _current_asset_execution_context.get() @@ -1435,6 +1423,10 @@ def op_execution_context(self) -> OpExecutionContext: def run_properties(self) -> RunProperties: return self._run_props + @property + def execution_info(self) -> ExecutionInfo: + return self._execution_info + ######## Deprecated methods @deprecated(**_get_deprecation_kwargs("run")) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 2d67fb38bab2e..70682230a17b2 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -176,12 +176,12 @@ def _yield_compute_results( if inspect.isasyncgen(user_event_generator): user_event_generator = gen_from_async_gen(user_event_generator) - op_label = step_context.describe_op() + step_label = compute_context.execution_info.step_description for event in iterate_with_context( lambda: op_execution_error_boundary( DagsterExecutionStepExecutionError, - msg_fn=lambda: f"Error occurred while executing {op_label}:", + msg_fn=lambda: f"Error occurred while executing {step_label}:", step_context=step_context, step_key=step_context.step.key, op_def_name=step_context.op_def.name, @@ -189,11 +189,11 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.has_events(): - yield from compute_context.consume_events() + if compute_context.execution_info.op_execution_context.has_events(): + yield from compute_context.execution_info.op_execution_context.consume_events() yield _validate_event(event, step_context) - if compute_context.has_events(): + if compute_context.execution_info.op_execution_context.has_events(): yield from compute_context.consume_events() diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 375be39a7ea43..fbcc3df738958 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -18,6 +18,7 @@ from typing_extensions import get_args +from dagster import AssetExecutionContext from dagster._config.pythonic_config import Config from dagster._core.definitions import ( AssetCheckResult, @@ -136,7 +137,9 @@ def _coerce_op_compute_fn_to_iterator( def _zip_and_iterate_op_result( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[OpExecutionContext, AssetExecutionContext], + output_defs: 
Sequence[OutputDefinition], ) -> Iterator[Tuple[int, Any, OutputDefinition]]: # Filtering the expected output defs here is an unfortunate temporary solution to deal with the # change in expected outputs that occurs as a result of putting `AssetCheckResults` onto @@ -163,14 +166,18 @@ def _zip_and_iterate_op_result( # Filter out output_defs corresponding to asset check results that already exist on a # MaterializeResult. def _filter_expected_output_defs( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[OpExecutionContext, AssetExecutionContext], + output_defs: Sequence[OutputDefinition], ) -> Sequence[OutputDefinition]: result_tuple = ( (result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result ) materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)] remove_outputs = [ - r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key) + r.get_spec_python_identifier( + asset_key=x.asset_key or context.asset_key + ) # TODO - asset key might need to be on ExecutionInfo for x in materialize_results for r in x.check_results or [] ] @@ -178,7 +185,7 @@ def _filter_expected_output_defs( def _validate_multi_return( - context: OpExecutionContext, + context: Union[OpExecutionContext, AssetExecutionContext], result: Any, output_defs: Sequence[OutputDefinition], ) -> Any: @@ -194,7 +201,7 @@ def _validate_multi_return( # When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation. if not isinstance(result, tuple): raise DagsterInvariantViolationError( - f"{context.describe_op()} has multiple outputs, but only one " + f"{context.execution_info.step_description} has multiple outputs, but only one " f"output was returned of type {type(result)}. When using " "multiple outputs, either yield each output, or return a tuple " "containing a value for each output. Check out the " @@ -205,9 +212,9 @@ def _validate_multi_return( if not len(output_tuple) == len(output_defs): raise DagsterInvariantViolationError( "Length mismatch between returned tuple of outputs and number of " - f"output defs on {context.describe_op()}. Output tuple has " + f"output defs on {context.execution_info.step_description}. Output tuple has " f"{len(output_tuple)} outputs, while " - f"{context.op_def.node_type_str} has {len(output_defs)} outputs." + f"{context.execution_info.step_description} has {len(output_defs)} outputs." 
) return result @@ -240,7 +247,9 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[AssetExecutionContext, OpExecutionContext], + output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if inspect.isgenerator(result): # this happens when a user explicitly returns a generator in the op @@ -248,11 +257,11 @@ def validate_and_coerce_op_result_to_iterator( yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: If you are " + f"Error in {context.execution_info.step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " - f"{context.op_def.node_type_str} you must yield them " - "directly, or log them using the OpExecutionContext.log_event method to avoid " + "an op you must yield them " + "directly, or log them using the context.log_event method to avoid " "ambiguity with an implied result from returning a " "value. Check out the docs on logging events here: " "https://docs.dagster.io/concepts/ops-jobs-graphs/op-events#op-events-and-exceptions" @@ -261,8 +270,8 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: Unexpectedly returned output of type" - f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" + f"Error in {context.execution_info.step_description}: Unexpectedly returned output of type" + f" {type(result)}. {context.execution_info.step_description} is explicitly defined to" " return no results." ) # `requires_typed_event_stream` is a mode where we require users to return/yield exactly the @@ -287,7 +296,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {context.execution_info.step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -295,7 +304,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {context.execution_info.step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -317,7 +326,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: received Output object for" + f"Error with output for {context.execution_info.step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -335,7 +344,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. 
if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: output " + f"Error with output for {context.execution_info.step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." @@ -343,7 +352,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.describe_op()}. ' + f'"{output_def.name}" of {context.execution_info.step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: " From 2edc412b4f0be71732c757d9e0c314584fddb446 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 13 Nov 2023 15:35:08 -0500 Subject: [PATCH 02/17] fix import --- .../dagster/dagster/_core/execution/plan/compute_generator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index fbcc3df738958..d2657519e1f0b 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -18,7 +18,6 @@ from typing_extensions import get_args -from dagster import AssetExecutionContext from dagster._config.pythonic_config import Config from dagster._core.definitions import ( AssetCheckResult, @@ -37,7 +36,7 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import OpExecutionContext +from ..context.compute import AssetExecutionContext, OpExecutionContext def create_op_compute_wrapper( From 82f88cc5d2446700e6411271627e77ba39bdf09a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 13 Nov 2023 16:04:15 -0500 Subject: [PATCH 03/17] add DI --- .../dagster/dagster/_core/execution/context/invocation.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 2fe58e73cb7a7..94b2d136a0c59 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,7 +56,7 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import OpExecutionContext +from .compute import ExecutionInfo, OpExecutionContext from .system import StepExecutionContext, TypeCheckContext @@ -402,11 +402,14 @@ def bind( op_config=op_config, step_description=step_description, ) - + self._execution_info = ExecutionInfo( + step_description=f"op {alias}", op_execution_context=self + ) return self def unbind(self): self._bound_properties = None + self._execution_info = None @property def is_bound(self) -> bool: From 9c623b756f7931a0aa8bee1118dbceb915c70234 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 15 Nov 2023 12:13:07 -0500 Subject: [PATCH 04/17] revert changes to base class to make dagstermill work - to figure out for real later --- .../_core/execution/context/compute.py | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 
deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 257ffeb87e43d..9dda509d72f0a 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -91,17 +91,64 @@ class AbstractComputeMetaclass(ABCMeta): class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): - """Base class for op context implemented by OpExecutionContext, AssetExecutionContext, + """Base class for op context implemented by OpExecutionContext, and DagstermillExecutionContext. """ + """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" + + @abstractmethod + def has_tag(self, key: str) -> bool: + """Implement this method to check if a logging tag is set.""" + + @abstractmethod + def get_tag(self, key: str) -> Optional[str]: + """Implement this method to get a logging tag.""" + + @property + @abstractmethod + def run_id(self) -> str: + """The run id for the context.""" + + @property + @abstractmethod + def op_def(self) -> OpDefinition: + """The op definition corresponding to the execution step being executed.""" + + @property + @abstractmethod + def job_def(self) -> JobDefinition: + """The job being executed.""" + + @property + @abstractmethod + def run(self) -> DagsterRun: + """The DagsterRun object corresponding to the execution.""" + + @property + @abstractmethod + def resources(self) -> Any: + """Resources available in the execution context.""" + + @property + @abstractmethod + def log(self) -> DagsterLogManager: + """The log manager available in the execution context.""" + + @property + @abstractmethod + def op_config(self) -> Any: + """The parsed config specific to this op.""" + + +class HasExecutionInfo(ABC): @property @abstractmethod def execution_info(self) -> ExecutionInfo: """Implement this method to check if a logging tag is set.""" -class OpExecutionContextMetaClass(AbstractComputeMetaclass): +class OpExecutionContextMetaClass(AbstractComputeMetaclass, HasExecutionInfo): def __instancecheck__(cls, instance) -> bool: # This makes isinstance(context, OpExecutionContext) throw a deprecation warning when # context is an AssetExecutionContext. 
This metaclass can be deleted once AssetExecutionContext From cf989c9845fb0fc978fb197de2fedf74e8a54dfd Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 15 Nov 2023 14:24:50 -0500 Subject: [PATCH 05/17] cleanup --- .../_core/execution/context/compute.py | 31 +++++++++++-------- .../_core/execution/context/invocation.py | 8 +++-- .../test_asset_execution_context.py | 2 ++ 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9dda509d72f0a..3dda7171c45ea 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -66,9 +66,9 @@ from .system import StepExecutionContext -class ExecutionInfo( +class ExecutionProperties( NamedTuple( - "_ExecutionInfo", + "_ExecutionProperties", [ ("step_description", PublicAttr[str]), ("op_execution_context", PublicAttr["OpExecutionContext"]), @@ -80,7 +80,7 @@ class ExecutionInfo( """ def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"): - return super(ExecutionInfo, cls).__new__( + return super(ExecutionProperties, cls).__new__( cls, step_description=step_description, op_execution_context=op_execution_context ) @@ -141,14 +141,14 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class HasExecutionInfo(ABC): +class HasExecutionProperties(ABC): @property @abstractmethod - def execution_info(self) -> ExecutionInfo: + def execution_properties(self) -> ExecutionProperties: """Implement this method to check if a logging tag is set.""" -class OpExecutionContextMetaClass(AbstractComputeMetaclass, HasExecutionInfo): +class OpExecutionContextMetaClass(AbstractComputeMetaclass): def __instancecheck__(cls, instance) -> bool: # This makes isinstance(context, OpExecutionContext) throw a deprecation warning when # context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext @@ -166,7 +166,9 @@ def __instancecheck__(cls, instance) -> bool: return super().__instancecheck__(instance) -class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass): +class OpExecutionContext( + AbstractComputeExecutionContext, HasExecutionProperties, metaclass=OpExecutionContextMetaClass +): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. 
@@ -1450,10 +1452,7 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: retry_number=self._op_execution_context.retry_number, ) - self._execution_info = ExecutionInfo( - step_description=f"asset {self._op_execution_context.node_handle}", - op_execution_context=self._op_execution_context, - ) + self._execution_props = None @staticmethod def get() -> "AssetExecutionContext": @@ -1471,8 +1470,14 @@ def run_properties(self) -> RunProperties: return self._run_props @property - def execution_info(self) -> ExecutionInfo: - return self._execution_info + def execution_properties(self) -> ExecutionProperties: + if self._execution_props is None: + self._execution_props = ExecutionProperties( + step_description=f"asset {self.assets_def.node_def.name}", + op_execution_context=self._op_execution_context, + ) + + return self._execution_props ######## Deprecated methods diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 94b2d136a0c59..58e6803ffc70f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,7 +56,7 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import ExecutionInfo, OpExecutionContext +from .compute import ExecutionProperties, OpExecutionContext from .system import StepExecutionContext, TypeCheckContext @@ -294,6 +294,8 @@ def __init__( # my_op(ctx) # ctx._execution_properties is cleared at the beginning of the next invocation self._execution_properties = RunlessExecutionProperties() + self._execution_props = None + def __enter__(self): self._cm_scope_entered = True return self @@ -402,14 +404,14 @@ def bind( op_config=op_config, step_description=step_description, ) - self._execution_info = ExecutionInfo( + self._execution_props = ExecutionProperties( step_description=f"op {alias}", op_execution_context=self ) return self def unbind(self): self._bound_properties = None - self._execution_info = None + self._execution_props = None @property def is_bound(self) -> bool: diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index bad2c5f588979..a8a774b49cf04 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -98,6 +98,8 @@ def test_deprecation_warnings(): "is_subset", "partition_keys", "get", + "execution_info", + "_execution_info", ] other_ignores = [ From 2936a63b4bd23a8823130974294a1c731019917f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 15 Nov 2023 15:13:29 -0500 Subject: [PATCH 06/17] move --- .../dagster/_core/execution/context/compute.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 3dda7171c45ea..d9636da4c1f3d 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1452,7 +1452,10 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: 
retry_number=self._op_execution_context.retry_number, ) - self._execution_props = None + self._execution_props = ExecutionProperties( + step_description=f"asset {self.assets_def.node_def.name}", + op_execution_context=self._op_execution_context, + ) @staticmethod def get() -> "AssetExecutionContext": @@ -1471,12 +1474,6 @@ def run_properties(self) -> RunProperties: @property def execution_properties(self) -> ExecutionProperties: - if self._execution_props is None: - self._execution_props = ExecutionProperties( - step_description=f"asset {self.assets_def.node_def.name}", - op_execution_context=self._op_execution_context, - ) - return self._execution_props ######## Deprecated methods From 388eacdcf3262f6b3d1ca660520f60f057c8116c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 15 Nov 2023 15:36:18 -0500 Subject: [PATCH 07/17] small update --- .../dagster/dagster/_core/execution/context/compute.py | 4 ++-- .../dagster/dagster/_core/execution/context/invocation.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d9636da4c1f3d..77a5078d1f444 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -75,8 +75,8 @@ class ExecutionProperties( ], ) ): - """Information to be used by dagster internals during execution. Contains: - * A description of the step. + """Information to be used by dagster internals during execution. + You should not need to access these attributes directly. """ def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"): diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 58e6803ffc70f..c6428c7b43027 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -404,8 +404,9 @@ def bind( op_config=op_config, step_description=step_description, ) + self._execution_props = ExecutionProperties( - step_description=f"op {alias}", op_execution_context=self + step_description=f'op "{op_def.name}"', op_execution_context=self ) return self From 91d2c900616fd9a4556ed67af0bb9f80581e5410 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 16 Nov 2023 09:33:16 -0500 Subject: [PATCH 08/17] fix execution context init for assets context: --- .../dagster/_core/execution/context/compute.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 77a5078d1f444..19bc666667705 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1452,10 +1452,9 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: retry_number=self._op_execution_context.retry_number, ) - self._execution_props = ExecutionProperties( - step_description=f"asset {self.assets_def.node_def.name}", - op_execution_context=self._op_execution_context, - ) + # start execution_props as None since enter_execution_context builds an AssetExecutionContext + # for all steps (including ops) and ops will fail on self.assets_def call + self._execution_props = None @staticmethod def 
get() -> "AssetExecutionContext": @@ -1474,6 +1473,11 @@ def run_properties(self) -> RunProperties: @property def execution_properties(self) -> ExecutionProperties: + if self._execution_props is None: + self._execution_props = ExecutionProperties( + step_description=f"asset {self.assets_def.node_def.name}", + op_execution_context=self._op_execution_context, + ) return self._execution_props ######## Deprecated methods From 5d595989b1e1233c569c0f8c34a458ed8433f1e9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 16 Nov 2023 10:05:57 -0500 Subject: [PATCH 09/17] update typing and naming --- .../_core/execution/context/compute.py | 8 +++-- .../dagster/_core/execution/plan/compute.py | 15 ++++---- .../_core/execution/plan/compute_generator.py | 35 ++++++++++--------- .../test_asset_execution_context.py | 4 +-- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 19bc666667705..6e9f36ad0d973 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -141,7 +141,7 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class HasExecutionProperties(ABC): +class ContextHasExecutionProperties(ABC): @property @abstractmethod def execution_properties(self) -> ExecutionProperties: @@ -167,7 +167,9 @@ def __instancecheck__(cls, instance) -> bool: class OpExecutionContext( - AbstractComputeExecutionContext, HasExecutionProperties, metaclass=OpExecutionContextMetaClass + AbstractComputeExecutionContext, + ContextHasExecutionProperties, + metaclass=OpExecutionContextMetaClass, ): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. 
@@ -587,7 +589,7 @@ def retry_number(self) -> int: return self._step_execution_context.previous_attempt_count def describe_op(self) -> str: - return self.execution_info.step_description + return self.execution_properties.step_description @public def get_mapping_key(self) -> Optional[str]: diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 70682230a17b2..d4201b5ea1efb 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -36,8 +36,7 @@ ) from dagster._core.events import DagsterEvent from dagster._core.execution.context.compute import ( - AssetExecutionContext, - OpExecutionContext, + ContextHasExecutionProperties, ) from dagster._core.execution.context.system import StepExecutionContext from dagster._core.system_config.objects import ResolvedRunConfig @@ -154,7 +153,7 @@ def _yield_compute_results( step_context: StepExecutionContext, inputs: Mapping[str, Any], compute_fn: OpComputeFunction, - compute_context: Union[OpExecutionContext, AssetExecutionContext], + compute_context: ContextHasExecutionProperties, ) -> Iterator[OpOutputUnion]: user_event_generator = compute_fn(compute_context, inputs) @@ -176,7 +175,7 @@ def _yield_compute_results( if inspect.isasyncgen(user_event_generator): user_event_generator = gen_from_async_gen(user_event_generator) - step_label = compute_context.execution_info.step_description + step_label = compute_context.execution_properties.step_description for event in iterate_with_context( lambda: op_execution_error_boundary( @@ -189,11 +188,11 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.execution_info.op_execution_context.has_events(): - yield from compute_context.execution_info.op_execution_context.consume_events() + if compute_context.execution_properties.op_execution_context.has_events(): + yield from compute_context.execution_properties.op_execution_context.consume_events() yield _validate_event(event, step_context) - if compute_context.execution_info.op_execution_context.has_events(): + if compute_context.execution_properties.op_execution_context.has_events(): yield from compute_context.consume_events() @@ -201,7 +200,7 @@ def execute_core_compute( step_context: StepExecutionContext, inputs: Mapping[str, Any], compute_fn: OpComputeFunction, - compute_context: Union[OpExecutionContext, AssetExecutionContext], + compute_context: ContextHasExecutionProperties, ) -> Iterator[OpOutputUnion]: """Execute the user-specified compute for the op. Wrap in an error boundary and do all relevant logging and metrics tracking. 
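(Illustrative aside, not a hunk in this patch series: the changes above let execution-plan helpers depend only on the ContextHasExecutionProperties interface and its execution_properties.step_description, instead of a Union of OpExecutionContext and AssetExecutionContext. A minimal sketch of that call pattern follows; the helper name format_step_error_prefix is a hypothetical example, not something added by these commits.)

    # Hypothetical sketch -- not part of this patch series.
    # After this refactor, both OpExecutionContext and AssetExecutionContext
    # satisfy ContextHasExecutionProperties, so plan-level code can take either.
    from dagster._core.execution.context.compute import ContextHasExecutionProperties


    def format_step_error_prefix(compute_context: ContextHasExecutionProperties) -> str:
        # Mirrors the msg_fn built in _yield_compute_results above.
        return (
            "Error occurred while executing "
            f"{compute_context.execution_properties.step_description}:"
        )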
diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index d2657519e1f0b..962d55a535de5 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -36,7 +36,10 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import AssetExecutionContext, OpExecutionContext +from ..context.compute import ( + ContextHasExecutionProperties, + OpExecutionContext, +) def create_op_compute_wrapper( @@ -137,7 +140,7 @@ def _coerce_op_compute_fn_to_iterator( def _zip_and_iterate_op_result( result: Any, - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Iterator[Tuple[int, Any, OutputDefinition]]: # Filtering the expected output defs here is an unfortunate temporary solution to deal with the @@ -166,7 +169,7 @@ def _zip_and_iterate_op_result( # MaterializeResult. def _filter_expected_output_defs( result: Any, - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Sequence[OutputDefinition]: result_tuple = ( @@ -184,7 +187,7 @@ def _filter_expected_output_defs( def _validate_multi_return( - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, result: Any, output_defs: Sequence[OutputDefinition], ) -> Any: @@ -200,7 +203,7 @@ def _validate_multi_return( # When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation. if not isinstance(result, tuple): raise DagsterInvariantViolationError( - f"{context.execution_info.step_description} has multiple outputs, but only one " + f"{context.execution_properties.step_description} has multiple outputs, but only one " f"output was returned of type {type(result)}. When using " "multiple outputs, either yield each output, or return a tuple " "containing a value for each output. Check out the " @@ -211,9 +214,9 @@ def _validate_multi_return( if not len(output_tuple) == len(output_defs): raise DagsterInvariantViolationError( "Length mismatch between returned tuple of outputs and number of " - f"output defs on {context.execution_info.step_description}. Output tuple has " + f"output defs on {context.execution_properties.step_description}. Output tuple has " f"{len(output_tuple)} outputs, while " - f"{context.execution_info.step_description} has {len(output_defs)} outputs." + f"{context.execution_properties.step_description} has {len(output_defs)} outputs." 
) return result @@ -247,7 +250,7 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( result: Any, - context: Union[AssetExecutionContext, OpExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if inspect.isgenerator(result): @@ -256,7 +259,7 @@ def validate_and_coerce_op_result_to_iterator( yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.execution_info.step_description}: If you are " + f"Error in {context.execution_properties.step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " "an op you must yield them " @@ -269,8 +272,8 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.execution_info.step_description}: Unexpectedly returned output of type" - f" {type(result)}. {context.execution_info.step_description} is explicitly defined to" + f"Error in {context.execution_properties.step_description}: Unexpectedly returned output of type" + f" {type(result)}. {context.execution_properties.step_description} is explicitly defined to" " return no results." ) # `requires_typed_event_stream` is a mode where we require users to return/yield exactly the @@ -295,7 +298,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: " + f"Error with output for {context.execution_properties.step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -303,7 +306,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: " + f"Error with output for {context.execution_properties.step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -325,7 +328,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: received Output object for" + f"Error with output for {context.execution_properties.step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -343,7 +346,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: output " + f"Error with output for {context.execution_properties.step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." 
@@ -351,7 +354,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.execution_info.step_description}. ' + f'"{output_def.name}" of {context.execution_properties.step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: " diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index a8a774b49cf04..e82214b9e3a7f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -98,8 +98,8 @@ def test_deprecation_warnings(): "is_subset", "partition_keys", "get", - "execution_info", - "_execution_info", + "execution_properties", + "_execution_props", ] other_ignores = [ From 7ee4d7e6bff74aac3ae008ead098072b90875ff1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 16 Nov 2023 16:09:10 -0500 Subject: [PATCH 10/17] naming --- .../dagster/dagster/_core/execution/context/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 6e9f36ad0d973..30e3b73471f59 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1477,7 +1477,7 @@ def run_properties(self) -> RunProperties: def execution_properties(self) -> ExecutionProperties: if self._execution_props is None: self._execution_props = ExecutionProperties( - step_description=f"asset {self.assets_def.node_def.name}", + step_description=f"asset {self.op_execution_context.node_handle}", op_execution_context=self._op_execution_context, ) return self._execution_props From 91c0742db7a4f710e3b83a492f235897e477b5ac Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 22 Nov 2023 11:28:53 -0500 Subject: [PATCH 11/17] pyright --- .../dagster/_core/execution/context/compute.py | 11 +++++++++-- .../dagster/_core/execution/context/invocation.py | 3 +-- .../dagster/dagster/_core/execution/plan/compute.py | 2 +- .../_core/execution/plan/compute_generator.py | 13 +++++++++---- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 30e3b73471f59..fd6f93ba3b935 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -71,6 +71,7 @@ class ExecutionProperties( "_ExecutionProperties", [ ("step_description", PublicAttr[str]), + ("node_type", PublicAttr[str]), ("op_execution_context", PublicAttr["OpExecutionContext"]), ], ) @@ -79,9 +80,14 @@ class ExecutionProperties( You should not need to access these attributes directly. 
""" - def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"): + def __new__( + cls, step_description: str, node_type: str, op_execution_context: "OpExecutionContext" + ): return super(ExecutionProperties, cls).__new__( - cls, step_description=step_description, op_execution_context=op_execution_context + cls, + step_description=step_description, + node_type=node_type, + op_execution_context=op_execution_context, ) @@ -1478,6 +1484,7 @@ def execution_properties(self) -> ExecutionProperties: if self._execution_props is None: self._execution_props = ExecutionProperties( step_description=f"asset {self.op_execution_context.node_handle}", + node_type="asset", op_execution_context=self._op_execution_context, ) return self._execution_props diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index c6428c7b43027..141705e4a25fd 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -404,9 +404,8 @@ def bind( op_config=op_config, step_description=step_description, ) - self._execution_props = ExecutionProperties( - step_description=f'op "{op_def.name}"', op_execution_context=self + step_description=f'op "{op_def.name}"', node_type="op", op_execution_context=self ) return self diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index d4201b5ea1efb..351e57de2949d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -193,7 +193,7 @@ def _yield_compute_results( yield _validate_event(event, step_context) if compute_context.execution_properties.op_execution_context.has_events(): - yield from compute_context.consume_events() + yield from compute_context.execution_properties.op_execution_context.consume_events() def execute_core_compute( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 962d55a535de5..ee4859805f60a 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -280,7 +280,10 @@ def validate_and_coerce_op_result_to_iterator( # results that will be registered in the instance, without additional fancy inference (like # wrapping `None` in an `Output`). We therefore skip any return-specific validation for this # mode and treat returned values as if they were yielded. - elif output_defs and context.requires_typed_event_stream: + elif ( + output_defs + and context.execution_properties.op_execution_context.requires_typed_event_stream + ): # If nothing was returned, treat it as an empty tuple instead of a `(None,)`. # This is important for delivering the correct error message when an output is missing. 
if result is None: @@ -294,7 +297,9 @@ def validate_and_coerce_op_result_to_iterator( for position, output_def, element in _zip_and_iterate_op_result( result, context, output_defs ): - annotation = _get_annotation_for_output_position(position, context.op_def, output_defs) + annotation = _get_annotation_for_output_position( + position, context.execution_properties.op_execution_context.op_def, output_defs + ) if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( @@ -352,11 +357,11 @@ def validate_and_coerce_op_result_to_iterator( f"Received instead an object of type {type(element)}." ) if result is None and output_def.is_required is False: - context.log.warning( + context.execution_properties.op_execution_context.log.warning( 'Value "None" returned for non-required output ' f'"{output_def.name}" of {context.execution_properties.step_description}. ' "This value will be passed to downstream " - f"{context.op_def.node_type_str}s. For conditional " + f"{context.execution_properties.node_type}s. For conditional " "execution, results must be yielded: " "https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching" ) From 41021f2cee62de319e7775be5addf27dec49bdb1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 14:51:54 -0500 Subject: [PATCH 12/17] wip --- .../_core/execution/context/compute.py | 92 ++++++++++++------- .../dagster/_core/execution/plan/compute.py | 8 +- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index fd6f93ba3b935..8554c2db74f30 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -67,28 +67,63 @@ class ExecutionProperties( - NamedTuple( - "_ExecutionProperties", - [ - ("step_description", PublicAttr[str]), - ("node_type", PublicAttr[str]), - ("op_execution_context", PublicAttr["OpExecutionContext"]), - ], - ) + # NamedTuple( + # "_ExecutionProperties", + # [ + # ("step_description", PublicAttr[str]), + # ("node_type", PublicAttr[str]), + # ("op_execution_context", PublicAttr["OpExecutionContext"]), + # ], + # ) ): """Information to be used by dagster internals during execution. You should not need to access these attributes directly. 
""" - def __new__( - cls, step_description: str, node_type: str, op_execution_context: "OpExecutionContext" + def __init__( + self, step_description: str, node_type: str, op_execution_context: "OpExecutionContext" ): - return super(ExecutionProperties, cls).__new__( - cls, - step_description=step_description, - node_type=node_type, - op_execution_context=op_execution_context, - ) + self._step_description = step_description + self._node_type = node_type + self._op_execution_context = op_execution_context + self._events: List[DagsterEvent] = [] + self._requires_typed_event_stream = None + # return super(ExecutionProperties, cls).__new__( + # cls, + # step_description=step_description, + # node_type=node_type, + # op_execution_context=op_execution_context, + # ) + + @property + def step_description(self) -> str: + return self._step_description + + @property + def node_type(self) -> str: + return self._node_type + + @property + def op_execution_context(self) -> "OpExecutionContext": + return self._op_execution_context + + def consume_events(self) -> Iterator[DagsterEvent]: + events = self._events + self._events = [] + yield from events + + def has_events(self) -> bool: + return bool(self._events) + + def log_event(self, event: UserEvent, step_execution_context: StepExecutionContext) -> None: + if isinstance(event, AssetMaterialization): + self._events.append(DagsterEvent.asset_materialization(step_execution_context, event)) + elif isinstance(event, AssetObservation): + self._events.append(DagsterEvent.asset_observation(step_execution_context, event)) + elif isinstance(event, ExpectationResult): + self._events.append(DagsterEvent.step_expectation_result(step_execution_context, event)) + else: + check.failed(f"Unexpected event {event}") # This metaclass has to exist for OpExecutionContext to have a metaclass @@ -151,7 +186,7 @@ class ContextHasExecutionProperties(ABC): @property @abstractmethod def execution_properties(self) -> ExecutionProperties: - """Implement this method to check if a logging tag is set.""" + """Context classes must contain an instance of ExecutionProperties.""" class OpExecutionContextMetaClass(AbstractComputeMetaclass): @@ -481,16 +516,16 @@ def run_tags(self) -> Mapping[str, str]: return self._step_execution_context.run_tags def has_events(self) -> bool: - return bool(self._events) + return self.execution_properties.has_events() def consume_events(self) -> Iterator[DagsterEvent]: """Pops and yields all user-generated events that have been recorded from this context. If consume_events has not yet been called, this will yield all logged events since the beginning of the op's computation. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method. 
""" - events = self._events - self._events = [] - yield from events + # events = self.execution_properties.events + # self._events = [] + yield from self.execution_properties.consume_events() @public def log_event(self, event: UserEvent) -> None: @@ -511,18 +546,9 @@ def log_event(self, event: UserEvent) -> None: def log_materialization(context): context.log_event(AssetMaterialization("foo")) """ - if isinstance(event, AssetMaterialization): - self._events.append( - DagsterEvent.asset_materialization(self._step_execution_context, event) - ) - elif isinstance(event, AssetObservation): - self._events.append(DagsterEvent.asset_observation(self._step_execution_context, event)) - elif isinstance(event, ExpectationResult): - self._events.append( - DagsterEvent.step_expectation_result(self._step_execution_context, event) - ) - else: - check.failed(f"Unexpected event {event}") + self.execution_properties.log_event( + event=event, step_execution_context=self._step_execution_context + ) @public def add_output_metadata( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 351e57de2949d..e608140725573 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -188,12 +188,12 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.execution_properties.op_execution_context.has_events(): - yield from compute_context.execution_properties.op_execution_context.consume_events() + if compute_context.execution_properties.has_events(): + yield from compute_context.execution_properties.consume_events() yield _validate_event(event, step_context) - if compute_context.execution_properties.op_execution_context.has_events(): - yield from compute_context.execution_properties.op_execution_context.consume_events() + if compute_context.execution_properties.has_events(): + yield from compute_context.execution_properties.consume_events() def execute_core_compute( From 4cd7836ae35d5d3c54a56106f010c7848208a0a3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 18:54:10 -0500 Subject: [PATCH 13/17] move more to execution props --- .../_core/execution/context/compute.py | 58 +++++++++++-------- .../_core/execution/plan/compute_generator.py | 10 ++-- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 8554c2db74f30..950832b5cdbc8 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -66,34 +66,18 @@ from .system import StepExecutionContext -class ExecutionProperties( - # NamedTuple( - # "_ExecutionProperties", - # [ - # ("step_description", PublicAttr[str]), - # ("node_type", PublicAttr[str]), - # ("op_execution_context", PublicAttr["OpExecutionContext"]), - # ], - # ) -): +class ExecutionProperties: """Information to be used by dagster internals during execution. You should not need to access these attributes directly. 
""" - def __init__( - self, step_description: str, node_type: str, op_execution_context: "OpExecutionContext" - ): + def __init__(self, step_description: str, node_type: str, op_def: "OpDefinition"): self._step_description = step_description self._node_type = node_type - self._op_execution_context = op_execution_context + self._op_def = op_def self._events: List[DagsterEvent] = [] - self._requires_typed_event_stream = None - # return super(ExecutionProperties, cls).__new__( - # cls, - # step_description=step_description, - # node_type=node_type, - # op_execution_context=op_execution_context, - # ) + self._requires_typed_event_stream = False + self._typed_event_stream_error_message = None @property def step_description(self) -> str: @@ -104,8 +88,8 @@ def node_type(self) -> str: return self._node_type @property - def op_execution_context(self) -> "OpExecutionContext": - return self._op_execution_context + def op_def(self) -> "OpDefinition": + return self._op_def def consume_events(self) -> Iterator[DagsterEvent]: events = self._events @@ -125,6 +109,18 @@ def log_event(self, event: UserEvent, step_execution_context: StepExecutionConte else: check.failed(f"Unexpected event {event}") + @property + def requires_typed_event_stream(self) -> bool: + return self._requires_typed_event_stream + + @property + def typed_event_stream_error_message(self) -> Optional[str]: + return self._typed_event_stream_error_message + + def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None): + self._requires_typed_event_stream = True + self._typed_event_stream_error_message = error_message + # This metaclass has to exist for OpExecutionContext to have a metaclass class AbstractComputeMetaclass(ABCMeta): @@ -241,6 +237,19 @@ def __init__(self, step_execution_context: StepExecutionContext): self._events: List[DagsterEvent] = [] self._output_metadata: Dict[str, Any] = {} + self._execution_props = ExecutionProperties( # TODO - maybe swap to this being None here and creating/caching in the property + step_description=self._step_execution_context.describe_op(), + node_type="op", + op_def=cast( + OpDefinition, + self._step_execution_context.job_def.get_node(self.node_handle).definition, + ), + ) + + @property + def execution_properties(self) -> ExecutionProperties: + return self._execution_props + @public @property def op_config(self) -> Any: @@ -1486,6 +1495,7 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: retry_number=self._op_execution_context.retry_number, ) + # TODO - confirm accuracy of this comment # start execution_props as None since enter_execution_context builds an AssetExecutionContext # for all steps (including ops) and ops will fail on self.assets_def call self._execution_props = None @@ -1511,7 +1521,7 @@ def execution_properties(self) -> ExecutionProperties: self._execution_props = ExecutionProperties( step_description=f"asset {self.op_execution_context.node_handle}", node_type="asset", - op_execution_context=self._op_execution_context, + op_def=self.op_execution_context.op_def, ) return self._execution_props diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index ee4859805f60a..e9c83fe3c86ad 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -34,6 +34,7 @@ from dagster._core.errors import DagsterInvariantViolationError from 
dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation from dagster._utils import is_named_tuple_instance +from dagster._utils.log import get_dagster_logger from dagster._utils.warnings import disable_dagster_warnings from ..context.compute import ( @@ -280,10 +281,7 @@ def validate_and_coerce_op_result_to_iterator( # results that will be registered in the instance, without additional fancy inference (like # wrapping `None` in an `Output`). We therefore skip any return-specific validation for this # mode and treat returned values as if they were yielded. - elif ( - output_defs - and context.execution_properties.op_execution_context.requires_typed_event_stream - ): + elif output_defs and context.execution_properties.requires_typed_event_stream: # If nothing was returned, treat it as an empty tuple instead of a `(None,)`. # This is important for delivering the correct error message when an output is missing. if result is None: @@ -298,7 +296,7 @@ def validate_and_coerce_op_result_to_iterator( result, context, output_defs ): annotation = _get_annotation_for_output_position( - position, context.execution_properties.op_execution_context.op_def, output_defs + position, context.execution_properties.op_def, output_defs ) if output_def.is_dynamic: if not isinstance(element, list): @@ -357,7 +355,7 @@ def validate_and_coerce_op_result_to_iterator( f"Received instead an object of type {type(element)}." ) if result is None and output_def.is_required is False: - context.execution_properties.op_execution_context.log.warning( + get_dagster_logger().warning( 'Value "None" returned for non-required output ' f'"{output_def.name}" of {context.execution_properties.step_description}. ' "This value will be passed to downstream " From 6113cbaa009a86f753bf61df0ab61634e9a38ed9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 19:11:28 -0500 Subject: [PATCH 14/17] update for execution properties --- .../dagster/_core/execution/context/invocation.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 141705e4a25fd..2fe58e73cb7a7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,7 +56,7 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import ExecutionProperties, OpExecutionContext +from .compute import OpExecutionContext from .system import StepExecutionContext, TypeCheckContext @@ -294,8 +294,6 @@ def __init__( # my_op(ctx) # ctx._execution_properties is cleared at the beginning of the next invocation self._execution_properties = RunlessExecutionProperties() - self._execution_props = None - def __enter__(self): self._cm_scope_entered = True return self @@ -404,14 +402,11 @@ def bind( op_config=op_config, step_description=step_description, ) - self._execution_props = ExecutionProperties( - step_description=f'op "{op_def.name}"', node_type="op", op_execution_context=self - ) + return self def unbind(self): self._bound_properties = None - self._execution_props = None @property def is_bound(self) -> bool: From 5cd5378fd14b12c2545f07dcfad08a16fc977c10 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 19:43:43 -0500 Subject: [PATCH 15/17] cleanup --- .../dagster/dagster/_core/execution/context/compute.py | 6 
------ 1 file changed, 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 950832b5cdbc8..d2fbdd8f02cfc 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -128,10 +128,6 @@ class AbstractComputeMetaclass(ABCMeta): class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): - """Base class for op context implemented by OpExecutionContext, - and DagstermillExecutionContext. - """ - """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" @abstractmethod @@ -532,8 +528,6 @@ def consume_events(self) -> Iterator[DagsterEvent]: If consume_events has not yet been called, this will yield all logged events since the beginning of the op's computation. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method. """ - # events = self.execution_properties.events - # self._events = [] yield from self.execution_properties.consume_events() @public From 9b04729809f276cc32f1fefaeeef3f3cbdd9cde0 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 19:57:30 -0500 Subject: [PATCH 16/17] its all soup --- .../_core/execution/context/compute.py | 25 ++++++++++--------- .../_core/execution/plan/compute_generator.py | 2 ++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d2fbdd8f02cfc..e530009d2523e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -99,15 +99,8 @@ def consume_events(self) -> Iterator[DagsterEvent]: def has_events(self) -> bool: return bool(self._events) - def log_event(self, event: UserEvent, step_execution_context: StepExecutionContext) -> None: - if isinstance(event, AssetMaterialization): - self._events.append(DagsterEvent.asset_materialization(step_execution_context, event)) - elif isinstance(event, AssetObservation): - self._events.append(DagsterEvent.asset_observation(step_execution_context, event)) - elif isinstance(event, ExpectationResult): - self._events.append(DagsterEvent.step_expectation_result(step_execution_context, event)) - else: - check.failed(f"Unexpected event {event}") + def log_event(self, event: DagsterEvent): + self._events.append(event) @property def requires_typed_event_stream(self) -> bool: @@ -549,9 +542,17 @@ def log_event(self, event: UserEvent) -> None: def log_materialization(context): context.log_event(AssetMaterialization("foo")) """ - self.execution_properties.log_event( - event=event, step_execution_context=self._step_execution_context - ) + if isinstance(event, AssetMaterialization): + dagster_event = DagsterEvent.asset_materialization(self._step_execution_context, event) + elif isinstance(event, AssetObservation): + dagster_event = DagsterEvent.asset_observation(self._step_execution_context, event) + elif isinstance(event, ExpectationResult): + dagster_event = DagsterEvent.step_expectation_result( + self._step_execution_context, event + ) + else: + check.failed(f"Unexpected event {event}") + self.execution_properties.log_event(event=dagster_event) @public def add_output_metadata( diff --git 
a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index e9c83fe3c86ad..4cf2a837c683a 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -114,6 +114,8 @@ def invoke_compute_fn( config_arg_cls: Optional[Type[Config]], resource_args: Optional[Dict[str, str]] = None, ) -> Any: + # TODO - this is a possible execution pathway for both direct invocation and normal execution. Need to figure + # out the implications for the context args_to_pass = {**kwargs} if config_arg_cls: # config_arg_cls is either a Config class or a primitive type From fe91874ef4c331e8acdb79899e5110aa211aa389 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Dec 2023 12:06:46 -0500 Subject: [PATCH 17/17] more reorg --- .../_core/execution/context/compute.py | 22 +++++++++- .../_core/execution/context/invocation.py | 43 +++++++++++-------- .../_core/execution/plan/compute_generator.py | 6 ++- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index e530009d2523e..c029d781b410f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -71,13 +71,16 @@ class ExecutionProperties: You should not need to access these attributes directly. """ - def __init__(self, step_description: str, node_type: str, op_def: "OpDefinition"): + def __init__( + self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any + ): self._step_description = step_description self._node_type = node_type self._op_def = op_def self._events: List[DagsterEvent] = [] self._requires_typed_event_stream = False self._typed_event_stream_error_message = None + self._op_config = op_config @property def step_description(self) -> str: @@ -91,6 +94,10 @@ def node_type(self) -> str: def op_def(self) -> "OpDefinition": return self._op_def + @property + def op_config(self) -> Any: + return self._op_config + def consume_events(self) -> Iterator[DagsterEvent]: events = self._events self._events = [] @@ -168,11 +175,20 @@ def op_config(self) -> Any: class ContextHasExecutionProperties(ABC): + """Base class that any context that can be used for execution or invocation of an op or asset + must implement. 
+ """ + @property @abstractmethod def execution_properties(self) -> ExecutionProperties: """Context classes must contain an instance of ExecutionProperties.""" + @property + @abstractmethod + def resources(self) -> Any: + """Context classes must be able to provide currently available resources.""" + class OpExecutionContextMetaClass(AbstractComputeMetaclass): def __instancecheck__(cls, instance) -> bool: @@ -233,6 +249,7 @@ def __init__(self, step_execution_context: StepExecutionContext): OpDefinition, self._step_execution_context.job_def.get_node(self.node_handle).definition, ), + op_config=self._step_execution_context.op_config, ) @property @@ -243,7 +260,7 @@ def execution_properties(self) -> ExecutionProperties: @property def op_config(self) -> Any: """Any: The parsed config specific to this op.""" - return self._step_execution_context.op_config + return self.execution_properties.op_config @property def dagster_run(self) -> DagsterRun: @@ -1517,6 +1534,7 @@ def execution_properties(self) -> ExecutionProperties: step_description=f"asset {self.op_execution_context.node_handle}", node_type="asset", op_def=self.op_execution_context.op_def, + op_config=self.op_execution_context.op_config, ) return self._execution_props diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 2fe58e73cb7a7..3ba23e8202766 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,7 +56,7 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import OpExecutionContext +from .compute import ExecutionProperties, OpExecutionContext from .system import StepExecutionContext, TypeCheckContext @@ -106,18 +106,26 @@ def __new__( ) -class RunlessExecutionProperties: - """Maintains information about the invocation that is updated during execution time. This information - needs to be available to the user once invocation is complete, so that they can assert on events and - outputs. It needs to be cleared before the context is used for another invocation. +class RunlessExecutionProperties(ExecutionProperties): + """Maintains properties that need to be available to the execution code. To support runless execution + (direct invocation) this class also maintains information about the invocation that is updated + during execution time. This information needs to be available to the user once invocation is + complete, so that they can assert on events and outputs. It needs to be cleared before the + context is used for another invocation. 
""" - def __init__(self): + def __init__( + self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any + ): + self._step_description = step_description + self._node_type = node_type + self._op_def = op_def self._events: List[UserEvent] = [] self._seen_outputs = {} self._output_metadata = {} self._requires_typed_event_stream = False self._typed_event_stream_error_message = None + self._op_config = op_config @property def user_events(self): @@ -131,14 +139,6 @@ def seen_outputs(self): def output_metadata(self): return self._output_metadata - @property - def requires_typed_event_stream(self) -> bool: - return self._requires_typed_event_stream - - @property - def typed_event_stream_error_message(self) -> Optional[str]: - return self._typed_event_stream_error_message - def log_event(self, event: UserEvent) -> None: check.inst_param( event, @@ -292,7 +292,7 @@ def __init__( # my_op(ctx) # ctx._execution_properties.output_metadata # information is retained after invocation # my_op(ctx) # ctx._execution_properties is cleared at the beginning of the next invocation - self._execution_properties = RunlessExecutionProperties() + self._execution_properties = None def __enter__(self): self._cm_scope_entered = True @@ -326,9 +326,6 @@ def bind( f"This context is currently being used to execute {self.alias}. The context cannot be used to execute another op until {self.alias} has finished executing." ) - # reset execution_properties - self._execution_properties = RunlessExecutionProperties() - # update the bound context with properties relevant to the execution of the op invocation_tags = ( @@ -403,6 +400,11 @@ def bind( step_description=step_description, ) + # reset execution_properties + self._execution_properties = RunlessExecutionProperties( + step_description=step_description, node_type="op", op_def=op_def, op_config=op_config + ) + return self def unbind(self): @@ -414,6 +416,11 @@ def is_bound(self) -> bool: @property def execution_properties(self) -> RunlessExecutionProperties: + if self._execution_properties is None: + raise DagsterInvalidPropertyError( + "Cannot access execution_properties until after the context has been used to" + " invoke an op" + ) return self._execution_properties @property diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 4cf2a837c683a..e5fba715e937e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -120,10 +120,12 @@ def invoke_compute_fn( if config_arg_cls: # config_arg_cls is either a Config class or a primitive type if issubclass(config_arg_cls, Config): - to_pass = config_arg_cls._get_non_default_public_field_values_cls(context.op_config) # noqa: SLF001 + to_pass = config_arg_cls._get_non_default_public_field_values_cls( # noqa: SLF001 + context.execution_properties.op_config + ) args_to_pass["config"] = config_arg_cls(**to_pass) else: - args_to_pass["config"] = context.op_config + args_to_pass["config"] = context.execution_properties.op_config if resource_args: for resource_name, arg_name in resource_args.items(): args_to_pass[arg_name] = context.resources.original_resource_dict[resource_name]