From 4fe9b3824a0ab528365bbd388c9f98a160e568c2 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 18 Jan 2024 12:58:45 -0500 Subject: [PATCH 1/3] wip --- .../dagster/_core/execution/context/compute.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 792d061509c3b..8b2e67c6d7d68 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -4,6 +4,7 @@ from inspect import ( _empty as EmptyAnnotation, ) +import pstats from typing import ( AbstractSet, Any, @@ -1485,13 +1486,17 @@ def latest_materialization_for_upstream_asset( materialization_events = ( self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 ) - if AssetKey.from_coercible(key) in materialization_events.keys(): - return materialization_events.get(AssetKey.from_coercible(key)) + if self.has_partition_key: + # if the asset is par + pass + else: + if AssetKey.from_coercible(key) in materialization_events.keys(): + return materialization_events.get(AssetKey.from_coercible(key)) - raise DagsterInvariantViolationError( - f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" - "in order to call latest_materialization_for_upstream_asset." - ) + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" + "in order to call latest_materialization_for_upstream_asset." + ) ######## Deprecated methods From 3c574554c570c05dd72e33dde398f07591aa5c9d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 18 Jan 2024 14:33:39 -0500 Subject: [PATCH 2/3] make it work for partitions? --- .../_core/execution/context/compute.py | 59 +++++++++++++++---- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 8b2e67c6d7d68..f8bc59086ae59 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -4,7 +4,6 @@ from inspect import ( _empty as EmptyAnnotation, ) -import pstats from typing import ( AbstractSet, Any, @@ -1409,6 +1408,8 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None: ) self._step_execution_context = self._op_execution_context._step_execution_context # noqa: SLF001 + self._step_execution_context = op_execution_context._step_execution_context # noqa: SLF001 + @staticmethod def get() -> "AssetExecutionContext": ctx = _current_asset_execution_context.get() @@ -1472,31 +1473,65 @@ def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @public - def latest_materialization_for_upstream_asset( - self, key: CoercibleToAssetKey + def fetch_latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey, partition_key: Optional[str] = None ) -> Optional[AssetMaterialization]: - """Get the most recent AssetMaterialization event for the key. The key must be an upstream + """Get the most recent AssetMaterialization event for the key. The key must be a direct upstream asset for the currently materializing asset. Information like metadata and tags can be found on the AssetMaterialization. If the key is not an upstream asset of the currently materializing asset, an error will be raised. If no AssetMaterialization exists for key, None will be returned. + For partitioned assets, fetching the latest AssetMaterialization for the partition requires a + call to the event log. Additionally, if the currently materializing asset depends on multiple + partitions of the upstream asset, ``partition_key`` must be provided so that the latest AssetMaterialization + is fetched for the correct partition. + Returns: Optional[AssetMaterialization] """ + asset_key = AssetKey.from_coercible(key) + + if ( + self.job_def.asset_layer.input_for_asset_key( + self.op_execution_context.node_handle, AssetKey.from_coercible(asset_key) + ) + is None + ): + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {asset_key}. {asset_key} must be a direct upstream dependency" + "in order to call fetch_latest_materialization_for_upstream_asset." + ) + materialization_events = ( self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 ) - if self.has_partition_key: - # if the asset is par - pass + if not self._step_execution_context.has_asset_partitions_for_upstream_asset(asset_key): + return materialization_events.get(asset_key) else: - if AssetKey.from_coercible(key) in materialization_events.keys(): - return materialization_events.get(AssetKey.from_coercible(key)) + # if the asset is partitioned, the AssetMaterialization in 'materialization_events' may not correspond to the + # latest AssetMaterialization of the particular partition that is relevant to this materialization. It will + # always be the latest AssetMaterialization regardless of partition. So we need to fetch the latest + # AssetMaterialization for the specific partition key that is relevant to this materialization + if partition_key is None: + partition_keys_for_asset = list( + self._step_execution_context.asset_partitions_subset_for_upstream_asset( + asset_key + ).get_partition_keys() + ) + if len(partition_keys_for_asset) > 1: + raise DagsterInvariantViolationError( + f"Asset {self.asset_key} depends on multiple partitions of asset {asset_key}." + " fetch_latest_materialization_for_upstream asset must be given" + " a specific partition_key to fetch the AssetMaterialization for." + ) + partition_key = partition_keys_for_asset[0] - raise DagsterInvariantViolationError( - f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" - "in order to call latest_materialization_for_upstream_asset." + event = self.instance.get_latest_data_version_record( + asset_key, partition_key=partition_key ) + if event is None: + return event + return event.asset_materialization if event.asset_materialization else None ######## Deprecated methods From 876c5ed6c2c3bae21d899d3d2fe56d04c8d81f11 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 18 Jan 2024 14:51:41 -0500 Subject: [PATCH 3/3] testing --- .../execution_tests/test_context.py | 81 ++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index fca48fecadf55..1c15456d103c1 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -4,18 +4,23 @@ import pytest from dagster import ( AssetCheckResult, + AssetDep, AssetExecutionContext, + AssetKey, AssetOut, DagsterInstance, + DailyPartitionsDefinition, Definitions, GraphDefinition, MaterializeResult, OpExecutionContext, Output, + TimeWindowPartitionMapping, asset, asset_check, graph_asset, graph_multi_asset, + instance_for_test, job, materialize, multi_asset, @@ -437,7 +442,7 @@ def upstream(context: AssetExecutionContext): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" @@ -452,8 +457,80 @@ def upstream(): @asset def downstream(context: AssetExecutionContext, upstream): - mat = context.latest_materialization_for_upstream_asset("upstream") + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") assert mat is not None assert mat.metadata["foo"].value == "bar" materialize([upstream, downstream]) + + +def test_upstream_metadata_with_partitions(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + first_materialization_partition_key = "2024-01-01" + second_materialization_partition_key = "2024-01-02" + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset(partitions_def=partitions_def, deps=[upstream]) + def downstream(context: AssetExecutionContext): + mat = context.fetch_latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["partition_key"].value == context.partition_key + + # upstream_asset_materialization_events has the latest AssetMaterialization and is not scoped to + # a specific partition + latest_mat = context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 + AssetKey.from_coercible("upstream") + ) + assert latest_mat is not None + assert latest_mat.metadata["partition_key"].value != context.partition_key + assert latest_mat.metadata["partition_key"].value == second_materialization_partition_key + + with instance_for_test() as instance: + materialize( + [upstream], partition_key=first_materialization_partition_key, instance=instance + ) + + # the metadata from this materialization will be what's in upstream_asset_materialization_events + materialize( + [upstream], partition_key=second_materialization_partition_key, instance=instance + ) + + # asserts that fetch_latest_materialization_for_upstream_asset gets the AssetMaterialization + # scoped to the correct partition, not just the latest AssetMaterialization + materialize( + [downstream], partition_key=first_materialization_partition_key, instance=instance + ) + + +def test_upstream_metadata_with_partition_mapping(): + partitions_def = DailyPartitionsDefinition(start_date="2024-01-01") + + @asset(partitions_def=partitions_def) + def upstream(context): + return MaterializeResult(metadata={"partition_key": context.partition_key}) + + @asset( + partitions_def=partitions_def, + deps=[ + AssetDep(asset=upstream, partition_mapping=TimeWindowPartitionMapping(start_offset=-2)) + ], + ) + def downstream(context: AssetExecutionContext): + with pytest.raises(DagsterInvariantViolationError): + context.fetch_latest_materialization_for_upstream_asset("upstream") + + mat = context.fetch_latest_materialization_for_upstream_asset( + "upstream", partition_key="2024-01-01" + ) + assert mat is not None + assert mat.metadata["partition_key"].value == "2024-01-01" + + with instance_for_test() as instance: + materialize([upstream], partition_key="2024-01-01", instance=instance) + materialize([upstream], partition_key="2024-01-02", instance=instance) + materialize([upstream], partition_key="2024-01-03", instance=instance) + + materialize([downstream], partition_key="2024-01-03", instance=instance)