Skip to content

Commit

Permalink
move storing AM to when it is accurate
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent db6b1e4 commit 19af7e7
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,6 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) ->
self._input_asset_version_info[key] = None
self._upstream_asset_materialization_events[key] = None
else:
self._upstream_asset_materialization_events[key] = (
event.asset_materialization if event.asset_materialization else None
)
storage_id = event.storage_id
# Input name will be none if this is an internal dep
input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key)
Expand All @@ -1030,6 +1027,12 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) ->
data_version = extract_data_version_from_entry(event.event_log_entry)
else:
data_version = extract_data_version_from_entry(event.event_log_entry)
# the AssetMaterialization fetched above is only accurate if the asset is not partitioned.
# If the asset is partitioned, then the latest AssetMaterialization may be for a partition
# that is irrelevant to the current execution
self._upstream_asset_materialization_events[key] = (
event.asset_materialization if event.asset_materialization else None
)
self._input_asset_version_info[key] = InputAssetVersionInfo(
storage_id, data_version, event.run_id, event.timestamp
)
Expand Down

0 comments on commit 19af7e7

Please sign in to comment.