diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py
index 3dc8e5efc1a67..d77dd6eb81d3a 100644
--- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py
+++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py
@@ -32,7 +32,8 @@ from .result import MaterializeResult
 
 if TYPE_CHECKING:
-    from ..execution.context.invocation import DirectOpExecutionContext
+    from ..execution.context.compute import OpExecutionContext
+    from ..execution.context.invocation import BaseDirectExecutionContext
     from .assets import AssetsDefinition
     from .composition import PendingNodeInvocation
     from .decorators.op_decorator import DecoratedOpFunction
@@ -100,6 +101,16 @@ def _separate_args_and_kwargs(
     )
 
 
+def _get_op_context(
+    context,
+) -> "OpExecutionContext":
+    from dagster._core.execution.context.compute import AssetExecutionContext
+
+    if isinstance(context, AssetExecutionContext):
+        return context.op_execution_context
+    return context
+
+
 def direct_invocation_result(
     def_or_invocation: Union[
         "OpDefinition", "PendingNodeInvocation[OpDefinition]", "AssetsDefinition"
@@ -109,7 +120,7 @@ def direct_invocation_result(
 ) -> Any:
     from dagster._config.pythonic_config import Config
     from dagster._core.execution.context.invocation import (
-        DirectOpExecutionContext,
+        BaseDirectExecutionContext,
         build_op_context,
     )
@@ -149,12 +160,12 @@ def direct_invocation_result(
                 " no context was provided when invoking."
             )
         if len(args) > 0:
-            if args[0] is not None and not isinstance(args[0], DirectOpExecutionContext):
+            if args[0] is not None and not isinstance(args[0], BaseDirectExecutionContext):
                 raise DagsterInvalidInvocationError(
                     f"Decorated function '{compute_fn.name}' has context argument, "
                     "but no context was provided when invoking."
                 )
-            context = cast(DirectOpExecutionContext, args[0])
+            context = args[0]
             # update args to omit context
             args = args[1:]
         else:  # context argument is provided under kwargs
@@ -165,14 +176,14 @@ def direct_invocation_result(
                     f"'{context_param_name}', but no value for '{context_param_name}' was "
                     f"found when invoking. Provided kwargs: {kwargs}"
                 )
-            context = cast(DirectOpExecutionContext, kwargs[context_param_name])
+            context = kwargs[context_param_name]
             # update kwargs to remove context
             kwargs = {
                 kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name
             }
     # allow passing context, even if the function doesn't have an arg for it
-    elif len(args) > 0 and isinstance(args[0], DirectOpExecutionContext):
-        context = cast(DirectOpExecutionContext, args[0])
+    elif len(args) > 0 and isinstance(args[0], BaseDirectExecutionContext):
+        context = args[0]
         args = args[1:]
 
     resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()}
@@ -230,7 +241,7 @@ def direct_invocation_result(
 
 
 def _resolve_inputs(
-    op_def: "OpDefinition", args, kwargs, context: "DirectOpExecutionContext"
+    op_def: "OpDefinition", args, kwargs, context: "BaseDirectExecutionContext"
 ) -> Mapping[str, Any]:
     from dagster._core.execution.plan.execute_step import do_type_check
 
@@ -333,7 +344,7 @@ def _resolve_inputs(
     return input_dict
 
 
-def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContext") -> AssetKey:
+def _key_for_result(result: MaterializeResult, context: "BaseDirectExecutionContext") -> AssetKey:
     if not context.per_invocation_properties.assets_def:
         raise DagsterInvariantViolationError(
             f"Op {context.per_invocation_properties.alias} does not have an assets definition."
@@ -355,7 +366,7 @@ def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContex
 
 def _output_name_for_result_obj(
     event: MaterializeResult,
-    context: "DirectOpExecutionContext",
+    context: "BaseDirectExecutionContext",
 ):
     if not context.per_invocation_properties.assets_def:
         raise DagsterInvariantViolationError(
@@ -368,7 +379,7 @@ def _output_name_for_result_obj(
 def _handle_gen_event(
     event: T,
     op_def: "OpDefinition",
-    context: "DirectOpExecutionContext",
+    context: "BaseDirectExecutionContext",
     output_defs: Mapping[str, OutputDefinition],
     outputs_seen: Set[str],
 ) -> T:
@@ -402,7 +413,7 @@ def _handle_gen_event(
 
 
 def _type_check_output_wrapper(
-    op_def: "OpDefinition", result: Any, context: "DirectOpExecutionContext"
+    op_def: "OpDefinition", result: Any, context: "BaseDirectExecutionContext"
 ) -> Any:
     """Type checks and returns the result of a op.
 
@@ -496,12 +507,13 @@ def type_check_gen(gen):
 
 
 def _type_check_function_output(
-    op_def: "OpDefinition", result: T, context: "DirectOpExecutionContext"
+    op_def: "OpDefinition", result: T, context: "BaseDirectExecutionContext"
 ) -> T:
     from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator
 
     output_defs_by_name = {output_def.name: output_def for output_def in op_def.output_defs}
-    for event in validate_and_coerce_op_result_to_iterator(result, context, op_def.output_defs):
+    op_context = _get_op_context(context)
+    for event in validate_and_coerce_op_result_to_iterator(result, op_context, op_def.output_defs):
         if isinstance(event, (Output, DynamicOutput)):
             _type_check_output(output_defs_by_name[event.output_name], event, context)
         elif isinstance(event, (MaterializeResult)):
@@ -515,14 +527,14 @@ def _type_check_function_output(
 def _type_check_output(
     output_def: "OutputDefinition",
     output: Union[Output, DynamicOutput],
-    context: "DirectOpExecutionContext",
+    context: "BaseDirectExecutionContext",
 ) -> None:
     """Validates and performs core type check on a provided output.
 
     Args:
         output_def (OutputDefinition): The output definition to validate against.
         output (Any): The output to validate.
-        context (DirectOpExecutionContext): Context containing resources to be used for type
+        context (BaseDirectExecutionContext): Context containing resources to be used for type
             check.
     """
     from ..execution.plan.execute_step import do_type_check
diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py
index fb859cb42cee5..90e33e6e5c316 100644
--- a/python_modules/dagster/dagster/_core/execution/context/compute.py
+++ b/python_modules/dagster/dagster/_core/execution/context/compute.py
@@ -38,6 +38,7 @@
     AssetKey,
     AssetMaterialization,
     AssetObservation,
+    CoercibleToAssetKey,
     ExpectationResult,
     UserEvent,
 )
@@ -1459,6 +1460,29 @@ def job_def(self) -> JobDefinition:
         """
         return self.op_execution_context.job_def
 
+    @public
+    def latest_materialization_for_upstream_asset(
+        self, key: CoercibleToAssetKey
+    ) -> Optional[AssetMaterialization]:
+        """Get the most recent AssetMaterialization event for the key. The key must be an upstream
+        asset of the currently materializing asset. Information like metadata and tags can be found
+        on the AssetMaterialization. If the key is not an upstream asset of the currently
+        materializing asset, an error will be raised. If no AssetMaterialization exists for the key,
+        None will be returned.
+
+        Returns: Optional[AssetMaterialization]
+        """
+        materialization_events = (
+            self.op_execution_context._step_execution_context.upstream_asset_materialization_events  # noqa: SLF001
+        )
+        if AssetKey.from_coercible(key) in materialization_events.keys():
+            return materialization_events.get(AssetKey.from_coercible(key))
+
+        raise DagsterInvariantViolationError(
+            f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency "
+            "in order to call latest_materialization_for_upstream_asset."
+        )
+
     ######## Deprecated methods
 
     @deprecated(**_get_deprecation_kwargs("dagster_run"))
diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py
index 02038f15127ed..0719115dfc583 100644
--- a/python_modules/dagster/dagster/_core/execution/context/invocation.py
+++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py
@@ -1,3 +1,4 @@
+from abc import abstractmethod
 from contextlib import ExitStack
 from typing import (
     AbstractSet,
@@ -21,6 +22,7 @@
 from dagster._core.definitions.events import (
     AssetMaterialization,
     AssetObservation,
+    CoercibleToAssetKey,
     ExpectationResult,
     UserEvent,
 )
@@ -56,14 +58,60 @@
 from dagster._utils.merger import merge_dicts
 from dagster._utils.warnings import deprecation_warning
 
-from .compute import OpExecutionContext
+from .compute import AssetExecutionContext, OpExecutionContext
 from .system import StepExecutionContext, TypeCheckContext
 
 
 def _property_msg(prop_name: str, method_name: str) -> str:
-    return (
-        f"The {prop_name} {method_name} is not set on the context when an op is directly invoked."
-    )
+    return f"The {prop_name} {method_name} is not set on the context when an asset or op is directly invoked."
+
+
+class BaseDirectExecutionContext:
+    """Base class for any direct invocation execution contexts. Each type of execution context
+    (ex. OpExecutionContext, AssetExecutionContext) needs to have a variant for direct invocation.
+    Those direct invocation contexts have some methods that are not available until the context
+    is bound to a particular op/asset. The "bound" properties are held in PerInvocationProperties.
+    There are also some properties that are specific to a particular execution of an op/asset; these
+    properties are held in DirectExecutionProperties. Direct invocation contexts must
+    be able to be bound and unbound from a particular op/asset. Additionally, there are some methods
+    that all direct invocation contexts must implement so that they will be usable in the execution
+    code path.
+    """
+
+    @abstractmethod
+    def bind(
+        self,
+        op_def: OpDefinition,
+        pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
+        assets_def: Optional[AssetsDefinition],
+        config_from_args: Optional[Mapping[str, Any]],
+        resources_from_args: Optional[Mapping[str, Any]],
+    ):
+        """Subclasses of BaseDirectExecutionContext must implement bind."""
+
+    @abstractmethod
+    def unbind(self):
+        """Subclasses of BaseDirectExecutionContext must implement unbind."""
+
+    @property
+    @abstractmethod
+    def per_invocation_properties(self) -> "PerInvocationProperties":
+        """Subclasses of BaseDirectExecutionContext must contain a PerInvocationProperties object."""
+
+    @property
+    @abstractmethod
+    def execution_properties(self) -> "DirectExecutionProperties":
+        """Subclasses of BaseDirectExecutionContext must contain a DirectExecutionProperties object."""
+
+    @abstractmethod
+    def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
+        """Subclasses of BaseDirectExecutionContext must implement for_type."""
+        pass
+
+    @abstractmethod
+    def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None:
+        """Subclasses of BaseDirectExecutionContext must implement observe_output."""
+        pass
 
 
 class PerInvocationProperties(
@@ -127,7 +175,7 @@ def __init__(self):
         self.typed_event_stream_error_message: Optional[str] = None
 
 
-class DirectOpExecutionContext(OpExecutionContext):
+class DirectOpExecutionContext(OpExecutionContext, BaseDirectExecutionContext):
     """The ``context`` object available as the first argument to an op's compute function when
     being invoked directly. Can also be used as a context manager.
     """
@@ -706,6 +754,90 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> No
         self._execution_properties.typed_event_stream_error_message = error_message
 
 
+class DirectAssetExecutionContext(AssetExecutionContext, BaseDirectExecutionContext):
+    """The ``context`` object available as the first argument to an asset's compute function when
+    being invoked directly. Can also be used as a context manager.
+    """
+
+    def __init__(self, op_execution_context: DirectOpExecutionContext):
+        self._op_execution_context = op_execution_context
+
+    def __enter__(self):
+        self.op_execution_context._cm_scope_entered = True  # noqa: SLF001
+        return self
+
+    def __exit__(self, *exc):
+        self.op_execution_context._exit_stack.close()  # noqa: SLF001
+
+    def __del__(self):
+        self.op_execution_context._exit_stack.close()  # noqa: SLF001
+
+    def _check_bound_to_invocation(self, fn_name: str, fn_type: str):
+        if not self._op_execution_context._per_invocation_properties:  # noqa: SLF001
+            raise DagsterInvalidPropertyError(_property_msg(fn_name, fn_type))
+
+    def bind(
+        self,
+        op_def: OpDefinition,
+        pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
+        assets_def: Optional[AssetsDefinition],
+        config_from_args: Optional[Mapping[str, Any]],
+        resources_from_args: Optional[Mapping[str, Any]],
+    ) -> "DirectAssetExecutionContext":
+        if assets_def is None:
+            raise DagsterInvariantViolationError(
+                "DirectAssetExecutionContext can only be used to invoke an asset."
+            )
+        if self._op_execution_context._per_invocation_properties is not None:  # noqa: SLF001
+            raise DagsterInvalidInvocationError(
+                f"This context is currently being used to execute {self.op_execution_context.alias}."
+                " The context cannot be used to execute another asset until"
+                f" {self.op_execution_context.alias} has finished executing."
+            )
+
+        self._op_execution_context = self._op_execution_context.bind(
+            op_def=op_def,
+            pending_invocation=pending_invocation,
+            assets_def=assets_def,
+            config_from_args=config_from_args,
+            resources_from_args=resources_from_args,
+        )
+
+        return self
+
+    def unbind(self):
+        self._op_execution_context.unbind()
+
+    @property
+    def per_invocation_properties(self) -> PerInvocationProperties:
+        return self.op_execution_context.per_invocation_properties
+
+    @property
+    def is_bound(self) -> bool:
+        return self.op_execution_context.is_bound
+
+    @property
+    def execution_properties(self) -> DirectExecutionProperties:
+        return self.op_execution_context.execution_properties
+
+    @property
+    def op_execution_context(self) -> DirectOpExecutionContext:
+        return self._op_execution_context
+
+    def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
+        return self.op_execution_context.for_type(dagster_type)
+
+    def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None:
+        self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key)
+
+    def latest_materialization_for_upstream_asset(
+        self, key: CoercibleToAssetKey
+    ) -> Optional[AssetMaterialization]:
+        raise DagsterInvalidPropertyError(
+            _property_msg("latest_materialization_for_upstream_asset", "method")
+        )
+
+
 def _validate_resource_requirements(
     resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition
 ) -> None:
@@ -796,7 +928,7 @@ def build_asset_context(
     instance: Optional[DagsterInstance] = None,
     partition_key: Optional[str] = None,
     partition_key_range: Optional[PartitionKeyRange] = None,
-):
+) -> DirectAssetExecutionContext:
     """Builds asset execution context from provided parameters.
 
     ``build_asset_context`` can be used as either a function or context manager. If there is a
@@ -823,7 +955,7 @@ def build_asset_context(
         with build_asset_context(resources={"foo": context_manager_resource}) as context:
             asset_to_invoke(context)
     """
-    return build_op_context(
+    op_context = build_op_context(
         op_config=asset_config,
         resources=resources,
         resources_config=resources_config,
@@ -831,3 +963,5 @@ def build_asset_context(
         partition_key=partition_key,
         partition_key_range=partition_key_range,
         instance=instance,
     )
+
+    return DirectAssetExecutionContext(op_execution_context=op_context)
diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py
index 25acd4c272b72..85b67cf0d00cf 100644
--- a/python_modules/dagster/dagster/_core/execution/context/system.py
+++ b/python_modules/dagster/dagster/_core/execution/context/system.py
@@ -30,7 +30,7 @@
     extract_data_version_from_entry,
 )
 from dagster._core.definitions.dependency import OpNode
-from dagster._core.definitions.events import AssetKey, AssetLineageInfo
+from dagster._core.definitions.events import AssetKey, AssetLineageInfo, AssetMaterialization
 from dagster._core.definitions.hook_definition import HookDefinition
 from dagster._core.definitions.job_base import IJob
 from dagster._core.definitions.job_definition import JobDefinition
@@ -571,6 +571,10 @@ def __init__(
         self._output_metadata: Dict[str, Any] = {}
         self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}
 
+        self._upstream_asset_materialization_events: Dict[
+            AssetKey, Optional[AssetMaterialization]
+        ] = {}
+
         self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
         self._is_external_input_asset_version_info_loaded = False
         self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}
@@ -949,17 +953,28 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion":
     def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]:
         return self._input_asset_version_info
 
+    @property
+    def upstream_asset_materialization_events(
+        self,
+    ) -> Dict[AssetKey, Optional[AssetMaterialization]]:
+        return self._upstream_asset_materialization_events
+
     @property
     def is_external_input_asset_version_info_loaded(self) -> bool:
         return self._is_external_input_asset_version_info_loaded
 
     def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]:
         if key not in self._input_asset_version_info:
-            self._fetch_input_asset_version_info(key)
+            self._fetch_input_asset_materialization_and_version_info(key)
         return self._input_asset_version_info[key]
 
     # "external" refers to records for inputs generated outside of this step
-    def fetch_external_input_asset_version_info(self) -> None:
+    def fetch_external_input_asset_materialization_and_version_info(self) -> None:
+        """Fetches the latest observation or materialization for each upstream dependency
+        in order to determine the version info. As a side effect we create a dictionary
+        of the materialization events so that the AssetContext can access the latest materialization
+        event.
+ """ output_keys = self.get_output_asset_keys() all_dep_keys: List[AssetKey] = [] @@ -973,10 +988,10 @@ def fetch_external_input_asset_version_info(self) -> None: self._input_asset_version_info = {} for key in all_dep_keys: - self._fetch_input_asset_version_info(key) + self._fetch_input_asset_materialization_and_version_info(key) self._is_external_input_asset_version_info_loaded = True - def _fetch_input_asset_version_info(self, key: AssetKey) -> None: + def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None: from dagster._core.definitions.data_version import ( extract_data_version_from_entry, ) @@ -984,6 +999,7 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: event = self._get_input_asset_event(key) if event is None: self._input_asset_version_info[key] = None + self._upstream_asset_materialization_events[key] = None else: storage_id = event.storage_id # Input name will be none if this is an internal dep @@ -1011,6 +1027,12 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: data_version = extract_data_version_from_entry(event.event_log_entry) else: data_version = extract_data_version_from_entry(event.event_log_entry) + # the AssetMaterialization fetched above is only accurate if the asset it not partitioned + # if the asset is partitioned, then the latest AssetMaterialization may be for a partition + # that is irrelevant to the current execution + self._upstream_asset_materialization_events[key] = ( + event.asset_materialization if event.asset_materialization else None + ) self._input_asset_version_info[key] = InputAssetVersionInfo( storage_id, data_version, event.run_id, event.timestamp ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 2d67fb38bab2e..d48ee9b1363b9 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -61,6 +61,14 @@ ] +def _get_op_context( + context: Union[OpExecutionContext, AssetExecutionContext], +) -> OpExecutionContext: + if isinstance(context, AssetExecutionContext): + return context.op_execution_context + return context + + def create_step_outputs( node: Node, handle: NodeHandle, @@ -189,12 +197,12 @@ def _yield_compute_results( ), user_event_generator, ): - if compute_context.has_events(): - yield from compute_context.consume_events() + if _get_op_context(compute_context).has_events(): + yield from _get_op_context(compute_context).consume_events() yield _validate_event(event, step_context) - if compute_context.has_events(): - yield from compute_context.consume_events() + if _get_op_context(compute_context).has_events(): + yield from _get_op_context(compute_context).consume_events() def execute_core_compute( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 375be39a7ea43..21583e20760e9 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -36,9 +36,18 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import OpExecutionContext +from ..context.compute import AssetExecutionContext, OpExecutionContext +def _get_op_context( + context: Union[OpExecutionContext, AssetExecutionContext], +) -> 
OpExecutionContext: + if isinstance(context, AssetExecutionContext): + return context.op_execution_context + return context + + +# called in execute_step if the fn is not decorated def create_op_compute_wrapper( op_def: OpDefinition, ) -> Callable[[OpExecutionContext, Mapping[str, InputDefinition]], Any]: @@ -94,6 +103,7 @@ def compute( return compute +# called in this file (create_op_compute_wrapper) async def _coerce_async_op_to_async_gen( awaitable: Awaitable[Any], context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> AsyncIterator[Any]: @@ -102,6 +112,7 @@ async def _coerce_async_op_to_async_gen( yield event +# called in this file, and in op_invocation for direct invocation def invoke_compute_fn( fn: Callable, context: OpExecutionContext, @@ -125,6 +136,7 @@ def invoke_compute_fn( return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass) +# called in this file (create_op_compute_wrapper) def _coerce_op_compute_fn_to_iterator( fn, output_defs, context, context_arg_provided, kwargs, config_arg_class, resource_arg_mapping ): @@ -135,6 +147,7 @@ def _coerce_op_compute_fn_to_iterator( yield event +# called in this file (validate_and_coerce_op_result_to_iterator) def _zip_and_iterate_op_result( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Tuple[int, Any, OutputDefinition]]: @@ -162,6 +175,7 @@ def _zip_and_iterate_op_result( # Filter out output_defs corresponding to asset check results that already exist on a # MaterializeResult. +# called in this file (_zip_and_iterate_op_result) def _filter_expected_output_defs( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Sequence[OutputDefinition]: @@ -177,6 +191,7 @@ def _filter_expected_output_defs( return [out for out in output_defs if out.name not in remove_outputs] +# called in this file (_zip_and_iterate_op_result) def _validate_multi_return( context: OpExecutionContext, result: Any, @@ -212,6 +227,7 @@ def _validate_multi_return( return result +# called in this file (validate_and_coerce_op_result_to_iterator) def _get_annotation_for_output_position( position: int, op_def: OpDefinition, output_defs: Sequence[OutputDefinition] ) -> Any: @@ -226,6 +242,7 @@ def _get_annotation_for_output_position( return inspect.Parameter.empty +# called in this file (validate_and_coerce_op_result_to_iterator) def _check_output_object_name( output: Union[DynamicOutput, Output], output_def: OutputDefinition, position: int ) -> None: @@ -239,6 +256,7 @@ def _check_output_object_name( ) +# called in op_invocation and this file def validate_and_coerce_op_result_to_iterator( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Any]: diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 2fbd81041bff9..b403b7e56a113 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -440,7 +440,7 @@ def core_dagster_event_sequence_for_step( inputs = {} if step_context.is_sda_step: - step_context.fetch_external_input_asset_version_info() + step_context.fetch_external_input_asset_materialization_and_version_info() for step_input in step_context.step.step_inputs: input_def = step_context.op_def.input_def_named(step_input.name) diff --git a/python_modules/dagster/dagster/_core/pipes/context.py 
b/python_modules/dagster/dagster/_core/pipes/context.py index 5b61647bbf43b..e902f335d87c1 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -39,7 +39,7 @@ from dagster._core.errors import DagsterPipesExecutionError from dagster._core.events import EngineEventData from dagster._core.execution.context.compute import OpExecutionContext -from dagster._core.execution.context.invocation import DirectOpExecutionContext +from dagster._core.execution.context.invocation import BaseDirectExecutionContext from dagster._utils.error import ( ExceptionInfo, SerializableErrorInfo, @@ -406,8 +406,8 @@ def build_external_execution_context_data( _convert_time_window(partition_time_window) if partition_time_window else None ), run_id=context.run_id, - job_name=None if isinstance(context, DirectOpExecutionContext) else context.job_name, - retry_number=0 if isinstance(context, DirectOpExecutionContext) else context.retry_number, + job_name=None if isinstance(context, BaseDirectExecutionContext) else context.job_name, + retry_number=0 if isinstance(context, BaseDirectExecutionContext) else context.retry_number, extras=extras or {}, ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 1fd029361981b..fca48fecadf55 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -9,6 +9,7 @@ DagsterInstance, Definitions, GraphDefinition, + MaterializeResult, OpExecutionContext, Output, asset, @@ -426,3 +427,33 @@ def a(context: AssetExecutionContext): assert context == AssetExecutionContext.get() assert materialize([a]).success + + +def test_upstream_metadata(): + # with output metadata + @asset + def upstream(context: AssetExecutionContext): + context.add_output_metadata({"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream]) + + +def test_upstream_metadata_materialize_result(): + # with asset materialization + @asset + def upstream(): + return MaterializeResult(metadata={"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream]) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index a0deafa4c784f..7ceb3bf2ea220 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1383,7 +1383,7 @@ async def main(): with pytest.raises( DagsterInvalidInvocationError, match=r"This context is currently being used to execute .* The context" - r" cannot be used to execute another op until .* has finished executing", + r" cannot be used to execute another asset until .* has finished executing", ): asyncio.run(main())
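
A minimal usage sketch of the new upstream-materialization lookup, mirroring the tests added above; the asset names and metadata values here are illustrative assumptions, not part of the change:

    from dagster import AssetExecutionContext, MaterializeResult, asset, materialize

    @asset
    def upstream():
        # Attach metadata to the materialization so a downstream asset can read it back.
        return MaterializeResult(metadata={"foo": "bar"})

    @asset
    def downstream(context: AssetExecutionContext, upstream):
        # New API introduced in this diff: fetch the latest AssetMaterialization of an upstream asset.
        mat = context.latest_materialization_for_upstream_asset("upstream")
        if mat is not None:
            context.log.info(f"upstream metadata: {mat.metadata['foo'].value}")

    if __name__ == "__main__":
        materialize([upstream, downstream])

Note that, per the DirectAssetExecutionContext changes above, calling latest_materialization_for_upstream_asset on a directly invoked asset (i.e. with a context from build_asset_context) raises DagsterInvalidPropertyError rather than returning an event.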