diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 792d061509c3b..f8bc59086ae59 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1408,6 +1408,8 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: ) self._step_execution_context = self._op_execution_context._step_execution_context # noqa: SLF001 + self._step_execution_context = op_execution_context._step_execution_context # noqa: SLF001 + @staticmethod def get() -> "AssetExecutionContext": ctx = _current_asset_execution_context.get() @@ -1471,27 +1473,65 @@ def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @public - def latest_materialization_for_upstream_asset( - self, key: CoercibleToAssetKey + def fetch_latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey, partition_key: Optional[str] = None ) -> Optional[AssetMaterialization]: - """Get the most recent AssetMaterialization event for the key. The key must be an upstream + """Get the most recent AssetMaterialization event for the key. The key must be a direct upstream asset for the currently materializing asset. Information like metadata and tags can be found on the AssetMaterialization. If the key is not an upstream asset of the currently materializing asset, an error will be raised. If no AssetMaterialization exists for key, None will be returned. + For partitioned assets, fetching the latest AssetMaterialization for the partition requires a + call to the event log. Additionally, if the currently materializing asset depends on multiple + partitions of the upstream asset, ``partition_key`` must be provided so that the latest AssetMaterialization + is fetched for the correct partition. + Returns: Optional[AssetMaterialization] """ + asset_key = AssetKey.from_coercible(key) + + if ( + self.job_def.asset_layer.input_for_asset_key( + self.op_execution_context.node_handle, AssetKey.from_coercible(asset_key) + ) + is None + ): + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {asset_key}. {asset_key} must be a direct upstream dependency" + "in order to call fetch_latest_materialization_for_upstream_asset." + ) + materialization_events = ( self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 ) - if AssetKey.from_coercible(key) in materialization_events.keys(): - return materialization_events.get(AssetKey.from_coercible(key)) + if not self._step_execution_context.has_asset_partitions_for_upstream_asset(asset_key): + return materialization_events.get(asset_key) + else: + # if the asset is partitioned, the AssetMaterialization in 'materialization_events' may not correspond to the + # latest AssetMaterialization of the particular partition that is relevant to this materialization. It will + # always be the latest AssetMaterialization regardless of partition. So we need to fetch the latest + # AssetMaterialization for the specific partition key that is relevant to this materialization + if partition_key is None: + partition_keys_for_asset = list( + self._step_execution_context.asset_partitions_subset_for_upstream_asset( + asset_key + ).get_partition_keys() + ) + if len(partition_keys_for_asset) > 1: + raise DagsterInvariantViolationError( + f"Asset {self.asset_key} depends on multiple partitions of asset {asset_key}." + " fetch_latest_materialization_for_upstream asset must be given" + " a specific partition_key to fetch the AssetMaterialization for." + ) + partition_key = partition_keys_for_asset[0] - raise DagsterInvariantViolationError( - f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" - "in order to call latest_materialization_for_upstream_asset." - ) + event = self.instance.get_latest_data_version_record( + asset_key, partition_key=partition_key + ) + if event is None: + return event + return event.asset_materialization if event.asset_materialization else None ######## Deprecated methods diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index fca48fecadf55..1c15456d103c1 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -4,18 +4,23 @@ import pytest from dagster import ( AssetCheckResult, + AssetDep, AssetExecutionContext, + AssetKey, AssetOut, DagsterInstance, + DailyPartitionsDefinition, Definitions, GraphDefinition, MaterializeResult, OpExecutionContext, Output, + TimeWindowPartitionMapping, asset, asset_check, graph_asset, graph_multi_asset, + instance_for_test, job, materialize, multi_asset, @@ -437,7 +442,7 @@ def upstream(context: AssetExecutionContext): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" @@ -452,8 +457,80 @@ def upstream(): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" materialize([upstream, downstream]) + + +def test_upstream_metadata_with_partitions(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + first_materialization_partition_key = "2024-01-01" + second_materialization_partition_key = "2024-01-02" + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset(partitions_def=partitions_def, deps=[upstream]) + def downstream(context: AssetExecutionContext): + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["partition_key"].value == context.partition_key + + # upstream_asset_materialization_events has the latest AssetMaterialization and is not scoped to + # a specific partition + latest_mat = context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 + AssetKey.from_coercible("upstream") + ) + assert latest_mat is not None + assert latest_mat.metadata["partition_key"].value != context.partition_key + assert latest_mat.metadata["partition_key"].value == second_materialization_partition_key + + with instance_for_test() as instance: + materialize( + [upstream], partition_key=first_materialization_partition_key, instance=instance + ) + + # the metadata from this materialization will be what's in upstream_asset_materialization_events + materialize( + [upstream], partition_key=second_materialization_partition_key, instance=instance + ) + + # asserts that fetch_latest_materialization_for_upstream_asset gets the AssetMaterialization + # scoped to the correct partition, not just the latest AssetMaterialization + materialize( + [downstream], partition_key=first_materialization_partition_key, instance=instance + ) + + +def test_upstream_metadata_with_partition_mapping(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset( + partitions_def=partitions_def, + deps=[ + AssetDep(asset=upstream, partition_mapping=TimeWindowPartitionMapping(start_offset=-2)) + ], + ) + def downstream(context: AssetExecutionContext): + with pytest.raises(DagsterInvariantViolationError): + context.fetch_latest_materialization_for_upstream_asset("upstream") + + mat = context.fetch_latest_materialization_for_upstream_asset( + "upstream", partition_key="2024-01-01" + ) + assert mat is not None + assert mat.metadata["partition_key"].value == "2024-01-01" + + with instance_for_test() as instance: + materialize([upstream], partition_key="2024-01-01", instance=instance) + materialize([upstream], partition_key="2024-01-02", instance=instance) + materialize([upstream], partition_key="2024-01-03", instance=instance) + + materialize([downstream], partition_key="2024-01-03", instance=instance)