From 19a7eedb391839e50605ff2a9ed1d409409476d3 Mon Sep 17 00:00:00 2001 From: Anuj Gupta <84966248+Anuj-Gupta4@users.noreply.github.com> Date: Thu, 9 Jan 2025 20:46:22 +0545 Subject: [PATCH] fix(backend): various fixes based on sentry error reports (#2053) * fix(db): update connection handling for background task updates * feat(db): implement batch processing for entity updates in the database * fix(submission_crud): correct attachment key and await db commit * fix(db): update connection handling in DbBackgroundTask for cursor usage * fix(backend): also get a new db conn from pool when updating basemap model --------- Co-authored-by: spwoodcock --- src/backend/app/db/models.py | 104 +++++++++++------- .../app/submissions/submission_crud.py | 4 +- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/src/backend/app/db/models.py b/src/backend/app/db/models.py index 168177e849..e68b358f61 100644 --- a/src/backend/app/db/models.py +++ b/src/backend/app/db/models.py @@ -39,6 +39,7 @@ from app.central.central_schemas import ODKCentralDecrypted from app.config import settings +from app.db import database from app.db.enums import ( BackgroundTaskStatus, CommunityType, @@ -1387,58 +1388,69 @@ async def upsert( db: Connection, project_id: int, entities: list[Self], + batch_size: int = 10000, ) -> bool: - """Update or insert Entity data, with statuses. + """Update or insert Entity data in batches, with statuses. Args: db (Connection): The database connection. project_id (int): The project ID. entities (list[Self]): List of DbOdkEntities objects. + batch_size (int): Number of entities to process in each batch. Returns: bool: Success or failure. """ log.info( f"Updating FMTM database Entities for project {project_id} " - f"with ({len(entities)}) features" + f"with ({len(entities)}) features in batches of {batch_size}" ) - sql = """ - INSERT INTO public.odk_entities - (entity_id, status, project_id, task_id) - VALUES - """ + result = [] - # Prepare data for bulk insert - values = [] - data = {} - for index, entity in enumerate(entities): - entity_index = f"entity_{index}" - values.append( - f"(%({entity_index}_entity_id)s, " - f"%({entity_index}_status)s, " - f"%({entity_index}_project_id)s, " - f"%({entity_index}_task_id)s)" - ) - data[f"{entity_index}_entity_id"] = entity["id"] - data[f"{entity_index}_status"] = EntityState(int(entity["status"])).name - data[f"{entity_index}_project_id"] = project_id - task_id = entity["task_id"] - data[f"{entity_index}_task_id"] = int(task_id) if task_id else None + for batch_start in range(0, len(entities), batch_size): + batch = entities[batch_start : batch_start + batch_size] + sql = """ + INSERT INTO public.odk_entities + (entity_id, status, project_id, task_id) + VALUES + """ - sql += ( - ", ".join(values) - + """ - ON CONFLICT (entity_id) DO UPDATE SET - status = EXCLUDED.status, - task_id = EXCLUDED.task_id - RETURNING True; - """ - ) + # Prepare data for batch insert + values = [] + data = {} + for index, entity in enumerate(batch): + entity_index = f"entity_{batch_start + index}" + values.append( + f"(%({entity_index}_entity_id)s, " + f"%({entity_index}_status)s, " + f"%({entity_index}_project_id)s, " + f"%({entity_index}_task_id)s)" + ) + data[f"{entity_index}_entity_id"] = entity["id"] + data[f"{entity_index}_status"] = EntityState(int(entity["status"])).name + data[f"{entity_index}_project_id"] = project_id + task_id = entity["task_id"] + data[f"{entity_index}_task_id"] = int(task_id) if task_id else None + + sql += ( + ", ".join(values) + + """ 
+ ON CONFLICT (entity_id) DO UPDATE SET + status = EXCLUDED.status, + task_id = EXCLUDED.task_id + RETURNING True; + """ + ) - async with db.cursor() as cur: - await cur.execute(sql, data) - result = await cur.fetchall() + async with db.cursor() as cur: + await cur.execute(sql, data) + batch_result = await cur.fetchall() + if not batch_result: + log.warning( + f"Batch failed at batch {batch_start} for project {project_id}" + ) + result.extend(batch_result) return bool(result) @@ -1534,9 +1546,14 @@ async def update( RETURNING *; """ - async with db.cursor(row_factory=class_row(cls)) as cur: - await cur.execute(sql, {"task_id": task_id, **model_dump}) - updated_task = await cur.fetchone() + # This is a workaround as the db connection can often timeout, + # before the background job is finished processing + pool = database.get_db_connection_pool() + async with pool as pool_instance: + async with pool_instance.connection() as conn: + async with conn.cursor(row_factory=class_row(cls)) as cur: + await cur.execute(sql, {"task_id": task_id, **model_dump}) + updated_task = await cur.fetchone() if updated_task is None: msg = f"Failed to update background task with ID: {task_id}" @@ -1690,9 +1707,14 @@ async def update( RETURNING *; """ - async with db.cursor(row_factory=class_row(cls)) as cur: - await cur.execute(sql, {"basemap_id": basemap_id, **model_dump}) - updated_basemap = await cur.fetchone() + # This is a workaround as the db connection can often timeout, + # before the basemap is finished processing + pool = database.get_db_connection_pool() + async with pool as pool_instance: + async with pool_instance.connection() as conn: + async with conn.cursor(row_factory=class_row(cls)) as cur: + await cur.execute(sql, {"basemap_id": basemap_id, **model_dump}) + updated_basemap = await cur.fetchone() if updated_basemap is None: msg = f"Failed to update basemap with ID: {basemap_id}" diff --git a/src/backend/app/submissions/submission_crud.py b/src/backend/app/submissions/submission_crud.py index 56bcf6a996..dc563da764 100644 --- a/src/backend/app/submissions/submission_crud.py +++ b/src/backend/app/submissions/submission_crud.py @@ -294,7 +294,7 @@ async def upload_attachment_to_s3( batch_insert_data = [] for instance_id in instance_ids: submission_detail = await get_submission_detail(instance_id, project) - attachments = submission_detail["verification"]["image"] + attachments = submission_detail["image"] if not isinstance(attachments, list): attachments = [attachments] @@ -358,7 +358,7 @@ async def upload_attachment_to_s3( """, batch_insert_data, ) - db.commit() + await db.commit() return True except Exception as e:
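Note: the sketch below is a minimal, self-contained illustration of the batched multi-row upsert pattern that the DbOdkEntities.upsert change above applies, assuming a psycopg 3 async connection. The function name upsert_entities_batched is hypothetical, positional %s placeholders stand in for the patch's named %(...)s placeholders, and the EntityState enum mapping is reduced to a comment.

from psycopg import AsyncConnection

async def upsert_entities_batched(
    conn: AsyncConnection,
    project_id: int,
    entities: list[dict],
    batch_size: int = 10_000,
) -> int:
    """Upsert entity rows in fixed-size batches, one multi-row INSERT per batch."""
    total = 0
    for start in range(0, len(entities), batch_size):
        batch = entities[start : start + batch_size]
        # One "(%s, %s, %s, %s)" group per entity in this batch.
        placeholders = ", ".join(["(%s, %s, %s, %s)"] * len(batch))
        params: list = []
        for entity in batch:
            task_id = entity["task_id"]
            params += [
                entity["id"],
                entity["status"],  # the patch maps this to EntityState(int(...)).name
                project_id,
                int(task_id) if task_id else None,
            ]
        sql = (
            "INSERT INTO public.odk_entities (entity_id, status, project_id, task_id) "
            f"VALUES {placeholders} "
            "ON CONFLICT (entity_id) DO UPDATE SET "
            "status = EXCLUDED.status, task_id = EXCLUDED.task_id;"
        )
        async with conn.cursor() as cur:
            await cur.execute(sql, params)
            total += cur.rowcount
    return total

Keeping each statement to at most batch_size rows bounds both the SQL string length and the parameter count per execute() call, which is the point of the batching change.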
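Note: the second pattern in the patch, borrowing a fresh connection from the pool when a long-running background job finishes (rather than reusing the request-scoped connection, which may have timed out), can be sketched as below. This assumes a psycopg_pool.AsyncConnectionPool that is already open; the function name and the background_tasks table/column names are illustrative, not the project's actual schema.

from psycopg_pool import AsyncConnectionPool

async def update_task_status_with_fresh_conn(
    pool: AsyncConnectionPool, task_id: str, status: str
):
    # Borrow a brand-new connection just for this update, since the original
    # request connection may already be closed by the time the job completes.
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "UPDATE background_tasks SET status = %s WHERE id = %s RETURNING id;",
                (status, task_id),
            )
            return await cur.fetchone()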