Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 18, 2024
1 parent 6e0a67b commit f3263a7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
get_time_partitions_def,
)
from dagster._core.loader import LoadingContext
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
from dagster._time import get_current_datetime
from dagster._utils.aiodataloader import DataLoader
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -399,6 +398,8 @@ def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[Asse
return self.compute_subset_with_status(key, None)

def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
Expand All @@ -419,6 +420,8 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
Expand All @@ -436,6 +439,8 @@ def _compute_missing_asset_subset(
"""Returns a subset which is the subset of the input subset that has never been materialized
(if it is a materializable asset) or observed (if it is an observable asset).
"""
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

# TODO: this logic should be simplified once we have a unified way of detecting both
# materializations and observations through the partition status cache. at that point, the
# definition will slightly change to search for materializations and observations regardless
Expand All @@ -445,12 +450,13 @@ def _compute_missing_asset_subset(
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_materialized_partition_subsets(partitions_def)
else:
value = self._queryer.get_materialized_asset_subset(asset_key=key).value
return (
cache_value.get_materialized_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)

value = self._queryer.get_materialized_asset_subset(asset_key=key).value
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(value)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
DagsterRunStatus,
_check as check,
)
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionKey,
MultiPartitionsDefinition,
Expand Down Expand Up @@ -148,8 +150,7 @@ def from_db_string(db_string: str) -> Optional["AssetStatusCacheValue"]:
def _blocking_batch_load(
cls, keys: Iterable[Tuple[AssetKey, PartitionsDefinition]], instance: "DagsterInstance"
) -> Iterable[Optional["AssetStatusCacheValue"]]:
partition_defs_by_key = {key: partition_def for key, partition_def in keys}
return instance.event_log_storage.get_asset_status_cache_values(partition_defs_by_key)
return instance.event_log_storage.get_asset_status_cache_values(dict(keys))

def deserialize_materialized_partition_subsets(
self, partitions_def: PartitionsDefinition
Expand All @@ -175,6 +176,39 @@ def deserialize_in_progress_partition_subsets(

return partitions_def.deserialize_subset(self.serialized_in_progress_partition_subset)

def get_materialized_subset(
    self,
    asset_graph_view: AssetGraphView,
    asset_key: AssetKey,
    partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
    """Return this cache value's materialized partitions as an ``EntitySubset``.

    Args:
        asset_graph_view: The view the returned subset is bound to.
        asset_key: The key the returned subset is attributed to.
        partitions_def: Passed to ``deserialize_materialized_partition_subsets``
            to decode the stored subset.

    Returns:
        An ``EntitySubset`` for ``asset_key`` whose value is the deserialized
        materialized partition subset.
    """
    materialized_partitions = self.deserialize_materialized_partition_subsets(partitions_def)
    # NOTE(review): the value is wrapped as already-validated and no check is
    # performed here -- presumably callers pass the partitions_def that belongs
    # to asset_key; confirm.
    subset_value = _ValidatedEntitySubsetValue(materialized_partitions)
    return EntitySubset(asset_graph_view, key=asset_key, value=subset_value)

def get_failed_subset(
    self,
    asset_graph_view: AssetGraphView,
    asset_key: AssetKey,
    partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
    """Return this cache value's failed partitions as an ``EntitySubset``.

    Args:
        asset_graph_view: The view the returned subset is bound to.
        asset_key: The key the returned subset is attributed to.
        partitions_def: Passed to ``deserialize_failed_partition_subsets`` to
            decode the stored subset.

    Returns:
        An ``EntitySubset`` for ``asset_key`` whose value is the deserialized
        failed partition subset.
    """
    failed_partitions = self.deserialize_failed_partition_subsets(partitions_def)
    # NOTE(review): the value is wrapped as already-validated and no check is
    # performed here -- presumably callers pass the partitions_def that belongs
    # to asset_key; confirm.
    subset_value = _ValidatedEntitySubsetValue(failed_partitions)
    return EntitySubset(asset_graph_view, key=asset_key, value=subset_value)

def get_in_progress_subset(
    self,
    asset_graph_view: AssetGraphView,
    asset_key: AssetKey,
    partitions_def: PartitionsDefinition,
) -> EntitySubset[AssetKey]:
    """Return this cache value's in-progress partitions as an ``EntitySubset``.

    Args:
        asset_graph_view: The view the returned subset is bound to.
        asset_key: The key the returned subset is attributed to.
        partitions_def: Passed to ``deserialize_in_progress_partition_subsets``
            to decode the stored subset.

    Returns:
        An ``EntitySubset`` for ``asset_key`` whose value is the deserialized
        in-progress partition subset.
    """
    in_progress_partitions = self.deserialize_in_progress_partition_subsets(partitions_def)
    # NOTE(review): the value is wrapped as already-validated and no check is
    # performed here -- presumably callers pass the partitions_def that belongs
    # to asset_key; confirm.
    subset_value = _ValidatedEntitySubsetValue(in_progress_partitions)
    return EntitySubset(asset_graph_view, key=asset_key, value=subset_value)


def get_materialized_multipartitions(
instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition
Expand Down

0 comments on commit f3263a7

Please sign in to comment.