From b21233f38ee08357b5259c0e0b5f99070ee3a54f Mon Sep 17 00:00:00 2001 From: Alexander Shtuchkin Date: Wed, 27 Nov 2024 11:45:20 -0500 Subject: [PATCH] Improve scheduled flows query (#16121) --- src/prefect/server/models/deployments.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index 46b5274fdc3c..60b8521fc522 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -693,17 +693,11 @@ async def _insert_scheduled_flow_runs( # query for the rows that were newly inserted (by checking for any flow runs with # no corresponding flow run states) - inserted_rows = ( - sa.select(db.FlowRun.id) - .join( - db.FlowRunState, - db.FlowRun.id == db.FlowRunState.flow_run_id, - isouter=True, - ) - .where( - db.FlowRun.id.in_([r["id"] for r in runs]), - db.FlowRunState.id.is_(None), - ) + inserted_rows = sa.select(db.FlowRun.id).where( + db.FlowRun.id.in_([r["id"] for r in runs]), + ~select(db.FlowRunState.id) + .where(db.FlowRunState.flow_run_id == db.FlowRun.id) + .exists(), ) inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all()