diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 1d16aa8a2dee4..d4291ab0ef1e9 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -4,7 +4,6 @@ from inspect import ( _empty as EmptyAnnotation, ) -import pstats from typing import ( AbstractSet, Any, @@ -1407,6 +1406,8 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: op_execution_context, "op_execution_context", OpExecutionContext ) + self._step_execution_context = op_execution_context._step_execution_context # noqa: SLF001 + @staticmethod def get() -> "AssetExecutionContext": ctx = _current_asset_execution_context.get() @@ -1470,31 +1471,65 @@ def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @public - def latest_materialization_for_upstream_asset( - self, key: CoercibleToAssetKey + def fetch_latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey, partition_key: Optional[str] = None ) -> Optional[AssetMaterialization]: - """Get the most recent AssetMaterialization event for the key. The key must be an upstream + """Get the most recent AssetMaterialization event for the key. The key must be a direct upstream asset for the currently materializing asset. Information like metadata and tags can be found on the AssetMaterialization. If the key is not an upstream asset of the currently materializing asset, an error will be raised. If no AssetMaterialization exists for key, None will be returned. + For partitioned assets, fetching the latest AssetMaterialization for the partition requires a + call to the event log. Additionally, if the currently materializing asset depends on multiple + partitions of the upstream asset, ``partition_key`` must be provided so that the latest AssetMaterialization + is fetched for the correct partition. + Returns: Optional[AssetMaterialization] """ + asset_key = AssetKey.from_coercible(key) + + if ( + self.job_def.asset_layer.input_for_asset_key( + self.op_execution_context.node_handle, AssetKey.from_coercible(asset_key) + ) + is None + ): + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {asset_key}. {asset_key} must be a direct upstream dependency" + "in order to call fetch_latest_materialization_for_upstream_asset." + ) + materialization_events = ( self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 ) - if self.has_partition_key: - # if the asset is par - pass + if not self._step_execution_context.has_asset_partitions_for_upstream_asset(asset_key): + return materialization_events.get(asset_key) else: - if AssetKey.from_coercible(key) in materialization_events.keys(): - return materialization_events.get(AssetKey.from_coercible(key)) + # if the asset is partitioned, the AssetMaterialization in 'materialization_events' may not correspond to the + # latest AssetMaterialization of the particular partition that is relevant to this materialization. It will + # always be the latest AssetMaterialization regardless of partition. So we need to fetch the latest + # AssetMaterialization for the specific partition key that is relevant to this materialization + if partition_key is None: + partition_keys_for_asset = list( + self._step_execution_context.asset_partitions_subset_for_upstream_asset( + asset_key + ).get_partition_keys() + ) + if len(partition_keys_for_asset) > 1: + raise DagsterInvariantViolationError( + f"Asset {self.asset_key} depends on multiple partitions of asset {asset_key}." + " fetch_latest_materialization_for_upstream asset must be given" + " a specific partition_key to fetch the AssetMaterialization for." + ) + partition_key = partition_keys_for_asset[0] - raise DagsterInvariantViolationError( - f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" - "in order to call latest_materialization_for_upstream_asset." + event = self.instance.get_latest_data_version_record( + asset_key, partition_key=partition_key ) + if event is None: + return event + return event.asset_materialization if event.asset_materialization else None ######## Deprecated methods