From a2431c3258b668c322e783edf7ad2bdd34e3fe76 Mon Sep 17 00:00:00 2001 From: Brian Tu Date: Tue, 22 Oct 2024 18:27:46 -0400 Subject: [PATCH] Make compute asset/check subset functions async (#25264) ## Summary & Motivation We now want to make the compute asset and asset check subset functions on `AssetGraphView` async so we can use `AssetStatusCacheValue.gen()` instead of `AssetStatusCacheValue.blocking_get()`. Same for `AssetCheckExecutionRecord`. ## How I Tested These Changes Existing tests should pass --- .../dagster_graphql/schema/asset_checks.py | 2 +- .../asset_graph_view/asset_graph_view.py | 117 ++++++++++-------- .../operands/operands.py | 28 ++--- .../operands/subset_automation_condition.py | 7 +- .../storage/asset_check_execution_record.py | 8 +- 5 files changed, 89 insertions(+), 73 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py index 20ae2de0a6b30..fef57e244d621 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py @@ -158,7 +158,7 @@ async def resolve_executionForLatestMaterialization( record = await AssetCheckExecutionRecord.gen(graphene_info.context, self._asset_check.key) return ( GrapheneAssetCheckExecution(record) - if record and record.targets_latest_materialization(graphene_info.context) + if record and await record.targets_latest_materialization(graphene_info.context) else None ) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 49de5bb2275fd..955fb805695df 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -3,6 +3,7 @@ from typing import ( TYPE_CHECKING, AbstractSet, + Awaitable, Callable, Dict, Literal, @@ -356,17 +357,17 @@ def compute_latest_time_window_subset( else: check.failed(f"Unsupported partitions_def: {partitions_def}") - def compute_subset_with_status( + async def compute_subset_with_status( self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"] ): - """Returns the subset of an asset check that matches a given status.""" from dagster._core.storage.event_log.base import AssetCheckSummaryRecord - summary = AssetCheckSummaryRecord.blocking_get(self, key) + """Returns the subset of an asset check that matches a given status.""" + summary = await AssetCheckSummaryRecord.gen(self, key) latest_record = summary.last_check_execution_record if summary else None resolved_status = ( latest_record.resolve_status(self) - if latest_record and latest_record.targets_latest_materialization(self) + if latest_record and await latest_record.targets_latest_materialization(self) else None ) if resolved_status == status: @@ -374,45 +375,50 @@ def compute_subset_with_status( else: return self.get_empty_subset(key=key) - def _compute_run_in_progress_check_subset( + async def _compute_run_in_progress_check_subset( self, key: AssetCheckKey ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionResolvedStatus, ) - return self.compute_subset_with_status(key, AssetCheckExecutionResolvedStatus.IN_PROGRESS) + return await self.compute_subset_with_status( + key, AssetCheckExecutionResolvedStatus.IN_PROGRESS + ) - def _compute_execution_failed_check_subset( + async def _compute_execution_failed_check_subset( self, key: AssetCheckKey ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionResolvedStatus, ) - return self.compute_subset_with_status( + return await self.compute_subset_with_status( key, AssetCheckExecutionResolvedStatus.EXECUTION_FAILED ) - def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]: - return self.compute_subset_with_status(key, None) + async def _compute_missing_check_subset( + self, key: AssetCheckKey + ) -> EntitySubset[AssetCheckKey]: + return await self.compute_subset_with_status(key, None) - def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: from dagster._core.storage.partition_status_cache import AssetStatusCacheValue partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) return ( cache_value.get_in_progress_subset(self, key, partitions_def) if cache_value else self.get_empty_subset(key=key) ) - else: - value = self._queryer.get_in_progress_asset_subset(asset_key=key).value + value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_backfill_in_progress_asset_subset( + self, key: AssetKey + ) -> EntitySubset[AssetKey]: value = ( self._queryer.get_active_backfill_in_progress_asset_graph_subset() .get_asset_subset(asset_key=key, asset_graph=self.asset_graph) @@ -420,22 +426,21 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub ) return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: from dagster._core.storage.partition_status_cache import AssetStatusCacheValue partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) return ( cache_value.get_failed_subset(self, key, partitions_def) if cache_value else self.get_empty_subset(key=key) ) - else: - value = self._queryer.get_failed_asset_subset(asset_key=key).value + value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_missing_asset_subset( + async def _compute_missing_asset_subset( self, key: AssetKey, from_subset: EntitySubset ) -> EntitySubset[AssetKey]: """Returns a subset which is the subset of the input subset that has never been materialized @@ -451,7 +456,7 @@ def _compute_missing_asset_subset( # cheap call which takes advantage of the partition status cache partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) materialized_subset = ( cache_value.get_materialized_subset(self, key, partitions_def) if cache_value @@ -475,33 +480,38 @@ def _compute_missing_asset_subset( ) @cached_method - def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_run_in_progress_check_subset, asset_method=self._compute_run_in_progress_asset_subset, ) @cached_method - def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + async def get_empty_subset(key: EntityKey) -> EntitySubset: + return self.get_empty_subset(key=key) + + return await _dispatch( key=key, # asset checks cannot currently be backfilled - check_method=lambda k: self.get_empty_subset(key=k), + check_method=get_empty_subset, asset_method=self._compute_backfill_in_progress_asset_subset, ) @cached_method - def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_execution_failed_check_subset, asset_method=self._compute_execution_failed_asset_subset, ) @cached_method - def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) -> EntitySubset: - return _dispatch( + async def compute_missing_subset( + self, *, key: EntityKey, from_subset: EntitySubset + ) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_missing_check_subset, asset_method=functools.partial( @@ -509,17 +519,17 @@ def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) - ), ) - def _expensively_filter_entity_subset( - self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool] + async def _expensively_filter_entity_subset( + self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], Awaitable[bool]] ) -> EntitySubset: if subset.is_partitioned: return subset.compute_intersection_with_partition_keys( - {pk for pk in subset.expensively_compute_partition_keys() if filter_fn(pk)} + {pk for pk in subset.expensively_compute_partition_keys() if await filter_fn(pk)} ) else: return ( subset - if not subset.is_empty and filter_fn(None) + if not subset.is_empty and await filter_fn(None) else self.get_empty_subset(key=subset.key) ) @@ -528,27 +538,28 @@ def _run_record_targets_entity(self, run_record: "RunRecord", target_key: Entity check_selection = run_record.dagster_run.asset_check_selection or set() return target_key in (asset_selection | check_selection) - def _compute_latest_check_run_executed_with_target( + async def _compute_latest_check_run_executed_with_target( self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey ) -> bool: - from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord from dagster._core.storage.dagster_run import RunRecord + from dagster._core.storage.event_log.base import AssetCheckSummaryRecord check.invariant(partition_key is None, "Partitioned checks not supported") - check_record = AssetCheckExecutionRecord.blocking_get(self, query_key) + summary = await AssetCheckSummaryRecord.gen(self, query_key) + check_record = summary.last_check_execution_record if summary else None if check_record and check_record.event: - run_record = RunRecord.blocking_get(self, check_record.event.run_id) + run_record = await RunRecord.gen(self, check_record.event.run_id) return bool(run_record) and self._run_record_targets_entity(run_record, target_key) else: return False - def _compute_latest_asset_run_executed_with_target( + async def _compute_latest_asset_run_executed_with_target( self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey ) -> bool: from dagster._core.storage.dagster_run import RunRecord from dagster._core.storage.event_log.base import AssetRecord - asset_record = AssetRecord.blocking_get(self, query_key) + asset_record = await AssetRecord.gen(self, query_key) if ( asset_record and asset_record.asset_entry.last_materialization @@ -556,20 +567,20 @@ def _compute_latest_asset_run_executed_with_target( and asset_record.asset_entry.last_materialization.asset_materialization.partition == partition_key ): - run_record = RunRecord.blocking_get( + run_record = await RunRecord.gen( self, asset_record.asset_entry.last_materialization.run_id ) return bool(run_record) and self._run_record_targets_entity(run_record, target_key) else: return False - def compute_latest_run_executed_with_subset( + async def compute_latest_run_executed_with_subset( self, from_subset: EntitySubset, target: EntityKey ) -> EntitySubset: """Computes the subset of from_subset for which the latest run also targeted the provided target EntityKey. """ - return _dispatch( + return await _dispatch( key=from_subset.key, check_method=lambda k: self._expensively_filter_entity_subset( from_subset, @@ -589,7 +600,7 @@ def compute_latest_run_executed_with_subset( ), ) - def _compute_updated_since_cursor_subset( + async def _compute_updated_since_cursor_subset( self, key: AssetKey, cursor: Optional[int] ) -> EntitySubset[AssetKey]: value = self._queryer.get_asset_subset_updated_after_cursor( @@ -597,14 +608,14 @@ def _compute_updated_since_cursor_subset( ).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_updated_since_time_subset( + async def _compute_updated_since_time_subset( self, key: AssetCheckKey, time: datetime ) -> EntitySubset[AssetCheckKey]: from dagster._core.events import DagsterEventType from dagster._core.storage.event_log.base import AssetCheckSummaryRecord # intentionally left unimplemented for AssetKey, as this is a less performant query - summary = AssetCheckSummaryRecord.blocking_get(self, key) + summary = await AssetCheckSummaryRecord.gen(self, key) record = summary.last_check_execution_record if summary else None if ( record is None @@ -617,10 +628,10 @@ def _compute_updated_since_time_subset( return self.get_full_subset(key=key) @cached_method - def compute_updated_since_temporal_context_subset( + async def compute_updated_since_temporal_context_subset( self, *, key: EntityKey, temporal_context: TemporalContext ) -> EntitySubset: - return _dispatch( + return await _dispatch( key=key, check_method=functools.partial( self._compute_updated_since_time_subset, time=temporal_context.effective_dt @@ -705,14 +716,14 @@ def _build_multi_partition_subset( O_Dispatch = TypeVar("O_Dispatch") -def _dispatch( +async def _dispatch( *, key: EntityKey, - check_method: Callable[[AssetCheckKey], O_Dispatch], - asset_method: Callable[[AssetKey], O_Dispatch], + check_method: Callable[[AssetCheckKey], Awaitable[O_Dispatch]], + asset_method: Callable[[AssetKey], Awaitable[O_Dispatch]], ) -> O_Dispatch: """Applies a method for either a check or an asset.""" if isinstance(key, AssetCheckKey): - return check_method(key) + return await check_method(key) else: - return asset_method(key) + return await asset_method(key) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 10d71d330ca15..d4eb5148398f1 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -60,8 +60,8 @@ class MissingAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "missing" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_missing_subset( + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_missing_subset( key=context.key, from_subset=context.candidate_subset ) @@ -73,8 +73,8 @@ class RunInProgressAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "execution_in_progress" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_run_in_progress_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_run_in_progress_subset(key=context.key) @whitelist_for_serdes @@ -84,8 +84,8 @@ class BackfillInProgressAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "backfill_in_progress" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key) @whitelist_for_serdes(storage_name="FailedAutomationCondition") @@ -95,8 +95,8 @@ class ExecutionFailedAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "execution_failed" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_execution_failed_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_execution_failed_subset(key=context.key) @whitelist_for_serdes @@ -147,8 +147,8 @@ class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition): def name(self) -> str: return "executed_with_root_target" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_latest_run_executed_with_subset( + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_latest_run_executed_with_subset( from_subset=context.candidate_subset, target=context.root_context.key ) @@ -160,11 +160,11 @@ class NewlyUpdatedCondition(SubsetAutomationCondition): def name(self) -> str: return "newly_updated" - def compute_subset(self, context: AutomationContext) -> EntitySubset: + async def compute_subset(self, context: AutomationContext) -> EntitySubset: # if it's the first time evaluating, just return the empty subset if context.previous_temporal_context is None: return context.get_empty_subset() - return context.asset_graph_view.compute_updated_since_temporal_context_subset( + return await context.asset_graph_view.compute_updated_since_temporal_context_subset( key=context.key, temporal_context=context.previous_temporal_context ) @@ -253,7 +253,7 @@ class CheckResultCondition(SubsetAutomationCondition[AssetCheckKey]): def name(self) -> str: return "check_passed" if self.passed else "check_failed" - def compute_subset( + async def compute_subset( self, context: AutomationContext[AssetCheckKey] ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( @@ -265,6 +265,6 @@ def compute_subset( if self.passed else AssetCheckExecutionResolvedStatus.FAILED ) - return context.asset_graph_view.compute_subset_with_status( + return await context.asset_graph_view.compute_subset_with_status( key=context.key, status=target_status ) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py index a94d4138d3893..a66ed1281ea20 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py @@ -1,3 +1,4 @@ +import inspect from abc import abstractmethod from dagster._core.asset_graph_view.entity_subset import EntitySubset @@ -23,10 +24,14 @@ def compute_subset( self, context: AutomationContext[T_EntityKey] ) -> EntitySubset[T_EntityKey]: ... - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: # don't compute anything if there are no candidates if context.candidate_subset.is_empty: true_subset = context.get_empty_subset() + elif inspect.iscoroutinefunction(self.compute_subset): + true_subset = await self.compute_subset(context) else: true_subset = self.compute_subset(context) diff --git a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py index 0424d2ddb1b56..7d7e94496d20e 100644 --- a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py +++ b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py @@ -156,7 +156,7 @@ def resolve_status(self, loading_context: LoadingContext) -> AssetCheckExecution else: check.failed(f"Unexpected status {self.status}") - def targets_latest_materialization(self, loading_context: LoadingContext) -> bool: + async def targets_latest_materialization(self, loading_context: LoadingContext) -> bool: from dagster._core.storage.event_log.base import AssetRecord resolved_status = self.resolve_status(loading_context) @@ -164,7 +164,7 @@ def targets_latest_materialization(self, loading_context: LoadingContext) -> boo # all in-progress checks execute against the latest version return True - asset_record = AssetRecord.blocking_get(loading_context, self.key.asset_key) + asset_record = await AssetRecord.gen(loading_context, self.key.asset_key) latest_materialization = ( asset_record.asset_entry.last_materialization_record if asset_record else None ) @@ -197,10 +197,10 @@ def targets_latest_materialization(self, loading_context: LoadingContext) -> boo ]: # the evaluation didn't complete, so we don't have target_materialization_data, so check if # the check's run executed after the materializations as a fallback - latest_materialization_run_record = RunRecord.blocking_get( + latest_materialization_run_record = await RunRecord.gen( loading_context, latest_materialization_run_id ) - check_run_record = RunRecord.blocking_get(loading_context, self.run_id) + check_run_record = await RunRecord.gen(loading_context, self.run_id) return bool( latest_materialization_run_record and check_run_record