Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AssetStatusCacheValue inherit InstanceLoadableBy #25200

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,18 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse
return self.compute_subset_with_status(key, None)

def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
return (
cache_value.get_in_progress_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)
else:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
Expand All @@ -410,7 +421,18 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
return (
cache_value.get_failed_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)
else:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_missing_asset_subset(
Expand All @@ -419,16 +441,27 @@ def _compute_missing_asset_subset(
"""Returns a subset which is the subset of the input subset that has never been materialized
(if it is a materializable asset) or observered (if it is an observable asset).
"""
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

# TODO: this logic should be simplified once we have a unified way of detecting both
# materializations and observations through the parittion status cache. at that point, the
# definition will slightly change to search for materializations and observations regardless
# of the materializability of the asset
if self.asset_graph.get(key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key)
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value)
)
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
materialized_subset = (
cache_value.get_materialized_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)
else:
value = self._queryer.get_materialized_asset_subset(asset_key=key).value
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(value)
)
return from_subset.compute_difference(materialized_subset)
else:
# more expensive call
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from enum import Enum
from typing import TYPE_CHECKING, List, NamedTuple, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple

from dagster import (
AssetKey,
DagsterInstance,
DagsterRunStatus,
_check as check,
)
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionKey,
MultiPartitionsDefinition,
Expand All @@ -19,7 +21,7 @@
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.loader import LoadingContext
from dagster._core.loader import InstanceLoadableBy, LoadingContext
from dagster._core.storage.dagster_run import FINISHED_STATUSES, RunsFilter
from dagster._core.storage.tags import (
MULTIDIMENSIONAL_PARTITION_PREFIX,
Expand Down Expand Up @@ -79,7 +81,8 @@ class AssetStatusCacheValue(
("serialized_in_progress_partition_subset", Optional[str]),
("earliest_in_progress_materialization_event_id", Optional[int]),
],
)
),
InstanceLoadableBy[Tuple[AssetKey, PartitionsDefinition]],
):
"""Set of asset fields that reflect partition materialization status. This is used to display
global partition status in the asset view.
Expand Down Expand Up @@ -143,6 +146,12 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]:

return cached_data

@classmethod
def _blocking_batch_load(
cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance"
) -> Iterable[Optional["AssetStatusCacheValue"]]:
return instance.event_log_storage.get_asset_status_cache_values(dict(keys))

def deserialize_materialized_partition_subsets(
self, partitions_def: PartitionsDefinition
) -> PartitionsSubset:
Expand All @@ -167,6 +176,39 @@ def deserialize_in_progress_partition_subsets(

return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset)

def get_materialized_subset(
self,
asset_graph_view: AssetGraphView,
asset_key: AssetKey,
partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
value = self.deserialize_materialized_partition_subsets(partitions_def)
return EntitySubset(
asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value)
)

def get_failed_subset(
self,
asset_graph_view: AssetGraphView,
asset_key: AssetKey,
partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
value = self.deserialize_failed_partition_subsets(partitions_def)
return EntitySubset(
asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value)
)

def get_in_progress_subset(
self,
asset_graph_view: AssetGraphView,
asset_key: AssetKey,
partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
value = self.deserialize_in_progress_partition_subsets(partitions_def)
return EntitySubset(
asset_graph_view, key=asset_key, value=_ValidatedEntitySubsetValue(value)
)


def get_materialized_multipartitions(
instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition
Expand Down