From 69639460c5bd7a031c0b51fbf1dddd09a5ecea63 Mon Sep 17 00:00:00 2001 From: briantu Date: Mon, 21 Oct 2024 18:03:53 -0400 Subject: [PATCH] Add back @cached_method --- .../dagster/_core/asset_graph_view/asset_graph_view.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 2034dd5b0d9d4..c718a47c0c99b 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -32,6 +32,7 @@ from dagster._core.loader import LoadingContext from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader +from dagster._utils.cached_method import cached_method if TYPE_CHECKING: from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode @@ -169,6 +170,7 @@ def _get_partitions_def(self, key: T_EntityKey) -> Optional["PartitionsDefinitio else: return None + @cached_method def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: partitions_def = self._get_partitions_def(key) value = ( @@ -182,6 +184,7 @@ def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: ) return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + @cached_method def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: partitions_def = self._get_partitions_def(key) value = partitions_def.empty_subset() if partitions_def else False @@ -476,6 +479,7 @@ async def _compute_missing_asset_subset( key=key, asset_partitions=missing_asset_partitions ) + @cached_method async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: return await _dispatch( key=key, @@ -483,6 +487,7 @@ async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubse asset_method=self._compute_run_in_progress_asset_subset, ) + @cached_method async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: async def get_empty_subset(key: EntityKey) -> EntitySubset: return self.get_empty_subset(key=key) @@ -494,6 +499,7 @@ async def get_empty_subset(key: EntityKey) -> EntitySubset: asset_method=self._compute_backfill_in_progress_asset_subset, ) + @cached_method async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: return await _dispatch( key=key, @@ -501,6 +507,7 @@ async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubs asset_method=self._compute_execution_failed_asset_subset, ) + @cached_method async def compute_missing_subset( self, *, key: EntityKey, from_subset: EntitySubset ) -> EntitySubset: @@ -539,6 +546,7 @@ async def _compute_updated_since_time_subset( else: return self.get_full_subset(key=key) + @cached_method async def compute_updated_since_temporal_context_subset( self, *, key: EntityKey, temporal_context: TemporalContext ) -> EntitySubset: