Skip to content

Commit

Permalink
Improve scheduled flows query (#16121)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashtuchkin authored Nov 27, 2024
1 parent a6bdfdf commit b21233f
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,17 +693,11 @@ async def _insert_scheduled_flow_runs(

# query for the rows that were newly inserted (by checking for any flow runs with
# no corresponding flow run states)
inserted_rows = (
sa.select(db.FlowRun.id)
.join(
db.FlowRunState,
db.FlowRun.id == db.FlowRunState.flow_run_id,
isouter=True,
)
.where(
db.FlowRun.id.in_([r["id"] for r in runs]),
db.FlowRunState.id.is_(None),
)
inserted_rows = sa.select(db.FlowRun.id).where(
db.FlowRun.id.in_([r["id"] for r in runs]),
~select(db.FlowRunState.id)
.where(db.FlowRunState.flow_run_id == db.FlowRun.id)
.exists(),
)
inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all()

Expand Down

0 comments on commit b21233f

Please sign in to comment.