Skip to content

Commit

Permalink
Add back @cached_method
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 22, 2024
1 parent aeb91e9 commit 6963946
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dagster._core.loader import LoadingContext
from dagster._time import get_current_datetime
from dagster._utils.aiodataloader import DataLoader
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
Expand Down Expand Up @@ -169,6 +170,7 @@ def _get_partitions_def(self, key: T_EntityKey) -> Optional["PartitionsDefinitio
else:
return None

@cached_method
def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
partitions_def = self._get_partitions_def(key)
value = (
Expand All @@ -182,6 +184,7 @@ def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

@cached_method
def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
partitions_def = self._get_partitions_def(key)
value = partitions_def.empty_subset() if partitions_def else False
Expand Down Expand Up @@ -476,13 +479,15 @@ async def _compute_missing_asset_subset(
key=key, asset_partitions=missing_asset_partitions
)

@cached_method
async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return await _dispatch(
key=key,
check_method=self._compute_run_in_progress_check_subset,
asset_method=self._compute_run_in_progress_asset_subset,
)

@cached_method
async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
async def get_empty_subset(key: EntityKey) -> EntitySubset:
return self.get_empty_subset(key=key)
Expand All @@ -494,13 +499,15 @@ async def get_empty_subset(key: EntityKey) -> EntitySubset:
asset_method=self._compute_backfill_in_progress_asset_subset,
)

@cached_method
async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset:
return await _dispatch(
key=key,
check_method=self._compute_execution_failed_check_subset,
asset_method=self._compute_execution_failed_asset_subset,
)

@cached_method
async def compute_missing_subset(
self, *, key: EntityKey, from_subset: EntitySubset
) -> EntitySubset:
Expand Down Expand Up @@ -539,6 +546,7 @@ async def _compute_updated_since_time_subset(
else:
return self.get_full_subset(key=key)

@cached_method
async def compute_updated_since_temporal_context_subset(
self, *, key: EntityKey, temporal_context: TemporalContext
) -> EntitySubset:
Expand Down

0 comments on commit 6963946

Please sign in to comment.