From d69668882f8142a0dceaa66eb7b6c02f47013273 Mon Sep 17 00:00:00 2001 From: briantu Date: Mon, 21 Oct 2024 17:52:51 -0400 Subject: [PATCH] Fix asset graph view functions --- .../asset_graph_view/asset_graph_view.py | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 186d6535cada3..49de5bb2275fd 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -403,10 +403,11 @@ def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[A partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) + return ( + cache_value.get_in_progress_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) else: value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @@ -425,10 +426,11 @@ def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[ partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_failed_partition_subsets(partitions_def) + return ( + cache_value.get_failed_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) else: value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @@ -450,16 +452,16 @@ def _compute_missing_asset_subset( partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - return ( + materialized_subset = ( cache_value.get_materialized_subset(self, key, partitions_def) if cache_value else self.get_empty_subset(key=key) ) - - value = self._queryer.get_materialized_asset_subset(asset_key=key).value - materialized_subset = EntitySubset( - self, key=key, value=_ValidatedEntitySubsetValue(value) - ) + else: + value = self._queryer.get_materialized_asset_subset(asset_key=key).value + materialized_subset = EntitySubset( + self, key=key, value=_ValidatedEntitySubsetValue(value) + ) return from_subset.compute_difference(materialized_subset) else: # more expensive call