diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index fca48fecadf55..1c15456d103c1 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -4,18 +4,23 @@ import pytest from dagster import ( AssetCheckResult, + AssetDep, AssetExecutionContext, + AssetKey, AssetOut, DagsterInstance, + DailyPartitionsDefinition, Definitions, GraphDefinition, MaterializeResult, OpExecutionContext, Output, + TimeWindowPartitionMapping, asset, asset_check, graph_asset, graph_multi_asset, + instance_for_test, job, materialize, multi_asset, @@ -437,7 +442,7 @@ def upstream(context: AssetExecutionContext): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" @@ -452,8 +457,80 @@ def upstream(): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" materialize([upstream, downstream]) + + +def test_upstream_metadata_with_partitions(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + first_materialization_partition_key = "2024-01-01" + second_materialization_partition_key = "2024-01-02" + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset(partitions_def=partitions_def, deps=[upstream]) + def downstream(context: AssetExecutionContext): + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["partition_key"].value == context.partition_key + + # upstream_asset_materialization_events has the latest AssetMaterialization and is not scoped to + # a specific partition + latest_mat = context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 + AssetKey.from_coercible("upstream") + ) + assert latest_mat is not None + assert latest_mat.metadata["partition_key"].value != context.partition_key + assert latest_mat.metadata["partition_key"].value == second_materialization_partition_key + + with instance_for_test() as instance: + materialize( + [upstream], partition_key=first_materialization_partition_key, instance=instance + ) + + # the metadata from this materialization will be what's in upstream_asset_materialization_events + materialize( + [upstream], partition_key=second_materialization_partition_key, instance=instance + ) + + # asserts that fetch_latest_materialization_for_upstream_asset gets the AssetMaterialization + # scoped to the correct partition, not just the latest AssetMaterialization + materialize( + [downstream], partition_key=first_materialization_partition_key, instance=instance + ) + + +def test_upstream_metadata_with_partition_mapping(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset( + partitions_def=partitions_def, + deps=[ + AssetDep(asset=upstream, partition_mapping=TimeWindowPartitionMapping(start_offset=-2)) + ], + ) + def downstream(context: AssetExecutionContext): + with pytest.raises(DagsterInvariantViolationError): + context.fetch_latest_materialization_for_upstream_asset("upstream") + + mat = context.fetch_latest_materialization_for_upstream_asset( + "upstream", partition_key="2024-01-01" + ) + assert mat is not None + assert mat.metadata["partition_key"].value == "2024-01-01" + + with instance_for_test() as instance: + materialize([upstream], partition_key="2024-01-01", instance=instance) + materialize([upstream], partition_key="2024-01-02", instance=instance) + materialize([upstream], partition_key="2024-01-03", instance=instance) + + materialize([downstream], partition_key="2024-01-03", instance=instance)