diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index dad0453ddf700..d151f0d74903a 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -18,7 +18,6 @@ get_time_partitions_def, ) from dagster._core.loader import LoadingContext -from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader from dagster._utils.cached_method import cached_method @@ -350,6 +349,8 @@ def compute_missing_subset( """Returns a subset which is the subset of the input subset that has never been materialized (if it is a materializable asset) or observered (if it is an observable asset). """ + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + # TODO: this logic should be simplified once we have a unified way of detecting both # materializations and observations through the parittion status cache. at that point, the # definition will slightly change to search for materializations and observations regardless @@ -359,12 +360,13 @@ def compute_missing_subset( partitions_def = self._get_partitions_def(asset_key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (asset_key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_materialized_partition_subsets(partitions_def) - else: - value = self._queryer.get_materialized_asset_subset(asset_key=asset_key).value + return ( + cache_value.get_materialized_subset(self, asset_key, partitions_def) + if cache_value + else self.get_empty_subset(key=asset_key) + ) + + value = self._queryer.get_materialized_asset_subset(asset_key=asset_key).value materialized_subset = EntitySubset( self, key=asset_key, value=_ValidatedEntitySubsetValue(value) ) @@ -382,29 +384,35 @@ def compute_missing_subset( @cached_method def compute_in_progress_asset_subset(self, *, asset_key: AssetKey) -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + # part of in progress run partitions_def = self._get_partitions_def(asset_key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (asset_key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) - else: - value = self._queryer.get_in_progress_asset_subset(asset_key=asset_key).value + return ( + cache_value.get_in_progress_subset(self, asset_key, partitions_def) + if cache_value + else self.get_empty_subset(key=asset_key) + ) + + value = self._queryer.get_in_progress_asset_subset(asset_key=asset_key).value return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value)) @cached_method def compute_failed_asset_subset(self, *, asset_key: "AssetKey") -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + partitions_def = self._get_partitions_def(asset_key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (asset_key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_failed_partition_subsets(partitions_def) - else: - value = self._queryer.get_failed_asset_subset(asset_key=asset_key).value + return ( + cache_value.get_failed_subset(self, asset_key, partitions_def) + if cache_value + else self.get_empty_subset(key=asset_key) + ) + + value = self._queryer.get_failed_asset_subset(asset_key=asset_key).value return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value)) @cached_method diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index aa527265eb620..6e2c194e8a824 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -7,6 +7,8 @@ DagsterRunStatus, _check as check, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue from dagster._core.definitions.multi_dimensional_partitions import ( MultiPartitionKey, MultiPartitionsDefinition, @@ -148,8 +150,7 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: def _blocking_batch_load( cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" ) -> Iterable[Optional["AssetStatusCacheValue"]]: - partition_defs_by_key = {key: partition_def for key, partition_def in keys} - return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key) + return instance.event_log_storage.get_asset_status_cache_values(dict(keys)) def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition @@ -175,6 +176,39 @@ def deserialize_in_progress_partition_subsets( return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset) + def get_materialized_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_materialized_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_failed_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_failed_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_in_progress_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_in_progress_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + def get_materialized_multipartitions( instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py index 00214aef6fba8..8e7d5ef90727c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py @@ -6,6 +6,8 @@ HourlyPartitionsDefinition, evaluate_automation_conditions, ) +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.events import AssetMaterialization from dagster._core.instance import DagsterInstance from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets @@ -26,7 +28,17 @@ def test_eager_perf() -> None: auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), ) + defs = Definitions(assets=assets) + instance = DagsterInstance.ephemeral() + + for asset_key in defs.get_asset_graph().all_asset_keys: + instance.report_runless_asset_event( + AssetMaterialization( + asset_key=asset_key, partition=hourly_partitions_def.get_last_partition_key() + ) + ) + cursor = None start = time.time() for _ in range(2):