From 3e097fd0886756ed7590b09c69d8999e0aae6965 Mon Sep 17 00:00:00 2001 From: briantu Date: Thu, 10 Oct 2024 16:55:28 -0400 Subject: [PATCH 1/3] Make AssetStatusCacheValue inherit InstanceLoadableBy --- .../asset_graph_view/asset_graph_view.py | 33 ++++++++++++++++--- .../_core/storage/partition_status_cache.py | 14 ++++++-- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 2d39362614a07..559742fe21c39 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -29,6 +29,7 @@ get_time_partitions_def, ) from dagster._core.loader import LoadingContext +from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader from dagster._utils.cached_method import cached_method @@ -398,7 +399,15 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse return self.compute_subset_with_status(key, None) def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_in_progress_asset_subset(asset_key=key).value + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) + else: + value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: @@ -410,7 +419,15 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: - value = self._queryer.get_failed_asset_subset(asset_key=key).value + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_failed_partition_subsets(partitions_def) + else: + value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_missing_asset_subset( @@ -425,9 +442,17 @@ def _compute_missing_asset_subset( # of the materializability of the asset if self.asset_graph.get(key).is_materializable: # cheap call which takes advantage of the partition status cache - materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key) + partitions_def = self._get_partitions_def(key) + if partitions_def: + cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + if cache_value is None: + value = partitions_def.empty_subset() + else: + value = cache_value.deserialize_materialized_partition_subsets(partitions_def) + else: + value = self._queryer.get_materialized_asset_subset(asset_key=key).value materialized_subset = EntitySubset( - self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value) + self, key=key, value=_ValidatedEntitySubsetValue(value) ) return from_subset.compute_difference(materialized_subset) else: diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index 6626c6ce0ce24..aa527265eb620 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import TYPE_CHECKING, List, NamedTuple, Optional, Sequence, Set, Tuple +from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple from dagster import ( AssetKey, @@ -19,7 +19,7 @@ ) from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition from dagster._core.instance import DynamicPartitionsStore -from dagster._core.loader import LoadingContext +from dagster._core.loader import InstanceLoadableBy, LoadingContext from dagster._core.storage.dagster_run import FINISHED_STATUSES, RunsFilter from dagster._core.storage.tags import ( MULTIDIMENSIONAL_PARTITION_PREFIX, @@ -79,7 +79,8 @@ class AssetStatusCacheValue( ("serialized_in_progress_partition_subset", Optional[str]), ("earliest_in_progress_materialization_event_id", Optional[int]), ], - ) + ), + InstanceLoadableBy[Tuple[AssetKey, PartitionsDefinition]], ): """Set of asset fields that reflect partition materialization status. This is used to display global partition status in the asset view. @@ -143,6 +144,13 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: return cached_data + @classmethod + def _blocking_batch_load( + cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" + ) -> Iterable[Optional["AssetStatusCacheValue"]]: + partition_defs_by_key = {key: partition_def for key, partition_def in keys} + return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key) + def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition ) -> PartitionsSubset: From a2803c3269ed509be7af82d0e2859a2baa257035 Mon Sep 17 00:00:00 2001 From: briantu Date: Thu, 10 Oct 2024 17:51:47 -0400 Subject: [PATCH 2/3] Refactor --- .../asset_graph_view/asset_graph_view.py | 20 ++++++---- .../_core/storage/partition_status_cache.py | 38 ++++++++++++++++++- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 559742fe21c39..186d6535cada3 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -29,7 +29,6 @@ get_time_partitions_def, ) from dagster._core.loader import LoadingContext -from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader from dagster._utils.cached_method import cached_method @@ -399,6 +398,8 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse return self.compute_subset_with_status(key, None) def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) @@ -419,6 +420,8 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) @@ -436,6 +439,8 @@ def _compute_missing_asset_subset( """Returns a subset which is the subset of the input subset that has never been materialized (if it is a materializable asset) or observered (if it is an observable asset). """ + from dagster._core.storage.partition_status_cache import AssetStatusCacheValue + # TODO: this logic should be simplified once we have a unified way of detecting both # materializations and observations through the parittion status cache. at that point, the # definition will slightly change to search for materializations and observations regardless @@ -445,12 +450,13 @@ def _compute_missing_asset_subset( partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_materialized_partition_subsets(partitions_def) - else: - value = self._queryer.get_materialized_asset_subset(asset_key=key).value + return ( + cache_value.get_materialized_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + + value = self._queryer.get_materialized_asset_subset(asset_key=key).value materialized_subset = EntitySubset( self, key=key, value=_ValidatedEntitySubsetValue(value) ) diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index aa527265eb620..6e2c194e8a824 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -7,6 +7,8 @@ DagsterRunStatus, _check as check, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue from dagster._core.definitions.multi_dimensional_partitions import ( MultiPartitionKey, MultiPartitionsDefinition, @@ -148,8 +150,7 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]: def _blocking_batch_load( cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance" ) -> Iterable[Optional["AssetStatusCacheValue"]]: - partition_defs_by_key = {key: partition_def for key, partition_def in keys} - return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key) + return instance.event_log_storage.get_asset_status_cache_values(dict(keys)) def deserialize_materialized_partition_subsets( self, partitions_def: PartitionsDefinition @@ -175,6 +176,39 @@ def deserialize_in_progress_partition_subsets( return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset) + def get_materialized_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_materialized_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_failed_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_failed_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + + def get_in_progress_subset( + self, + asset_graph_view: AssetGraphView, + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + ) -> EntitySubset[AssetKey]: + value = self.deserialize_in_progress_partition_subsets(partitions_def) + return EntitySubset( + asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value) + ) + def get_materialized_multipartitions( instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition From 253a599952bd8148fe596d01bcc5551fafb96937 Mon Sep 17 00:00:00 2001 From: briantu Date: Mon, 21 Oct 2024 17:52:51 -0400 Subject: [PATCH 3/3] Fix asset graph view functions --- .../asset_graph_view/asset_graph_view.py | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 186d6535cada3..49de5bb2275fd 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -403,10 +403,11 @@ def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[A partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) + return ( + cache_value.get_in_progress_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) else: value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @@ -425,10 +426,11 @@ def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[ partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_failed_partition_subsets(partitions_def) + return ( + cache_value.get_failed_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) else: value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @@ -450,16 +452,16 @@ def _compute_missing_asset_subset( partitions_def = self._get_partitions_def(key) if partitions_def: cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - return ( + materialized_subset = ( cache_value.get_materialized_subset(self, key, partitions_def) if cache_value else self.get_empty_subset(key=key) ) - - value = self._queryer.get_materialized_asset_subset(asset_key=key).value - materialized_subset = EntitySubset( - self, key=key, value=_ValidatedEntitySubsetValue(value) - ) + else: + value = self._queryer.get_materialized_asset_subset(asset_key=key).value + materialized_subset = EntitySubset( + self, key=key, value=_ValidatedEntitySubsetValue(value) + ) return from_subset.compute_difference(materialized_subset) else: # more expensive call