diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 17eaa0137c296..035bd1d4e4a18 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -29,6 +29,7 @@ get_time_partitions_def, ) from dagster._core.loader import LoadingContext +from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader from dagster._utils.cached_method import cached_method @@ -398,7 +399,15 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse return self.compute_subset_with_status(key, None) def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_in_progress_asset_subset(asset_key=key).value + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) + else: + value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: @@ -410,7 +419,15 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_failed_asset_subset(asset_key=key).value + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_failed_partition_subsets(partitions_def) + else: + value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_missing_asset_subset( @@ -425,9 +442,17 @@ def _compute_missing_asset_subset( # of the materializability of the asset if self.asset_graph.get(key).is_materializable: # cheap call which takes advantage of the partition status cache - materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key) + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_materialized_partition_subsets(partitions_def) + else: + value = self._queryer.get_materialized_asset_subset(asset_key=key).value materialized_subset = EntitySubset( - self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value) + self, key=key, value=_ValidatedEntitySubsetValue(value) ) return from_subset.compute_difference(materialized_subset) else: diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index 6626c6ce0ce24..aa527265eb620 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import TYPE_CHECKING, List, NamedTuple, Optional, Sequence, Set, Tuple +from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple from dagster import ( AssetKey, @@ -19,7 +19,7 @@ ) from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition from dagster._core.instance import DynamicPartitionsStore -from dagster._core.loader import LoadingContext +from dagster._core.loader import InstanceLoadableBy, LoadingContext from dagster._core.storage.dagster_run import FINISHED_STATUSES, RunsFilter from dagster._core.storage.tags import ( MULTIDIMENSIONAL_PARTITION_PREFIX, @@ -79,7 +79,8 @@ class AssetStatusCacheValue( ("serialized_in_progress_partition_subset", Optional[str]), ("earliest_in_progress_materialization_event_id", Optional[int]), ], - ) + ), + InstanceLoadableBy[Tuple[AssetKey, PartitionsDefinition]], ): """Set of asset fields that reflect partition materialization status. This is used to display global partition status in the asset view. @@ -143,6 +144,13 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: return cached_data + @classmethod + def _blocking_batch_load( + cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" + ) -> Iterable[Optional["AssetStatusCacheValue"]]: + partition_defs_by_key = {key: partition_def for key, partition_def in keys} + return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key) + def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition ) -> PartitionsSubset: