From 257594857103c833c928d770dbdbadf2a420229c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 16 Nov 2023 10:05:57 -0500 Subject: [PATCH] update typing and naming --- .../_core/execution/context/compute.py | 8 +++-- .../dagster/_core/execution/plan/compute.py | 15 ++++---- .../_core/execution/plan/compute_generator.py | 35 ++++++++++--------- .../test_asset_execution_context.py | 4 +-- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index f7e4f78003631..7e643b7e2ba26 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -135,7 +135,7 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class HasExecutionProperties(ABC): +class ContextHasExecutionProperties(ABC): @property @abstractmethod def execution_properties(self) -> ExecutionProperties: @@ -161,7 +161,9 @@ def __instancecheck__(cls, instance) -> bool: class OpExecutionContext( - AbstractComputeExecutionContext, HasExecutionProperties, metaclass=OpExecutionContextMetaClass + AbstractComputeExecutionContext, + ContextHasExecutionProperties, + metaclass=OpExecutionContextMetaClass, ): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. @@ -589,7 +591,7 @@ def retry_number(self) -> int: return self._step_execution_context.previous_attempt_count def describe_op(self) -> str: - return self.execution_info.step_description + return self.execution_properties.step_description @public def get_mapping_key(self) -> Optional[str]: diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 70682230a17b2..d4201b5ea1efb 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -36,8 +36,7 @@ ) from dagster._core.events import DagsterEvent from dagster._core.execution.context.compute import ( - AssetExecutionContext, - OpExecutionContext, + ContextHasExecutionProperties, ) from dagster._core.execution.context.system import StepExecutionContext from dagster._core.system_config.objects import ResolvedRunConfig @@ -154,7 +153,7 @@ def _yield_compute_results( step_context: StepExecutionContext, inputs: Mapping[str, Any], compute_fn: OpComputeFunction, - compute_context: Union[OpExecutionContext, AssetExecutionContext], + compute_context: ContextHasExecutionProperties, ) -> Iterator[OpOutputUnion]: user_event_generator = compute_fn(compute_context, inputs) @@ -176,7 +175,7 @@ def _yield_compute_results( if inspect.isasyncgen(user_event_generator): user_event_generator = gen_from_async_gen(user_event_generator) - step_label = compute_context.execution_info.step_description + step_label = compute_context.execution_properties.step_description for event in iterate_with_context( lambda: op_execution_error_boundary( @@ -189,11 +188,11 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.execution_info.op_execution_context.has_events(): - yield from compute_context.execution_info.op_execution_context.consume_events() + if compute_context.execution_properties.op_execution_context.has_events(): + yield from compute_context.execution_properties.op_execution_context.consume_events() yield _validate_event(event, step_context) - if compute_context.execution_info.op_execution_context.has_events(): + if compute_context.execution_properties.op_execution_context.has_events(): yield from compute_context.consume_events() @@ -201,7 +200,7 @@ def execute_core_compute( step_context: StepExecutionContext, inputs: Mapping[str, Any], compute_fn: OpComputeFunction, - compute_context: Union[OpExecutionContext, AssetExecutionContext], + compute_context: ContextHasExecutionProperties, ) -> Iterator[OpOutputUnion]: """Execute the user-specified compute for the op. Wrap in an error boundary and do all relevant logging and metrics tracking. diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 4f0858da9a459..9067e5e68f3c9 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -36,7 +36,10 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import AssetExecutionContext, OpExecutionContext +from ..context.compute import ( + ContextHasExecutionProperties, + OpExecutionContext, +) def create_op_compute_wrapper( @@ -137,7 +140,7 @@ def _coerce_op_compute_fn_to_iterator( def _zip_and_iterate_op_result( result: Any, - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Iterator[Tuple[int, Any, OutputDefinition]]: # Filtering the expected output defs here is an unfortunate temporary solution to deal with the @@ -166,7 +169,7 @@ def _zip_and_iterate_op_result( # MaterializeResult. def _filter_expected_output_defs( result: Any, - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Sequence[OutputDefinition]: result_tuple = ( @@ -184,7 +187,7 @@ def _filter_expected_output_defs( def _validate_multi_return( - context: Union[OpExecutionContext, AssetExecutionContext], + context: ContextHasExecutionProperties, result: Any, output_defs: Sequence[OutputDefinition], ) -> Any: @@ -200,7 +203,7 @@ def _validate_multi_return( # When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation. if not isinstance(result, tuple): raise DagsterInvariantViolationError( - f"{context.execution_info.step_description} has multiple outputs, but only one " + f"{context.execution_properties.step_description} has multiple outputs, but only one " f"output was returned of type {type(result)}. When using " "multiple outputs, either yield each output, or return a tuple " "containing a value for each output. Check out the " @@ -211,9 +214,9 @@ def _validate_multi_return( if not len(output_tuple) == len(output_defs): raise DagsterInvariantViolationError( "Length mismatch between returned tuple of outputs and number of " - f"output defs on {context.execution_info.step_description}. Output tuple has " + f"output defs on {context.execution_properties.step_description}. Output tuple has " f"{len(output_tuple)} outputs, while " - f"{context.execution_info.step_description} has {len(output_defs)} outputs." + f"{context.execution_properties.step_description} has {len(output_defs)} outputs." ) return result @@ -247,7 +250,7 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( result: Any, - context: Union[AssetExecutionContext, OpExecutionContext], + context: ContextHasExecutionProperties, output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if inspect.isgenerator(result): @@ -256,7 +259,7 @@ def validate_and_coerce_op_result_to_iterator( yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.execution_info.step_description}: If you are " + f"Error in {context.execution_properties.step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " "an op you must yield them " @@ -269,8 +272,8 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.execution_info.step_description}: Unexpectedly returned output of type" - f" {type(result)}. {context.execution_info.step_description} is explicitly defined to" + f"Error in {context.execution_properties.step_description}: Unexpectedly returned output of type" + f" {type(result)}. {context.execution_properties.step_description} is explicitly defined to" " return no results." ) # `requires_typed_event_stream` is a mode where we require users to return/yield exactly the @@ -295,7 +298,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: " + f"Error with output for {context.execution_properties.step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -303,7 +306,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: " + f"Error with output for {context.execution_properties.step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -325,7 +328,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: received Output object for" + f"Error with output for {context.execution_properties.step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -343,7 +346,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.execution_info.step_description}: output " + f"Error with output for {context.execution_properties.step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." @@ -351,7 +354,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.execution_info.step_description}. ' + f'"{output_def.name}" of {context.execution_properties.step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: " diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index d6d2e762331e8..c8ed51c1cc222 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -79,8 +79,8 @@ def test_deprecation_warnings(): "is_subset", "partition_keys", "get", - "execution_info", - "_execution_info", + "execution_properties", + "_execution_props", ] other_ignores = [