diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 5aa4e870c72d3..2034dd5b0d9d4 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -32,7 +32,6 @@ from dagster._core.loader import LoadingContext from dagster._time import get_current_datetime from dagster._utils.aiodataloader import DataLoader -from dagster._utils.cached_method import cached_method if TYPE_CHECKING: from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode @@ -170,7 +169,6 @@ def _get_partitions_def(self, key: T_EntityKey) -> Optional["PartitionsDefinitio else: return None - @cached_method def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: partitions_def = self._get_partitions_def(key) value = ( @@ -184,7 +182,6 @@ def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: ) return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - @cached_method def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: partitions_def = self._get_partitions_def(key) value = partitions_def.empty_subset() if partitions_def else False @@ -363,7 +360,7 @@ async def compute_subset_with_status( """Returns the subset of an asset check that matches a given status.""" from dagster._core.storage.event_log.base import AssetCheckSummaryRecord - summary = AssetCheckSummaryRecord.blocking_get(self, key) + summary = await AssetCheckSummaryRecord.gen(self, key) latest_record = summary.last_check_execution_record if summary else None resolved_status = ( latest_record.resolve_status(self) @@ -479,7 +476,6 @@ async def _compute_missing_asset_subset( key=key, asset_partitions=missing_asset_partitions ) - @cached_method async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: return await _dispatch( key=key, @@ -487,7 +483,6 @@ async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubse asset_method=self._compute_run_in_progress_asset_subset, ) - @cached_method async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: async def get_empty_subset(key: EntityKey) -> EntitySubset: return self.get_empty_subset(key=key) @@ -499,7 +494,6 @@ async def get_empty_subset(key: EntityKey) -> EntitySubset: asset_method=self._compute_backfill_in_progress_asset_subset, ) - @cached_method async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: return await _dispatch( key=key, @@ -507,7 +501,6 @@ async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubs asset_method=self._compute_execution_failed_asset_subset, ) - @cached_method async def compute_missing_subset( self, *, key: EntityKey, from_subset: EntitySubset ) -> EntitySubset: @@ -534,7 +527,7 @@ async def _compute_updated_since_time_subset( from dagster._core.storage.event_log.base import AssetCheckSummaryRecord # intentionally left unimplemented for AssetKey, as this is a less performant query - summary = AssetCheckSummaryRecord.blocking_get(self, key) + summary = await AssetCheckSummaryRecord.gen(self, key) record = summary.last_check_execution_record if summary else None if ( record is None @@ -546,7 +539,6 @@ async def _compute_updated_since_time_subset( else: return self.get_full_subset(key=key) - @cached_method async def compute_updated_since_temporal_context_subset( self, *, key: EntityKey, temporal_context: TemporalContext ) -> EntitySubset: