From 63c2dc875e1fd4719ee282db1bf89a517bdc31dd Mon Sep 17 00:00:00 2001 From: briantu Date: Tue, 22 Oct 2024 15:49:57 -0400 Subject: [PATCH] async LatestRunExecutedWithRootTargetCondition --- .../asset_graph_view/asset_graph_view.py | 29 ++++++++++--------- .../operands/operands.py | 4 +-- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index bd6349d3cb432..955fb805695df 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -360,9 +360,9 @@ def compute_latest_time_window_subset( async def compute_subset_with_status( self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"] ): - """Returns the subset of an asset check that matches a given status.""" from dagster._core.storage.event_log.base import AssetCheckSummaryRecord + """Returns the subset of an asset check that matches a given status.""" summary = await AssetCheckSummaryRecord.gen(self, key) latest_record = summary.last_check_execution_record if summary else None resolved_status = ( @@ -519,17 +519,17 @@ async def compute_missing_subset( ), ) - def _expensively_filter_entity_subset( - self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool] + async def _expensively_filter_entity_subset( + self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], Awaitable[bool]] ) -> EntitySubset: if subset.is_partitioned: return subset.compute_intersection_with_partition_keys( - {pk for pk in subset.expensively_compute_partition_keys() if filter_fn(pk)} + {pk for pk in subset.expensively_compute_partition_keys() if await filter_fn(pk)} ) else: return ( subset - if not subset.is_empty and filter_fn(None) + if not subset.is_empty and await filter_fn(None) else self.get_empty_subset(key=subset.key) ) @@ -538,27 +538,28 @@ def _run_record_targets_entity(self, run_record: "RunRecord", target_key: Entity check_selection = run_record.dagster_run.asset_check_selection or set() return target_key in (asset_selection | check_selection) - def _compute_latest_check_run_executed_with_target( + async def _compute_latest_check_run_executed_with_target( self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey ) -> bool: - from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord from dagster._core.storage.dagster_run import RunRecord + from dagster._core.storage.event_log.base import AssetCheckSummaryRecord check.invariant(partition_key is None, "Partitioned checks not supported") - check_record = AssetCheckExecutionRecord.blocking_get(self, query_key) + summary = await AssetCheckSummaryRecord.gen(self, query_key) + check_record = summary.last_check_execution_record if summary else None if check_record and check_record.event: - run_record = RunRecord.blocking_get(self, check_record.event.run_id) + run_record = await RunRecord.gen(self, check_record.event.run_id) return bool(run_record) and self._run_record_targets_entity(run_record, target_key) else: return False - def _compute_latest_asset_run_executed_with_target( + async def _compute_latest_asset_run_executed_with_target( self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey ) -> bool: from dagster._core.storage.dagster_run import RunRecord from dagster._core.storage.event_log.base import AssetRecord - asset_record = AssetRecord.blocking_get(self, query_key) + asset_record = await AssetRecord.gen(self, query_key) if ( asset_record and asset_record.asset_entry.last_materialization @@ -566,20 +567,20 @@ def _compute_latest_asset_run_executed_with_target( and asset_record.asset_entry.last_materialization.asset_materialization.partition == partition_key ): - run_record = RunRecord.blocking_get( + run_record = await RunRecord.gen( self, asset_record.asset_entry.last_materialization.run_id ) return bool(run_record) and self._run_record_targets_entity(run_record, target_key) else: return False - def compute_latest_run_executed_with_subset( + async def compute_latest_run_executed_with_subset( self, from_subset: EntitySubset, target: EntityKey ) -> EntitySubset: """Computes the subset of from_subset for which the latest run also targeted the provided target EntityKey. """ - return _dispatch( + return await _dispatch( key=from_subset.key, check_method=lambda k: self._expensively_filter_entity_subset( from_subset, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index bcfb014429720..d4eb5148398f1 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -147,8 +147,8 @@ class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition): def name(self) -> str: return "executed_with_root_target" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_latest_run_executed_with_subset( + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_latest_run_executed_with_subset( from_subset=context.candidate_subset, target=context.root_context.key )