diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 2d39362614a07..49de5bb2275fd 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -398,7 +398,18 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse return self.compute_subset_with_status(key, None) def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_in_progress_asset_subset(asset_key=key).value + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + return ( + cache_value.get_in_progress_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + else: + value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: @@ -410,7 +421,18 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_failed_asset_subset(asset_key=key).value + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + return ( + cache_value.get_failed_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + else: + value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_missing_asset_subset( @@ -419,16 +441,27 @@ def _compute_missing_asset_subset( """Returns a subset which is the subset of the input subset that has never been materialized (if it is a materializable asset) or observered (if it is an observable asset). """ + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + # TODO: this logic should be simplified once we have a unified way of detecting both # materializations and observations through the parittion status cache. at that point, the # definition will slightly change to search for materializations and observations regardless # of the materializability of the asset if self.asset_graph.get(key).is_materializable: # cheap call which takes advantage of the partition status cache - materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key) - materialized_subset = EntitySubset( - self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value) - ) + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + materialized_subset = ( + cache_value.get_materialized_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + else: + value = self._queryer.get_materialized_asset_subset(asset_key=key).value + materialized_subset = EntitySubset( + self, key=key, value=_ValidatedEntitySubsetValue(value) + ) return from_subset.compute_difference(materialized_subset) else: # more expensive call diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index 6626c6ce0ce24..6e2c194e8a824 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import TYPE_CHECKING, List, NamedTuple, Optional, Sequence, Set, Tuple +from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple from dagster import ( AssetKey, @@ -7,6 +7,8 @@ DagsterRunStatus, _check as check, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue from dagster._core.definitions.multi_dimensional_partitions import ( MultiPartitionKey, MultiPartitionsDefinition, @@ -19,7 +21,7 @@ ) from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition from dagster._core.instance import DynamicPartitionsStore -from dagster._core.loader import LoadingContext +from dagster._core.loader import InstanceLoadableBy, LoadingContext from dagster._core.storage.dagster_run import FINISHED_STATUSES, RunsFilter from dagster._core.storage.tags import ( MULTIDIMENSIONAL_PARTITION_PREFIX, @@ -79,7 +81,8 @@ class AssetStatusCacheValue( ("serialized_in_progress_partition_subset", Optional[str]), ("earliest_in_progress_materialization_event_id", Optional[int]), ], - ) + ), + InstanceLoadableBy[Tuple[AssetKey, PartitionsDefinition]], ): """Set of asset fields that reflect partition materialization status. This is used to display global partition status in the asset view. @@ -143,6 +146,12 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: return cached_data + @classmethod + def _blocking_batch_load( + cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" + ) -> Iterable[Optional["AssetStatusCacheValue"]]: + return instance.event_log_storage.get_asset_status_cache_values(dict(keys)) + def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition ) -> PartitionsSubset: @@ -167,6 +176,39 @@ def deserialize_in_progress_partition_subsets( return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset) + def get_materialized_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_materialized_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_failed_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_failed_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_in_progress_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_in_progress_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + def get_materialized_multipartitions( instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition