Skip to content

Commit

Permalink
Make AssetStatusCacheValue inherit InstanceLoadableBy
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 21, 2024
1 parent 30253d1 commit 9223183
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
get_time_partitions_def,
)
from dagster._core.loader import LoadingContext
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
from dagster._time import get_current_datetime
from dagster._utils.aiodataloader import DataLoader
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -398,7 +399,15 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse
return self.compute_subset_with_status(key, None)

def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_in_progress_partition_subsets(partitions_def)
else:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
Expand All @@ -410,7 +419,15 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_failed_partition_subsets(partitions_def)
else:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_missing_asset_subset(
Expand All @@ -425,9 +442,17 @@ def _compute_missing_asset_subset(
# of the materializability of the asset
if self.asset_graph.get(key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key)
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_materialized_partition_subsets(partitions_def)
else:
value = self._queryer.get_materialized_asset_subset(asset_key=key).value
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value)
self, key=key, value=_ValidatedEntitySubsetValue(value)
)
return from_subset.compute_difference(materialized_subset)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, List, NamedTuple, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple

from dagster import (
AssetKey,
Expand All @@ -19,7 +19,7 @@
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.loader import LoadingContext
from dagster._core.loader import InstanceLoadableBy, LoadingContext
from dagster._core.storage.dagster_run import FINISHED_STATUSES, RunsFilter
from dagster._core.storage.tags import (
MULTIDIMENSIONAL_PARTITION_PREFIX,
Expand Down Expand Up @@ -79,7 +79,8 @@ class AssetStatusCacheValue(
("serialized_in_progress_partition_subset", Optional[str]),
("earliest_in_progress_materialization_event_id", Optional[int]),
],
)
),
InstanceLoadableBy[Tuple[AssetKey, PartitionsDefinition]],
):
"""Set of asset fields that reflect partition materialization status. This is used to display
global partition status in the asset view.
Expand Down Expand Up @@ -143,6 +144,13 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]:

return cached_data

@classmethod
def _blocking_batch_load(
cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance"
) -> Iterable[Optional["AssetStatusCacheValue"]]:
partition_defs_by_key = {key: partition_def for key, partition_def in keys}
return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key)

def deserialize_materialized_partition_subsets(
self, partitions_def: PartitionsDefinition
) -> PartitionsSubset:
Expand Down

0 comments on commit 9223183

Please sign in to comment.