Skip to content

Commit

Permalink
targets_latest_materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 17, 2024
1 parent e4eeabd commit 7dabca7
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async def resolve_executionForLatestMaterialization(
record = await AssetCheckExecutionRecord.gen(graphene_info.context, self._asset_check.key)
return (
GrapheneAssetCheckExecution(record)
if record and record.targets_latest_materialization(graphene_info.context)
if record and await record.targets_latest_materialization(graphene_info.context)
else None
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ async def compute_subset_with_status(
latest_record = summary.last_check_execution_record if summary else None
resolved_status = (
latest_record.resolve_status(self)
if latest_record and latest_record.targets_latest_materialization(self)
if latest_record and await latest_record.targets_latest_materialization(self)
else None
)
if resolved_status == status:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ def resolve_status(self, loading_context: LoadingContext) -> AssetCheckExecution
else:
check.failed(f"Unexpected status {self.status}")

def targets_latest_materialization(self, loading_context: LoadingContext) -> bool:
async def targets_latest_materialization(self, loading_context: LoadingContext) -> bool:
from dagster._core.storage.event_log.base import AssetRecord

resolved_status = self.resolve_status(loading_context)
if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS:
# all in-progress checks execute against the latest version
return True

asset_record = AssetRecord.blocking_get(loading_context, self.key.asset_key)
asset_record = await AssetRecord.gen(loading_context, self.key.asset_key)
latest_materialization = (
asset_record.asset_entry.last_materialization_record if asset_record else None
)
Expand Down Expand Up @@ -197,10 +197,10 @@ def targets_latest_materialization(self, loading_context: LoadingContext) -> boo
]:
# the evaluation didn't complete, so we don't have target_materialization_data, so check if
# the check's run executed after the materializations as a fallback
latest_materialization_run_record = RunRecord.blocking_get(
latest_materialization_run_record = await RunRecord.gen(
loading_context, latest_materialization_run_id
)
check_run_record = RunRecord.blocking_get(loading_context, self.run_id)
check_run_record = await RunRecord.gen(loading_context, self.run_id)
return bool(
latest_materialization_run_record
and check_run_record
Expand Down

0 comments on commit 7dabca7

Please sign in to comment.