Skip to content

Commit

Permalink
Fix asset graph view functions
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 22, 2024
1 parent 27c4088 commit d696688
Showing 1 changed file with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,11 @@ def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[A
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_in_progress_partition_subsets(partitions_def)
return (
cache_value.get_in_progress_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)
else:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))
Expand All @@ -425,10 +426,11 @@ def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
if cache_value is None:
value = partitions_def.empty_subset()
else:
value = cache_value.deserialize_failed_partition_subsets(partitions_def)
return (
cache_value.get_failed_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)
else:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))
Expand All @@ -450,16 +452,16 @@ def _compute_missing_asset_subset(
partitions_def = self._get_partitions_def(key)
if partitions_def:
cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def))
return (
materialized_subset = (
cache_value.get_materialized_subset(self, key, partitions_def)
if cache_value
else self.get_empty_subset(key=key)
)

value = self._queryer.get_materialized_asset_subset(asset_key=key).value
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(value)
)
else:
value = self._queryer.get_materialized_asset_subset(asset_key=key).value
materialized_subset = EntitySubset(
self, key=key, value=_ValidatedEntitySubsetValue(value)
)
return from_subset.compute_difference(materialized_subset)
else:
# more expensive call
Expand Down

0 comments on commit d696688

Please sign in to comment.