From ac6877e6b42e20bf2ff0d26ec6b73f034b9cd637 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 19 Jan 2024 15:12:43 -0500 Subject: [PATCH] move storing AM to when it is accurate --- .../dagster/dagster/_core/execution/context/system.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index df5c1d385dd17..85b67cf0d00cf 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1001,9 +1001,6 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> self._input_asset_version_info[key] = None self._upstream_asset_materialization_events[key] = None else: - self._upstream_asset_materialization_events[key] = ( - event.asset_materialization if event.asset_materialization else None - ) storage_id = event.storage_id # Input name will be none if this is an internal dep input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key) @@ -1030,6 +1027,12 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> data_version = extract_data_version_from_entry(event.event_log_entry) else: data_version = extract_data_version_from_entry(event.event_log_entry) + # the AssetMaterialization fetched above is only accurate if the asset it not partitioned + # if the asset is partitioned, then the latest AssetMaterialization may be for a partition + # that is irrelevant to the current execution + self._upstream_asset_materialization_events[key] = ( + event.asset_materialization if event.asset_materialization else None + ) self._input_asset_version_info[key] = InputAssetVersionInfo( storage_id, data_version, event.run_id, event.timestamp )