From 47752177b45fbf0e06d233a843004c71cae13778 Mon Sep 17 00:00:00 2001 From: prabinoid <38830224+prabinoid@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:09:35 +0545 Subject: [PATCH] * Concurrent task validation using asyncio gather * messaging tasks added to background. * db context added in tasks and removed explicit connection. * send message on project progress. * project transfer messaging. --- backend/api/projects/actions.py | 15 +- backend/api/projects/resources.py | 2 - backend/api/tasks/actions.py | 17 +- backend/api/teams/actions.py | 3 +- backend/db.py | 1 - backend/models/dtos/project_dto.py | 2 +- backend/models/dtos/user_dto.py | 3 +- backend/services/mapping_service.py | 11 +- backend/services/mapswipe_service.py | 2 - backend/services/messaging/chat_service.py | 2 - backend/services/messaging/message_service.py | 87 +++++---- backend/services/messaging/smtp_service.py | 100 +++++----- backend/services/project_admin_service.py | 91 +++++---- backend/services/project_service.py | 101 +++++----- backend/services/team_service.py | 4 +- .../services/users/authentication_service.py | 36 +++- backend/services/validator_service.py | 176 ++++++++++-------- 17 files changed, 331 insertions(+), 322 deletions(-) diff --git a/backend/api/projects/actions.py b/backend/api/projects/actions.py index 246901e514..d0b41b35ec 100644 --- a/backend/api/projects/actions.py +++ b/backend/api/projects/actions.py @@ -5,7 +5,7 @@ from shapely import GEOSException from shapely.errors import TopologicalError -from backend.db import db_connection, get_db +from backend.db import get_db from backend.models.dtos.grid_dto import GridDTO from backend.models.dtos.message_dto import MessageDTO from backend.models.dtos.user_dto import AuthUserDTO @@ -30,6 +30,7 @@ @router.post("/{project_id}/actions/transfer-ownership/") async def post( request: Request, + background_tasks: BackgroundTasks, project_id: int, user: AuthUserDTO = Depends(login_required), db: Database = Depends(get_db), @@ -81,13 +82,10 @@ async def post( status_code=400, ) try: - async with db.transaction(): - await ProjectAdminService.transfer_project_to( - project_id, user.id, username, db - ) - return JSONResponse( - content={"Success": "Project Transferred"}, status_code=200 - ) + await ProjectAdminService.transfer_project_to( + project_id, user.id, username, db, background_tasks + ) + return JSONResponse(content={"Success": "Project Transferred"}, status_code=200) except (ValueError, ProjectAdminServiceError) as e: return JSONResponse( content={"Error": str(e).split("-")[1], "SubCode": str(e).split("-")[0]}, @@ -175,7 +173,6 @@ async def post( MessageService.send_message_to_all_contributors, project_id, message_dto, - db_connection.database, ) return JSONResponse(content={"Success": "Messages started"}, status_code=200) except Exception as e: diff --git a/backend/api/projects/resources.py b/backend/api/projects/resources.py index 7a6ada3962..a75a5b0b47 100644 --- a/backend/api/projects/resources.py +++ b/backend/api/projects/resources.py @@ -255,8 +255,6 @@ async def post( ) -# @router.head("/{project_id}", response_model=ProjectDTO) -# @requires('authenticated') def head(request: Request, project_id): """ Retrieves a Tasking-Manager project diff --git a/backend/api/tasks/actions.py b/backend/api/tasks/actions.py index 25977ce9ea..d393fc55ed 100644 --- a/backend/api/tasks/actions.py +++ b/backend/api/tasks/actions.py @@ -1,5 +1,5 @@ from databases import Database -from fastapi import APIRouter, Depends, Query, Request +from fastapi import APIRouter, Depends, Query, Request, BackgroundTasks from fastapi.responses import JSONResponse from loguru import logger @@ -228,6 +228,7 @@ async def post( request: Request, project_id: int, task_id: int, + background_tasks: BackgroundTasks, db: Database = Depends(get_db), user: AuthUserDTO = Depends(login_required), ): @@ -312,7 +313,9 @@ async def post( try: await ProjectService.exists(project_id, db) async with db.transaction(): - task = await MappingService.unlock_task_after_mapping(mapped_task, db) + task = await MappingService.unlock_task_after_mapping( + mapped_task, db, background_tasks + ) return task except MappingServiceError as e: @@ -591,6 +594,7 @@ async def post( async def post( request: Request, project_id: int, + background_tasks: BackgroundTasks, db: Database = Depends(get_db), user: AuthUserDTO = Depends(login_required), ): @@ -662,11 +666,10 @@ async def post( ) try: await ProjectService.exists(project_id, db) - async with db.transaction(): - tasks = await ValidatorService.unlock_tasks_after_validation( - validated_dto, db - ) - return tasks + tasks = await ValidatorService.unlock_tasks_after_validation( + validated_dto, db, background_tasks + ) + return tasks except ValidatorServiceError as e: return JSONResponse( content={"Error": str(e).split("-")[1], "SubCode": str(e).split("-")[0]}, diff --git a/backend/api/teams/actions.py b/backend/api/teams/actions.py index 91dfb8be1d..a28326d697 100644 --- a/backend/api/teams/actions.py +++ b/backend/api/teams/actions.py @@ -3,7 +3,7 @@ from fastapi.responses import JSONResponse from loguru import logger -from backend.db import db_connection, get_db +from backend.db import get_db from backend.models.dtos.message_dto import MessageDTO from backend.models.dtos.user_dto import AuthUserDTO from backend.models.postgis.user import User @@ -403,7 +403,6 @@ async def post( team.name, message_dto, user.id, - db_connection.database, ) return JSONResponse( content={"Success": "Message sent successfully"}, status_code=200 diff --git a/backend/db.py b/backend/db.py index a59234a8fe..b4973bcce6 100644 --- a/backend/db.py +++ b/backend/db.py @@ -1,7 +1,6 @@ from databases import Database from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, sessionmaker - from backend.config import settings Base = declarative_base() diff --git a/backend/models/dtos/project_dto.py b/backend/models/dtos/project_dto.py index 1bd2654100..3916b261d0 100644 --- a/backend/models/dtos/project_dto.py +++ b/backend/models/dtos/project_dto.py @@ -441,7 +441,7 @@ class ListSearchResultDTO(BaseModel): last_updated: Optional[datetime] = Field(alias="lastUpdated", default=None) due_date: Optional[datetime] = Field(alias="dueDate", default=None) total_contributors: Optional[int] = Field(alias="totalContributors", default=None) - country: Optional[str] = Field(default="", serialize=False) + country: Optional[List[str]] = Field(default=None) class Config: populate_by_name = True diff --git a/backend/models/dtos/user_dto.py b/backend/models/dtos/user_dto.py index 0418a892a5..87386b80cf 100644 --- a/backend/models/dtos/user_dto.py +++ b/backend/models/dtos/user_dto.py @@ -32,8 +32,7 @@ class UserDTO(BaseModel): mapping_level: Optional[str] = Field(None, alias="mappingLevel") projects_mapped: Optional[int] = Field(None, alias="projectsMapped") email_address: Optional[str] = Field(None, alias="emailAddress") - - is_email_verified: Optional[str] = Field( + is_email_verified: Optional[bool] = Field( None, alias="isEmailVerified", serialize_when_none=False ) is_expert: bool = Field(None, alias="isExpert", serialize_when_none=False) diff --git a/backend/services/mapping_service.py b/backend/services/mapping_service.py index 7b49078e42..66c58af6dd 100644 --- a/backend/services/mapping_service.py +++ b/backend/services/mapping_service.py @@ -2,6 +2,7 @@ import xml.etree.ElementTree as ET from databases import Database +from fastapi import BackgroundTasks # from flask import current_app from geoalchemy2 import WKBElement @@ -140,7 +141,9 @@ async def lock_task_for_mapping( @staticmethod async def unlock_task_after_mapping( - mapped_task: MappedTaskDTO, db: Database + mapped_task: MappedTaskDTO, + db: Database, + background_tasks: BackgroundTasks, ) -> TaskDTO: """Unlocks the task and sets the task history appropriately""" # Fetch the task locked by the user @@ -172,6 +175,7 @@ async def unlock_task_after_mapping( mapped_task.project_id, db, ) + # Unlock the task and change its state await Task.unlock_task( task_id=mapped_task.task_id, @@ -182,8 +186,9 @@ async def unlock_task_after_mapping( comment=mapped_task.comment, ) # Send email on project progress - # TODO: Verify this email mechanism. - await ProjectService.send_email_on_project_progress(mapped_task.project_id, db) + background_tasks.add_task( + ProjectService.send_email_on_project_progress, mapped_task.project_id + ) return await Task.as_dto_with_instructions( task_id=mapped_task.task_id, diff --git a/backend/services/mapswipe_service.py b/backend/services/mapswipe_service.py index 12249f2438..00ea242434 100644 --- a/backend/services/mapswipe_service.py +++ b/backend/services/mapswipe_service.py @@ -140,7 +140,6 @@ def setup_group_dto( self, partner_id: str, group_id: str, resp_body: str ) -> GroupedPartnerStatsDTO: group_stats = json.loads(resp_body)["data"] - print(group_stats) group_dto = GroupedPartnerStatsDTO(provider="mapswipe") group_dto.id = partner_id group_dto.id_inside_provider = group_id @@ -156,7 +155,6 @@ def setup_group_dto( group_dto.members_count = group_stats["userGroup"]["userMemberships"]["count"] group_dto.members = [] - print(group_stats["userGroup"]["userMemberships"]["items"]) for user_resp in group_stats["userGroup"]["userMemberships"]["items"]: user = UserGroupMemberDTO() user.id = user_resp["id"] diff --git a/backend/services/messaging/chat_service.py b/backend/services/messaging/chat_service.py index 15294b6005..6ec0d3e27a 100644 --- a/backend/services/messaging/chat_service.py +++ b/backend/services/messaging/chat_service.py @@ -11,7 +11,6 @@ from backend.services.project_admin_service import ProjectAdminService from backend.services.project_service import ProjectService from backend.services.team_service import TeamService -from backend.db import db_connection class ChatService: @@ -73,7 +72,6 @@ async def post_message( chat_message.message, chat_dto.project_id, project_name, - db_connection.database, ) return await ProjectChat.get_messages(chat_dto.project_id, db, 1, 5) else: diff --git a/backend/services/messaging/message_service.py b/backend/services/messaging/message_service.py index 0436ec6d4e..b199f2a090 100644 --- a/backend/services/messaging/message_service.py +++ b/backend/services/messaging/message_service.py @@ -10,8 +10,8 @@ from markdown import markdown from sqlalchemy import func, insert, text -from backend import db from backend.config import settings +from backend.db import db_connection from backend.exceptions import NotFound from backend.models.dtos.message_dto import MessageDTO, MessagesDTO from backend.models.dtos.stats_dto import Pagination @@ -129,12 +129,12 @@ async def send_message_after_validation( @staticmethod async def send_message_to_all_contributors( - project_id: int, message_dto: MessageDTO, database: Database + project_id: int, message_dto: MessageDTO ): """Sends supplied message to all contributors on specified project. Message all contributors can take over a minute to run, so this method is expected to be called on its own thread """ - async with database.connection() as conn: + async with db_connection.database.connection() as conn: contributors = await Message.get_all_contributors(project_id, conn) project = await Project.get(project_id, conn) project_info = await ProjectInfo.get_dto_for_locale( @@ -393,47 +393,47 @@ async def send_project_transfer_message( project_id: int, transferred_to: str, transferred_by: str, - db: Database, ): """Will send a message to the manager of the organization after a project is transferred""" - project = await Project.get(project_id, db) - project_name = project.get_project_title(project.default_locale) - message = Message() - message.message_type = MessageType.SYSTEM.value - message.date = timestamp() - message.read = False - message.subject = ( - f"Project {project_name} #{project_id} was transferred to {transferred_to}" - ) - message.message = ( - f"Project {project_name} #{project_id} associated with your" - + f"organisation {project.organisation.name} was transferred to {transferred_to} by {transferred_by}." - ) - values = { - "PROJECT_ORG_NAME": project.organisation.name, - "PROJECT_ORG_ID": project.organisation_id, - "PROJECT_NAME": project_name, - "PROJECT_ID": project_id, - "TRANSFERRED_TO": transferred_to, - "TRANSFERRED_BY": transferred_by, - } - html_template = get_template("project_transfer_alert_en.html", values) - - managers = OrganisationService.get_organisation_by_id_as_dto( - project.organisation_id, User.get_by_username(transferred_by).id, False, db - ) - managers = managers.managers - for manager in managers: - manager = UserService.get_user_by_username(manager.username) - message.to_user_id = manager.id - message.save(db) - if manager.email_address and manager.is_email_verified: - SMTPService._send_message( - manager.email_address, - message.subject, - html_template, - message.message, - ) + async with db_connection.database.connection() as db: + project = await Project.get(project_id, db) + project_name = await project.get_project_title( + db, project.id, project.default_locale + ) + from_user = await User.get_by_username(transferred_by, db) + organisation = await OrganisationService.get_organisation_by_id_as_dto( + project.organisation_id, from_user.id, False, db + ) + message = Message() + message.message_type = MessageType.SYSTEM.value + message.date = timestamp() + message.read = False + message.subject = f"Project {project_name} #{project_id} was transferred to {transferred_to}" + message.message = ( + f"Project {project_name} #{project_id} associated with your" + + f"organisation {organisation.name} was transferred to {transferred_to} by {transferred_by}." + ) + values = { + "PROJECT_ORG_NAME": organisation.name, + "PROJECT_ORG_ID": project.organisation_id, + "PROJECT_NAME": project_name, + "PROJECT_ID": project_id, + "TRANSFERRED_TO": transferred_to, + "TRANSFERRED_BY": transferred_by, + } + html_template = get_template("project_transfer_alert_en.html", values) + managers = organisation.managers + for manager in managers: + manager = await UserService.get_user_by_username(manager.username, db) + message.to_user_id = manager.id + await message.save(db) + if manager.email_address and manager.is_email_verified: + await SMTPService._send_message( + manager.email_address, + message.subject, + html_template, + message.message, + ) @staticmethod def get_user_link(username: str): @@ -554,9 +554,8 @@ async def send_message_after_chat( chat: str, project_id: int, project_name: str, - database: Database, ): - async with database.connection() as db: + async with db_connection.database.connection() as db: usernames = await MessageService._parse_message_for_username( message=chat, project_id=project_id, db=db ) diff --git a/backend/services/messaging/smtp_service.py b/backend/services/messaging/smtp_service.py index 7e94b2f7e7..f844a5ac69 100644 --- a/backend/services/messaging/smtp_service.py +++ b/backend/services/messaging/smtp_service.py @@ -3,9 +3,10 @@ from fastapi_mail import MessageSchema, MessageType from itsdangerous import URLSafeTimedSerializer from loguru import logger +from databases import Database # from backend import mail, create_app -from backend import create_app, mail +from backend import mail from backend.config import settings from backend.models.postgis.message import Message as PostgisMessage from backend.models.postgis.statuses import EncouragingEmailType @@ -69,62 +70,59 @@ async def send_email_to_contributors_on_project_progress( project_id: int = None, project_name: str = None, project_completion: int = None, + db: Database = None, ): """Sends an encouraging email to a users when a project they have contributed to make progress""" from backend.services.users.user_service import UserService - app = ( - create_app() - ) # Because message-all run on background thread it needs it's own app context - with app.app_context(): - if email_type == EncouragingEmailType.PROJECT_PROGRESS.value: - subject = "The project you have contributed to has made progress." - elif email_type == EncouragingEmailType.PROJECT_COMPLETE.value: - subject = "The project you have contributed to has been completed." - values = { - "EMAIL_TYPE": email_type, - "PROJECT_ID": project_id, - "PROJECT_NAME": project_name, - "PROJECT_COMPLETION": project_completion, - } - contributor_ids = PostgisMessage.get_all_contributors(project_id) - for contributor_id in contributor_ids: - contributor = UserService.get_user_by_id(contributor_id[0]) - values["USERNAME"] = contributor.username - if email_type == EncouragingEmailType.BEEN_SOME_TIME.value: - recommended_projects = UserService.get_recommended_projects( - contributor.username, "en" - ).results - projects = [] - for recommended_project in recommended_projects[:4]: - projects.append( - { - "org_logo": recommended_project.organisation_logo, - "priority": recommended_project.priority, - "name": recommended_project.name, - "id": recommended_project.project_id, - "description": recommended_project.short_description, - "total_contributors": recommended_project.total_contributors, - "difficulty": recommended_project.difficulty, - "progress": recommended_project.percent_mapped, - "due_date": recommended_project.due_date, - } - ) - - values["PROJECTS"] = projects - html_template = get_template("encourage_mapper_en.html", values) - if ( - contributor.email_address - and contributor.is_email_verified - and contributor.projects_notifications - ): - logger.debug( - f"Sending {email_type} email to {contributor.email_address} for project {project_id}" - ) - await SMTPService._send_message( - contributor.email_address, subject, html_template + if email_type == EncouragingEmailType.PROJECT_PROGRESS.value: + subject = "The project you have contributed to has made progress." + elif email_type == EncouragingEmailType.PROJECT_COMPLETE.value: + subject = "The project you have contributed to has been completed." + values = { + "EMAIL_TYPE": email_type, + "PROJECT_ID": project_id, + "PROJECT_NAME": project_name, + "PROJECT_COMPLETION": project_completion, + } + contributor_ids = await PostgisMessage.get_all_contributors(project_id, db) + for contributor_id in contributor_ids: + contributor = await UserService.get_user_by_id(contributor_id, db) + values["USERNAME"] = contributor.username + if email_type == EncouragingEmailType.BEEN_SOME_TIME.value: + recommended_projects = await UserService.get_recommended_projects( + contributor.username, "en", db + ).results + projects = [] + for recommended_project in recommended_projects[:4]: + projects.append( + { + "org_logo": recommended_project.organisation_logo, + "priority": recommended_project.priority, + "name": recommended_project.name, + "id": recommended_project.project_id, + "description": recommended_project.short_description, + "total_contributors": recommended_project.total_contributors, + "difficulty": recommended_project.difficulty, + "progress": recommended_project.percent_mapped, + "due_date": recommended_project.due_date, + } ) + values["PROJECTS"] = projects + html_template = get_template("encourage_mapper_en.html", values) + if ( + contributor.email_address + and contributor.is_email_verified + and contributor.projects_notifications + ): + logger.debug( + f"Sending {email_type} email to {contributor.email_address} for project {project_id}" + ) + await SMTPService._send_message( + contributor.email_address, subject, html_template + ) + @staticmethod async def send_email_alert( to_address: str, diff --git a/backend/services/project_admin_service.py b/backend/services/project_admin_service.py index d74489eabb..fe7adb30a3 100644 --- a/backend/services/project_admin_service.py +++ b/backend/services/project_admin_service.py @@ -2,6 +2,7 @@ import geojson from databases import Database +from fastapi import BackgroundTasks from loguru import logger from backend.config import settings @@ -19,6 +20,7 @@ from backend.models.postgis.utils import InvalidData, InvalidGeoJson from backend.services.grid.grid_service import GridService from backend.services.license_service import LicenseService +from backend.services.messaging.message_service import MessageService from backend.services.organisation_service import OrganisationService from backend.services.team_service import TeamService from backend.services.users.user_service import UserService @@ -330,61 +332,54 @@ async def transfer_project_to( transfering_user_id: int, username: str, db: Database, - # background_tasks: BackgroundTasks, + background_tasks: BackgroundTasks, ): """Transfers project from old owner (transfering_user_id) to new owner (username)""" - project = await Project.get(project_id, db) - new_owner = await UserService.get_user_by_username(username, db) - author_id = project.author_id - if not author_id: - raise ProjectAdminServiceError( - "TransferPermissionError- User does not have permissions to transfer project" - ) - author = await User.get_by_id(author_id, db) - if username == author.username: - return + async with db.transaction(): + project = await Project.get(project_id, db) + new_owner = await UserService.get_user_by_username(username, db) + author_id = project.author_id + if not author_id: + raise ProjectAdminServiceError( + "TransferPermissionError- User does not have permissions to transfer project" + ) + author = await User.get_by_id(author_id, db) + if username == author.username: + return - is_admin = await UserService.is_user_an_admin(transfering_user_id, db) + is_admin = await UserService.is_user_an_admin(transfering_user_id, db) - is_author = UserService.is_user_the_project_author( - transfering_user_id, project.author_id - ) - is_org_manager = await OrganisationService.is_user_an_org_manager( - project.organisation_id, transfering_user_id, db - ) - if not (is_admin or is_author or is_org_manager): - raise ProjectAdminServiceError( - "TransferPermissionError- User does not have permissions to transfer project" + is_author = UserService.is_user_the_project_author( + transfering_user_id, project.author_id + ) + is_org_manager = await OrganisationService.is_user_an_org_manager( + project.organisation_id, transfering_user_id, db ) + if not (is_admin or is_author or is_org_manager): + raise ProjectAdminServiceError( + "TransferPermissionError- User does not have permissions to transfer project" + ) - is_new_owner_org_manager = await OrganisationService.is_user_an_org_manager( - project.organisation_id, new_owner.id, db - ) - is_new_owner_admin = await UserService.is_user_an_admin(new_owner.id, db) - if not (is_new_owner_org_manager or is_new_owner_admin): - error_message = ( - "InvalidNewOwner- New owner must be project's org manager or TM admin" + is_new_owner_org_manager = await OrganisationService.is_user_an_org_manager( + project.organisation_id, new_owner.id, db + ) + is_new_owner_admin = await UserService.is_user_an_admin(new_owner.id, db) + if not (is_new_owner_org_manager or is_new_owner_admin): + error_message = "InvalidNewOwner- New owner must be project's org manager or TM admin" + logger.debug(error_message) + raise ValueError(error_message) + else: + transferred_by = await User.get_by_id(transfering_user_id, db) + transferred_by = transferred_by.username + project.author_id = new_owner.id + await Project.update_project_author(project_id, new_owner.id, db) + + background_tasks.add_task( + MessageService.send_project_transfer_message, + project_id, + username, + transferred_by, ) - logger.debug(error_message) - raise ValueError(error_message) - else: - transferred_by = await User.get_by_id(transfering_user_id, db) - transferred_by = transferred_by.username - project.author_id = new_owner.id - await Project.update_project_author(project_id, new_owner.id, db) - # TODO - # Adding the background task - # background_tasks.add_task( - # await MessageService.send_project_transfer_message, - # project_id, - # username, - # transferred_by, - # db - # ) - # threading.Thread( - # target=MessageService.send_project_transfer_message, - # args=(project_id, username, transferred_by, db), - # ).start() @staticmethod async def is_user_action_permitted_on_project( diff --git a/backend/services/project_service.py b/backend/services/project_service.py index dc75524016..421945432e 100644 --- a/backend/services/project_service.py +++ b/backend/services/project_service.py @@ -1,4 +1,4 @@ -import threading +import json from datetime import datetime, timedelta, timezone import geojson @@ -9,7 +9,7 @@ # # from flask import current_app from backend.config import get_settings -from backend.db import get_session +from backend.db import db_connection from backend.exceptions import NotFound from backend.models.dtos.mapping_dto import TaskDTOs from backend.models.dtos.project_dto import ( @@ -39,10 +39,6 @@ from backend.services.team_service import TeamService from backend.services.users.user_service import UserService -session = get_session() - -import json - summary_cache = TTLCache(maxsize=1024, ttl=600) @@ -524,7 +520,6 @@ async def is_user_permitted_to_validate( return True, "User allowed to validate" - # TODO: Implement Caching. @staticmethod @cached(summary_cache) def get_cached_project_summary( @@ -711,58 +706,54 @@ def get_project_organisation(project_id: int) -> Organisation: return project.organisation @staticmethod - async def send_email_on_project_progress(project_id: int, db: Database): + async def send_email_on_project_progress(project_id: int): """Send email to all contributors on project progress""" - current_settings = get_settings() - if not current_settings.SEND_PROJECT_EMAIL_UPDATES: - return - project = await ProjectService.get_project_by_id(project_id, db) - - project_completion = Project.calculate_tasks_percent( - "project_completion", - project.tasks_mapped, - project.tasks_validated, - project.total_tasks, - project.tasks_bad_imagery, - ) - - if project_completion == 50 and project.progress_email_sent: - return # Don't send progress email if it's already sent - if project_completion in [50, 100]: - email_type = ( - EncouragingEmailType.PROJECT_COMPLETE.value - if project_completion == 100 - else EncouragingEmailType.PROJECT_PROGRESS.value - ) - project_title_query = """ - SELECT name - FROM project_info - WHERE project_id = :project_id AND locale = :locale - """ - project_title = await db.fetch_val( - project_title_query, - values={"project_id": project_id, "locale": project["default_locale"]}, + async with db_connection.database.connection() as db: + current_settings = get_settings() + if not current_settings.SEND_PROJECT_EMAIL_UPDATES: + return + project = await ProjectService.get_project_by_id(project_id, db) + + project_completion = Project.calculate_tasks_percent( + "project_completion", + project.tasks_mapped, + project.tasks_validated, + project.total_tasks, + project.tasks_bad_imagery, ) - - # Update progress_email_sent status - await db.execute( + if project_completion == 50 and project.progress_email_sent: + return # Don't send progress email if it's already sent + if project_completion in [50, 100]: + email_type = ( + EncouragingEmailType.PROJECT_COMPLETE.value + if project_completion == 100 + else EncouragingEmailType.PROJECT_PROGRESS.value + ) + project_title_query = """ + SELECT name + FROM project_info + WHERE project_id = :project_id AND locale = :locale """ - UPDATE projects - SET progress_email_sent = TRUE - WHERE id = :project_id - """, - values={"project_id": project_id}, - ) + project_title = await db.fetch_val( + project_title_query, + values={ + "project_id": project_id, + "locale": project["default_locale"], + }, + ) - threading.Thread( - target=SMTPService.send_email_to_contributors_on_project_progress, - args=( - email_type, - project_id, - project_title, - project_completion, - ), - ).start() + # Update progress_email_sent status + await db.execute( + """ + UPDATE projects + SET progress_email_sent = TRUE + WHERE id = :project_id + """, + values={"project_id": project_id}, + ) + await SMTPService.send_email_to_contributors_on_project_progress( + email_type, project_id, project_title, project_completion, db + ) @staticmethod async def get_active_projects(interval: int, db: Database): diff --git a/backend/services/team_service.py b/backend/services/team_service.py index a93c698828..7ee5985480 100644 --- a/backend/services/team_service.py +++ b/backend/services/team_service.py @@ -28,6 +28,7 @@ from backend.services.messaging.message_service import MessageService from backend.services.organisation_service import OrganisationService from backend.services.users.user_service import UserService +from backend.db import db_connection class TeamServiceError(Exception): @@ -789,10 +790,9 @@ async def send_message_to_all_team_members( team_name: str, message_dto: MessageDTO, user_id: int, - database: Database, ): try: - async with database.connection() as conn: + async with db_connection.database.connection() as conn: team_members = await TeamService._get_active_team_members(team_id, conn) user = await UserService.get_user_by_id(user_id, conn) sender = user.username diff --git a/backend/services/users/authentication_service.py b/backend/services/users/authentication_service.py index 1bb77772d7..a4e7e5937f 100644 --- a/backend/services/users/authentication_service.py +++ b/backend/services/users/authentication_service.py @@ -87,16 +87,8 @@ async def authenticate(self, conn): decoded_token, 604800 ) if not valid_token: - logger.debug("Token not valid...") + logger.debug("Token not valid.") return - # raise HTTPException( - # status_code=401, - # detail={ - # "Error": "Token is expired or invalid", - # "SubCode": "InvalidToken", - # }, - # headers={"WWW-Authenticate": "Bearer"}, - # ) tm.authenticated_user_id = user_id return AuthCredentials(["authenticated"]), SimpleUser(user_id) @@ -287,3 +279,29 @@ async def login_required_optional( logger.debug("Token not valid") return None return AuthUserDTO(id=user_id) + + +async def pm_only( + Authorization: str = Security(APIKeyHeader(name="Authorization")), +): + if not Authorization: + raise HTTPException(status_code=401, detail="Authorization header missing") + try: + scheme, credentials = Authorization.split() + if scheme.lower() != "token": + raise HTTPException(status_code=401, detail="Invalid authentication scheme") + try: + decoded_token = base64.b64decode(credentials).decode("ascii") + except UnicodeDecodeError: + logger.debug("Unable to decode token") + raise HTTPException(status_code=401, detail="Invalid token") + except (ValueError, UnicodeDecodeError, binascii.Error): + raise AuthenticationError("Invalid auth credentials") + valid_token, user_id = AuthenticationService.is_valid_token(decoded_token, 604800) + if not valid_token: + logger.debug("Token not valid") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"Error": "Token is expired or invalid", "SubCode": "InvalidToken"}, + headers={"WWW-Authenticate": "Bearer"}, + ) diff --git a/backend/services/validator_service.py b/backend/services/validator_service.py index d97c1b047e..ef474270b4 100644 --- a/backend/services/validator_service.py +++ b/backend/services/validator_service.py @@ -1,10 +1,12 @@ -# from flask import current_app +import asyncio import datetime from databases import Database +from fastapi import BackgroundTasks from loguru import logger from sqlalchemy import text +from backend.db import db_connection from backend.exceptions import NotFound from backend.models.dtos.mapping_dto import TaskDTOs from backend.models.dtos.stats_dto import Pagination @@ -152,9 +154,92 @@ async def _user_can_validate_task( return True return False + async def process_task(project_id, task_to_unlock, validated_dto): + async with db_connection.database.connection() as db: + async with db.transaction(): + task = task_to_unlock["task"] + if task_to_unlock["comment"]: + await MessageService.send_message_after_comment( + validated_dto.user_id, + task_to_unlock["comment"], + task.id, + validated_dto.project_id, + db, + ) + if ( + task_to_unlock["new_state"] == TaskStatus.VALIDATED + or task_to_unlock["new_state"] == TaskStatus.INVALIDATED + ): + await MessageService.send_message_after_validation( + task_to_unlock["new_state"], + validated_dto.user_id, + task.mapped_by, + task.id, + validated_dto.project_id, + db, + ) + + # Set last_validation_date for the mapper to current date + if task_to_unlock["new_state"] == TaskStatus.VALIDATED: + query = """ + UPDATE users + SET last_validation_date = :timestamp + WHERE id = ( + SELECT mapped_by + FROM tasks + WHERE id = :task_id + AND project_id = :project_id + ); + """ + values = { + "timestamp": datetime.datetime.utcnow(), + "task_id": task.id, + "project_id": validated_dto.project_id, + } + await db.execute(query=query, values=values) + + # Update stats if user setting task to a different state from previous state + prev_status = await TaskHistory.get_last_status(project_id, task.id, db) + if prev_status != task_to_unlock["new_state"]: + await StatsService.update_stats_after_task_state_change( + validated_dto.project_id, + validated_dto.user_id, + prev_status, + task_to_unlock["new_state"], + db, + ) + task_mapping_issues = await ValidatorService.get_task_mapping_issues( + task_to_unlock + ) + await Task.unlock_task( + task_id=task.id, + project_id=project_id, + user_id=validated_dto.user_id, + new_state=task_to_unlock["new_state"], + db=db, + comment=task_to_unlock["comment"], + issues=task_mapping_issues, + ) + + return await Task.as_dto_with_instructions( + task.id, project_id, db, validated_dto.preferred_locale + ) + + async def process_tasks_concurrently(project_id, tasks_to_unlock, validated_dto): + """ + Process tasks concurrently and ensure each task gets its own DB connection. + """ + tasks = [ + ValidatorService.process_task(project_id, task_to_unlock, validated_dto) + for task_to_unlock in tasks_to_unlock + ] + return await asyncio.gather(*tasks) + @staticmethod async def unlock_tasks_after_validation( - validated_dto: UnlockAfterValidationDTO, db: Database + validated_dto: UnlockAfterValidationDTO, + db: Database, + background_tasks: BackgroundTasks, ) -> TaskDTOs: """ Unlocks supplied tasks after validation @@ -166,88 +251,15 @@ async def unlock_tasks_after_validation( tasks_to_unlock = await ValidatorService.get_tasks_locked_by_user( project_id, validated_tasks, user_id, db ) - # Unlock all tasks - dtos = [] - message_sent_to = [] - for task_to_unlock in tasks_to_unlock: - task = task_to_unlock["task"] - if task_to_unlock["comment"]: - # Parses comment to see if any users have been @'d - await MessageService.send_message_after_comment( - validated_dto.user_id, - task_to_unlock["comment"], - task.id, - validated_dto.project_id, - db, - ) - if ( - task_to_unlock["new_state"] == TaskStatus.VALIDATED - or task_to_unlock["new_state"] == TaskStatus.INVALIDATED - ): - # All mappers get a notification if their task has been validated or invalidated. - # Only once if multiple tasks mapped - if task.mapped_by not in message_sent_to: - await MessageService.send_message_after_validation( - task_to_unlock["new_state"], - validated_dto.user_id, - task.mapped_by, - task.id, - validated_dto.project_id, - db, - ) - message_sent_to.append(task.mapped_by) - - # Set last_validation_date for the mapper to current date - if task_to_unlock["new_state"] == TaskStatus.VALIDATED: - query = """ - UPDATE users - SET last_validation_date = :timestamp - WHERE id = ( - SELECT mapped_by - FROM tasks - WHERE id = :task_id - AND project_id = :project_id - ); - """ - values = { - "timestamp": datetime.datetime.utcnow(), - "task_id": task.id, - "project_id": validated_dto.project_id, - } - await db.execute(query=query, values=values) - - # Update stats if user setting task to a different state from previous state - prev_status = await TaskHistory.get_last_status(project_id, task.id, db) - if prev_status != task_to_unlock["new_state"]: - await StatsService.update_stats_after_task_state_change( - validated_dto.project_id, - validated_dto.user_id, - prev_status, - task_to_unlock["new_state"], - db, - ) - task_mapping_issues = await ValidatorService.get_task_mapping_issues( - task_to_unlock - ) - await Task.unlock_task( - task_id=task.id, - project_id=project_id, - user_id=validated_dto.user_id, - new_state=task_to_unlock["new_state"], - db=db, - comment=task_to_unlock["comment"], - issues=task_mapping_issues, - ) - dtos.append( - await Task.as_dto_with_instructions( - task.id, project_id, db, validated_dto.preferred_locale - ) - ) - await ProjectService.send_email_on_project_progress( - validated_dto.project_id, db + results = await ValidatorService.process_tasks_concurrently( + project_id, tasks_to_unlock, validated_dto + ) + background_tasks.add_task( + ProjectService.send_email_on_project_progress, + validated_dto.project_id, ) task_dtos = TaskDTOs() - task_dtos.tasks = dtos + task_dtos.tasks = results return task_dtos @staticmethod