Skip to content

Commit

Permalink
* Concurrent task validation using asyncio gather
Browse files Browse the repository at this point in the history
* messaging tasks added to background.
* db context added in tasks and removed explicit connection.
* send message on project progress.
* project transfer messaging.
  • Loading branch information
prabinoid committed Dec 4, 2024
1 parent e313c15 commit 4775217
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 322 deletions.
15 changes: 6 additions & 9 deletions backend/api/projects/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from shapely import GEOSException
from shapely.errors import TopologicalError

from backend.db import db_connection, get_db
from backend.db import get_db
from backend.models.dtos.grid_dto import GridDTO
from backend.models.dtos.message_dto import MessageDTO
from backend.models.dtos.user_dto import AuthUserDTO
Expand All @@ -30,6 +30,7 @@
@router.post("/{project_id}/actions/transfer-ownership/")
async def post(
request: Request,
background_tasks: BackgroundTasks,
project_id: int,
user: AuthUserDTO = Depends(login_required),
db: Database = Depends(get_db),
Expand Down Expand Up @@ -81,13 +82,10 @@ async def post(
status_code=400,
)
try:
async with db.transaction():
await ProjectAdminService.transfer_project_to(
project_id, user.id, username, db
)
return JSONResponse(
content={"Success": "Project Transferred"}, status_code=200
)
await ProjectAdminService.transfer_project_to(
project_id, user.id, username, db, background_tasks
)
return JSONResponse(content={"Success": "Project Transferred"}, status_code=200)
except (ValueError, ProjectAdminServiceError) as e:
return JSONResponse(
content={"Error": str(e).split("-")[1], "SubCode": str(e).split("-")[0]},
Expand Down Expand Up @@ -175,7 +173,6 @@ async def post(
MessageService.send_message_to_all_contributors,
project_id,
message_dto,
db_connection.database,
)
return JSONResponse(content={"Success": "Messages started"}, status_code=200)
except Exception as e:
Expand Down
2 changes: 0 additions & 2 deletions backend/api/projects/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ async def post(
)


# @router.head("/{project_id}", response_model=ProjectDTO)
# @requires('authenticated')
def head(request: Request, project_id):
"""
Retrieves a Tasking-Manager project
Expand Down
17 changes: 10 additions & 7 deletions backend/api/tasks/actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from databases import Database
from fastapi import APIRouter, Depends, Query, Request
from fastapi import APIRouter, Depends, Query, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from loguru import logger

Expand Down Expand Up @@ -228,6 +228,7 @@ async def post(
request: Request,
project_id: int,
task_id: int,
background_tasks: BackgroundTasks,
db: Database = Depends(get_db),
user: AuthUserDTO = Depends(login_required),
):
Expand Down Expand Up @@ -312,7 +313,9 @@ async def post(
try:
await ProjectService.exists(project_id, db)
async with db.transaction():
task = await MappingService.unlock_task_after_mapping(mapped_task, db)
task = await MappingService.unlock_task_after_mapping(
mapped_task, db, background_tasks
)
return task

except MappingServiceError as e:
Expand Down Expand Up @@ -591,6 +594,7 @@ async def post(
async def post(
request: Request,
project_id: int,
background_tasks: BackgroundTasks,
db: Database = Depends(get_db),
user: AuthUserDTO = Depends(login_required),
):
Expand Down Expand Up @@ -662,11 +666,10 @@ async def post(
)
try:
await ProjectService.exists(project_id, db)
async with db.transaction():
tasks = await ValidatorService.unlock_tasks_after_validation(
validated_dto, db
)
return tasks
tasks = await ValidatorService.unlock_tasks_after_validation(
validated_dto, db, background_tasks
)
return tasks
except ValidatorServiceError as e:
return JSONResponse(
content={"Error": str(e).split("-")[1], "SubCode": str(e).split("-")[0]},
Expand Down
3 changes: 1 addition & 2 deletions backend/api/teams/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fastapi.responses import JSONResponse
from loguru import logger

from backend.db import db_connection, get_db
from backend.db import get_db
from backend.models.dtos.message_dto import MessageDTO
from backend.models.dtos.user_dto import AuthUserDTO
from backend.models.postgis.user import User
Expand Down Expand Up @@ -403,7 +403,6 @@ async def post(
team.name,
message_dto,
user.id,
db_connection.database,
)
return JSONResponse(
content={"Success": "Message sent successfully"}, status_code=200
Expand Down
1 change: 0 additions & 1 deletion backend/db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from databases import Database
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from backend.config import settings

Base = declarative_base()
Expand Down
2 changes: 1 addition & 1 deletion backend/models/dtos/project_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ class ListSearchResultDTO(BaseModel):
last_updated: Optional[datetime] = Field(alias="lastUpdated", default=None)
due_date: Optional[datetime] = Field(alias="dueDate", default=None)
total_contributors: Optional[int] = Field(alias="totalContributors", default=None)
country: Optional[str] = Field(default="", serialize=False)
country: Optional[List[str]] = Field(default=None)

class Config:
populate_by_name = True
Expand Down
3 changes: 1 addition & 2 deletions backend/models/dtos/user_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class UserDTO(BaseModel):
mapping_level: Optional[str] = Field(None, alias="mappingLevel")
projects_mapped: Optional[int] = Field(None, alias="projectsMapped")
email_address: Optional[str] = Field(None, alias="emailAddress")

is_email_verified: Optional[str] = Field(
is_email_verified: Optional[bool] = Field(
None, alias="isEmailVerified", serialize_when_none=False
)
is_expert: bool = Field(None, alias="isExpert", serialize_when_none=False)
Expand Down
11 changes: 8 additions & 3 deletions backend/services/mapping_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import xml.etree.ElementTree as ET

from databases import Database
from fastapi import BackgroundTasks

# from flask import current_app
from geoalchemy2 import WKBElement
Expand Down Expand Up @@ -140,7 +141,9 @@ async def lock_task_for_mapping(

@staticmethod
async def unlock_task_after_mapping(
mapped_task: MappedTaskDTO, db: Database
mapped_task: MappedTaskDTO,
db: Database,
background_tasks: BackgroundTasks,
) -> TaskDTO:
"""Unlocks the task and sets the task history appropriately"""
# Fetch the task locked by the user
Expand Down Expand Up @@ -172,6 +175,7 @@ async def unlock_task_after_mapping(
mapped_task.project_id,
db,
)

# Unlock the task and change its state
await Task.unlock_task(
task_id=mapped_task.task_id,
Expand All @@ -182,8 +186,9 @@ async def unlock_task_after_mapping(
comment=mapped_task.comment,
)
# Send email on project progress
# TODO: Verify this email mechanism.
await ProjectService.send_email_on_project_progress(mapped_task.project_id, db)
background_tasks.add_task(
ProjectService.send_email_on_project_progress, mapped_task.project_id
)

return await Task.as_dto_with_instructions(
task_id=mapped_task.task_id,
Expand Down
2 changes: 0 additions & 2 deletions backend/services/mapswipe_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def setup_group_dto(
self, partner_id: str, group_id: str, resp_body: str
) -> GroupedPartnerStatsDTO:
group_stats = json.loads(resp_body)["data"]
print(group_stats)
group_dto = GroupedPartnerStatsDTO(provider="mapswipe")
group_dto.id = partner_id
group_dto.id_inside_provider = group_id
Expand All @@ -156,7 +155,6 @@ def setup_group_dto(

group_dto.members_count = group_stats["userGroup"]["userMemberships"]["count"]
group_dto.members = []
print(group_stats["userGroup"]["userMemberships"]["items"])
for user_resp in group_stats["userGroup"]["userMemberships"]["items"]:
user = UserGroupMemberDTO()
user.id = user_resp["id"]
Expand Down
2 changes: 0 additions & 2 deletions backend/services/messaging/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from backend.services.project_admin_service import ProjectAdminService
from backend.services.project_service import ProjectService
from backend.services.team_service import TeamService
from backend.db import db_connection


class ChatService:
Expand Down Expand Up @@ -73,7 +72,6 @@ async def post_message(
chat_message.message,
chat_dto.project_id,
project_name,
db_connection.database,
)
return await ProjectChat.get_messages(chat_dto.project_id, db, 1, 5)
else:
Expand Down
87 changes: 43 additions & 44 deletions backend/services/messaging/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from markdown import markdown
from sqlalchemy import func, insert, text

from backend import db
from backend.config import settings
from backend.db import db_connection
from backend.exceptions import NotFound
from backend.models.dtos.message_dto import MessageDTO, MessagesDTO
from backend.models.dtos.stats_dto import Pagination
Expand Down Expand Up @@ -129,12 +129,12 @@ async def send_message_after_validation(

@staticmethod
async def send_message_to_all_contributors(
project_id: int, message_dto: MessageDTO, database: Database
project_id: int, message_dto: MessageDTO
):
"""Sends supplied message to all contributors on specified project. Message all contributors can take
over a minute to run, so this method is expected to be called on its own thread
"""
async with database.connection() as conn:
async with db_connection.database.connection() as conn:
contributors = await Message.get_all_contributors(project_id, conn)
project = await Project.get(project_id, conn)
project_info = await ProjectInfo.get_dto_for_locale(
Expand Down Expand Up @@ -393,47 +393,47 @@ async def send_project_transfer_message(
project_id: int,
transferred_to: str,
transferred_by: str,
db: Database,
):
"""Will send a message to the manager of the organization after a project is transferred"""
project = await Project.get(project_id, db)
project_name = project.get_project_title(project.default_locale)
message = Message()
message.message_type = MessageType.SYSTEM.value
message.date = timestamp()
message.read = False
message.subject = (
f"Project {project_name} #{project_id} was transferred to {transferred_to}"
)
message.message = (
f"Project {project_name} #{project_id} associated with your"
+ f"organisation {project.organisation.name} was transferred to {transferred_to} by {transferred_by}."
)
values = {
"PROJECT_ORG_NAME": project.organisation.name,
"PROJECT_ORG_ID": project.organisation_id,
"PROJECT_NAME": project_name,
"PROJECT_ID": project_id,
"TRANSFERRED_TO": transferred_to,
"TRANSFERRED_BY": transferred_by,
}
html_template = get_template("project_transfer_alert_en.html", values)

managers = OrganisationService.get_organisation_by_id_as_dto(
project.organisation_id, User.get_by_username(transferred_by).id, False, db
)
managers = managers.managers
for manager in managers:
manager = UserService.get_user_by_username(manager.username)
message.to_user_id = manager.id
message.save(db)
if manager.email_address and manager.is_email_verified:
SMTPService._send_message(
manager.email_address,
message.subject,
html_template,
message.message,
)
async with db_connection.database.connection() as db:
project = await Project.get(project_id, db)
project_name = await project.get_project_title(
db, project.id, project.default_locale
)
from_user = await User.get_by_username(transferred_by, db)
organisation = await OrganisationService.get_organisation_by_id_as_dto(
project.organisation_id, from_user.id, False, db
)
message = Message()
message.message_type = MessageType.SYSTEM.value
message.date = timestamp()
message.read = False
message.subject = f"Project {project_name} #{project_id} was transferred to {transferred_to}"
message.message = (
f"Project {project_name} #{project_id} associated with your"
+ f"organisation {organisation.name} was transferred to {transferred_to} by {transferred_by}."
)
values = {
"PROJECT_ORG_NAME": organisation.name,
"PROJECT_ORG_ID": project.organisation_id,
"PROJECT_NAME": project_name,
"PROJECT_ID": project_id,
"TRANSFERRED_TO": transferred_to,
"TRANSFERRED_BY": transferred_by,
}
html_template = get_template("project_transfer_alert_en.html", values)
managers = organisation.managers
for manager in managers:
manager = await UserService.get_user_by_username(manager.username, db)
message.to_user_id = manager.id
await message.save(db)
if manager.email_address and manager.is_email_verified:
await SMTPService._send_message(
manager.email_address,
message.subject,
html_template,
message.message,
)

@staticmethod
def get_user_link(username: str):
Expand Down Expand Up @@ -554,9 +554,8 @@ async def send_message_after_chat(
chat: str,
project_id: int,
project_name: str,
database: Database,
):
async with database.connection() as db:
async with db_connection.database.connection() as db:
usernames = await MessageService._parse_message_for_username(
message=chat, project_id=project_id, db=db
)
Expand Down
Loading

0 comments on commit 4775217

Please sign in to comment.