Skip to content

Commit

Permalink
Remove @cached_method
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 21, 2024
1 parent 1dba5a0 commit 920a158
Showing 1 changed file with 2 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from dagster._core.loader import LoadingContext
from dagster._time import get_current_datetime
from dagster._utils.aiodataloader import DataLoader
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
Expand Down Expand Up @@ -170,7 +169,6 @@ def _get_partitions_def(self, key: T_EntityKey) -> Optional["PartitionsDefinitio
else:
return None

@cached_method
def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
partitions_def = self._get_partitions_def(key)
value = (
Expand All @@ -184,7 +182,6 @@ def get_full_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

@cached_method
def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
partitions_def = self._get_partitions_def(key)
value = partitions_def.empty_subset() if partitions_def else False
Expand Down Expand Up @@ -363,7 +360,7 @@ async def compute_subset_with_status(
"""Returns the subset of an asset check that matches a given status."""
from dagster._core.storage.event_log.base import AssetCheckSummaryRecord

summary = AssetCheckSummaryRecord.blocking_get(self, key)
summary = await AssetCheckSummaryRecord.gen(self, key)
latest_record = summary.last_check_execution_record if summary else None
resolved_status = (
latest_record.resolve_status(self)
Expand Down Expand Up @@ -479,15 +476,13 @@ async def _compute_missing_asset_subset(
key=key, asset_partitions=missing_asset_partitions
)

@cached_method
async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return await _dispatch(
key=key,
check_method=self._compute_run_in_progress_check_subset,
asset_method=self._compute_run_in_progress_asset_subset,
)

@cached_method
async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
async def get_empty_subset(key: EntityKey) -> EntitySubset:
return self.get_empty_subset(key=key)
Expand All @@ -499,15 +494,13 @@ async def get_empty_subset(key: EntityKey) -> EntitySubset:
asset_method=self._compute_backfill_in_progress_asset_subset,
)

@cached_method
async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset:
return await _dispatch(
key=key,
check_method=self._compute_execution_failed_check_subset,
asset_method=self._compute_execution_failed_asset_subset,
)

@cached_method
async def compute_missing_subset(
self, *, key: EntityKey, from_subset: EntitySubset
) -> EntitySubset:
Expand All @@ -534,7 +527,7 @@ async def _compute_updated_since_time_subset(
from dagster._core.storage.event_log.base import AssetCheckSummaryRecord

# intentionally left unimplemented for AssetKey, as this is a less performant query
summary = AssetCheckSummaryRecord.blocking_get(self, key)
summary = await AssetCheckSummaryRecord.gen(self, key)
record = summary.last_check_execution_record if summary else None
if (
record is None
Expand All @@ -546,7 +539,6 @@ async def _compute_updated_since_time_subset(
else:
return self.get_full_subset(key=key)

@cached_method
async def compute_updated_since_temporal_context_subset(
self, *, key: EntityKey, temporal_context: TemporalContext
) -> EntitySubset:
Expand Down

0 comments on commit 920a158

Please sign in to comment.