diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index dfa57a188767c..a832b9202546e 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -29,7 +29,6 @@ get_time_partitions_def, ) from dagster._core.loader import LoadingContext -from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader from dagster._utils.cached_method import cached_method @@ -398,6 +397,8 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse return self.compute_subset_with_status(key, None) def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) @@ -418,6 +419,8 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) @@ -435,6 +438,8 @@ def _compute_missing_asset_subset( """Returns a subset which is the subset of the input subset that has never been materialized (if it is a materializable asset) or observered (if it is an observable asset). """ + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + # TODO: this logic should be simplified once we have a unified way of detecting both # materializations and observations through the parittion status cache. at that point, the # definition will slightly change to search for materializations and observations regardless @@ -444,12 +449,13 @@ def _compute_missing_asset_subset( partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_materialized_partition_subsets(partitions_def) - else: - value = self._queryer.get_materialized_asset_subset(asset_key=key).value + return ( + cache_value.get_materialized_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + + value = self._queryer.get_materialized_asset_subset(asset_key=key).value materialized_subset = EntitySubset( self, key=key, value=_ValidatedEntitySubsetValue(value) ) diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index aa527265eb620..6e2c194e8a824 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -7,6 +7,8 @@ DagsterRunStatus, _check as check, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue from dagster._core.definitions.multi_dimensional_partitions import ( MultiPartitionKey, MultiPartitionsDefinition, @@ -148,8 +150,7 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: def _blocking_batch_load( cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" ) -> Iterable[Optional["AssetStatusCacheValue"]]: - partition_defs_by_key = {key: partition_def for key, partition_def in keys} - return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key) + return instance.event_log_storage.get_asset_status_cache_values(dict(keys)) def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition @@ -175,6 +176,39 @@ def deserialize_in_progress_partition_subsets( return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset) + def get_materialized_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_materialized_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_failed_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_failed_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_in_progress_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_in_progress_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + def get_materialized_multipartitions( instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition