Skip to content

Commit

Permalink
async LatestRunExecutedWithRootTargetCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 22, 2024
1 parent de4cfa6 commit 3a0dca2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ def compute_latest_time_window_subset(
async def compute_subset_with_status(
self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"]
):
"""Returns the subset of an asset check that matches a given status."""
from dagster._core.storage.event_log.base import AssetCheckSummaryRecord

"""Returns the subset of an asset check that matches a given status."""
summary = await AssetCheckSummaryRecord.gen(self, key)
latest_record = summary.last_check_execution_record if summary else None
resolved_status = (
Expand Down Expand Up @@ -519,17 +519,17 @@ async def compute_missing_subset(
),
)

def _expensively_filter_entity_subset(
self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool]
async def _expensively_filter_entity_subset(
self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], Awaitable[bool]]
) -> EntitySubset:
if subset.is_partitioned:
return subset.compute_intersection_with_partition_keys(
{pk for pk in subset.expensively_compute_partition_keys() if filter_fn(pk)}
{pk for pk in subset.expensively_compute_partition_keys() if await filter_fn(pk)}
)
else:
return (
subset
if not subset.is_empty and filter_fn(None)
if not subset.is_empty and await filter_fn(None)
else self.get_empty_subset(key=subset.key)
)

Expand All @@ -538,48 +538,49 @@ def _run_record_targets_entity(self, run_record: "RunRecord", target_key: Entity
check_selection = run_record.dagster_run.asset_check_selection or set()
return target_key in (asset_selection | check_selection)

def _compute_latest_check_run_executed_with_target(
async def _compute_latest_check_run_executed_with_target(
self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey
) -> bool:
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord
from dagster._core.storage.dagster_run import RunRecord
from dagster._core.storage.event_log.base import AssetCheckSummaryRecord

check.invariant(partition_key is None, "Partitioned checks not supported")
check_record = AssetCheckExecutionRecord.blocking_get(self, query_key)
summary = await AssetCheckSummaryRecord.gen(self, query_key)
check_record = summary.last_check_execution_record if summary else None
if check_record and check_record.event:
run_record = RunRecord.blocking_get(self, check_record.event.run_id)
run_record = await RunRecord.gen(self, check_record.event.run_id)
return bool(run_record) and self._run_record_targets_entity(run_record, target_key)
else:
return False

def _compute_latest_asset_run_executed_with_target(
async def _compute_latest_asset_run_executed_with_target(
self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey
) -> bool:
from dagster._core.storage.dagster_run import RunRecord
from dagster._core.storage.event_log.base import AssetRecord

asset_record = AssetRecord.blocking_get(self, query_key)
asset_record = await AssetRecord.gen(self, query_key)
if (
asset_record
and asset_record.asset_entry.last_materialization
and asset_record.asset_entry.last_materialization.asset_materialization
and asset_record.asset_entry.last_materialization.asset_materialization.partition
== partition_key
):
run_record = RunRecord.blocking_get(
run_record = await RunRecord.gen(
self, asset_record.asset_entry.last_materialization.run_id
)
return bool(run_record) and self._run_record_targets_entity(run_record, target_key)
else:
return False

def compute_latest_run_executed_with_subset(
async def compute_latest_run_executed_with_subset(
self, from_subset: EntitySubset, target: EntityKey
) -> EntitySubset:
"""Computes the subset of from_subset for which the latest run also targeted
the provided target EntityKey.
"""
return _dispatch(
return await _dispatch(
key=from_subset.key,
check_method=lambda k: self._expensively_filter_entity_subset(
from_subset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition):
def name(self) -> str:
return "executed_with_root_target"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_latest_run_executed_with_subset(
async def compute_subset(self, context: AutomationContext) -> EntitySubset:
return await context.asset_graph_view.compute_latest_run_executed_with_subset(
from_subset=context.candidate_subset, target=context.root_context.key
)

Expand Down

0 comments on commit 3a0dca2

Please sign in to comment.