From 6e1d8646e03230d21554e88eb04401b8e2f377d5 Mon Sep 17 00:00:00 2001 From: amirul-wd <155528891+amirul-wd@users.noreply.github.com> Date: Sun, 3 Mar 2024 17:15:09 +0600 Subject: [PATCH] added celery task to send email initation --- README.md | 6 +++++- app/api/invitation.py | 22 +++++++++++-------- app/requirements.txt | 6 +++++- app/services/oauth2.py | 33 ++++++++++++++-------------- app/worker/__init__.py | 1 + app/worker/celery.py | 18 ++++++++++++++++ docker-compose.yml | 48 +++++++++++++++++++++++++++++++++++++++++ main.py | 2 -- scheduler/invitation.py | 19 ---------------- 9 files changed, 107 insertions(+), 48 deletions(-) create mode 100644 app/worker/__init__.py create mode 100644 app/worker/celery.py delete mode 100644 scheduler/invitation.py diff --git a/README.md b/README.md index ed1577b..0cb8c92 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Now, the project will be running on `http://localhost:8000` ### Pgadmin4 -`http://localhost:5050` +http://localhost:5050 ## Testing @@ -78,6 +78,10 @@ To run tests using Pytest, execute the following command within the virtual envi $ make test ``` +## Flower + +Navigate to http://localhost:5556 to view the dashboard. You should see one worker ready to go. + ## Documentation ``` diff --git a/app/api/invitation.py b/app/api/invitation.py index 2ecafdb..4f00427 100644 --- a/app/api/invitation.py +++ b/app/api/invitation.py @@ -14,6 +14,7 @@ from core.config import settings from app.models import User from core.logger import logger +from app.worker.celery import send_email_task invitation_router = APIRouter(prefix='/invitation', tags=['Invitation']) @@ -89,6 +90,7 @@ async def accept_invitation( @invitation_router.get("/resend/{email}") async def resend_invitation( + background_tasks: BackgroundTasks, email: str, db: Session = Depends(get_db), current_user = Depends(get_current_user) @@ -113,15 +115,17 @@ async def resend_invitation( invitation.resent_count += 1 _ = invitation_crud.update(db=db, obj_in=invitation) - await send_email_async( - subject=f'Invitation to Join {invitation.organization}', - email_to=invitation.email, - body={ - "full_name": invitation.full_name, - "email": invitation.email, - "organization": invitation.organization, - "created_by_name": created_by.full_name, - "invitation_url": f"{settings.BASE_URL}/accept-invitation/{unique_token}"} + # Inside your FastAPI route + send_email_task.delay( + subject=f'Invitation to Join {invitation.organization}', + email_to=invitation.email, + body={ + "full_name": invitation.full_name, + "email": invitation.email, + "organization": invitation.organization, + "created_by_name": created_by.full_name, + "invitation_url": f"{settings.BASE_URL}/accept-invitation/{unique_token}" + } ) return {'message': 'Invitation resent successfully'} diff --git a/app/requirements.txt b/app/requirements.txt index d832a39..d25922f 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -46,4 +46,8 @@ databases[postgresql] faker factory-boy pytest-asyncio -pytest-mock \ No newline at end of file +pytest-mock +aiofiles +celery +redis +flower \ No newline at end of file diff --git a/app/services/oauth2.py b/app/services/oauth2.py index a126d96..f91904b 100644 --- a/app/services/oauth2.py +++ b/app/services/oauth2.py @@ -7,12 +7,12 @@ from app.db.base import engine from app.db.base import get_db -import casbin -import casbin_sqlalchemy_adapter +# import casbin +# import casbin_sqlalchemy_adapter oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/oauth-login") -adapter = casbin_sqlalchemy_adapter.Adapter(engine) +# adapter = casbin_sqlalchemy_adapter.Adapter(engine) def get_current_user( @@ -33,22 +33,23 @@ def get_current_user_authorization( db: Session = Depends(get_db), current_user: schemas.TokenData = Depends(get_current_user), ): - enforcer = casbin.Enforcer("core/model.conf", adapter) - sub = current_user.email - dom = current_user.organization_name - obj = req.url.path - act = req.method - - if not (enforcer.enforce(sub, dom, obj, act)): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Method not authorized for this user") + # enforcer = casbin.Enforcer("core/model.conf", adapter) + # sub = current_user.email + # dom = current_user.organization_name + # obj = req.url.path + # act = req.method + + # if not (enforcer.enforce(sub, dom, obj, act)): + # raise HTTPException( + # status_code=status.HTTP_401_UNAUTHORIZED, + # detail="Method not authorized for this user") return current_user def add_new_role_in_org( email: str, role: str, dom: str, db: Session = Depends(get_db) ) -> None: - enforcer = casbin.Enforcer("core/model.conf", adapter) - enforcer.add_role_for_user_in_domain(email, role, dom) - db.commit() # Ensure changes are persisted + # enforcer = casbin.Enforcer("core/model.conf", adapter) + # enforcer.add_role_for_user_in_domain(email, role, dom) + # db.commit() # Ensure changes are persisted + pass diff --git a/app/worker/__init__.py b/app/worker/__init__.py new file mode 100644 index 0000000..7f9ccce --- /dev/null +++ b/app/worker/__init__.py @@ -0,0 +1 @@ +from app.worker.celery import * diff --git a/app/worker/celery.py b/app/worker/celery.py new file mode 100644 index 0000000..7806272 --- /dev/null +++ b/app/worker/celery.py @@ -0,0 +1,18 @@ +import os +import asyncio + +from celery import Celery + +from app.services.mail import send_email_async + + +celery = Celery(__name__) +celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") +celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379") + + +@celery.task(name="send_email_task") +def send_email_task(subject, email_to, body): + loop = asyncio.get_event_loop() + loop.run_until_complete(send_email_async(subject, email_to, body)) + return True diff --git a/docker-compose.yml b/docker-compose.yml index c8eaf3f..2af91c8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,6 +7,7 @@ services: depends_on: - db - test-db + - redis environment: - DATABASE_URL=postgresql://postgres:postgres@db/db - TEST_DATABASE_URL=postgresql://postgres:postgres@test-db/test-db @@ -27,6 +28,8 @@ services: - INVITATION_URL_SECRET_KEY=INVITATION_URL_SECRET_KEY_HERE - INVITATION_URL_SECURITY_PASSWORD_SALT=INVITATION_URL_SECURITY_PASSWORD_SALT_HERE - INVITATION_URL_MAX_AGE=172800 + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 build: context: . dockerfile: Dockerfile @@ -58,6 +61,51 @@ services: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres - POSTGRES_DB=test-db + + worker: + build: . + command: celery -A app.worker.celery worker --loglevel=info + environment: + - DATABASE_URL=postgresql://postgres:postgres@db/db + - TEST_DATABASE_URL=postgresql://postgres:postgres@test-db/test-db + - BASE_URL=http://localhost:8000 + - APP_ENV=development + - JWT_REFRESH_SECRET_KEY=JWT_REFRESH_SECRET_KEY_HERE + - JWT_SECRET_KEY=WT_SECRET_KEY_HERE + - ROLLBAR_ACCESS_TOKEN=ROLLBAR_ACCESS_TOKEN_HERE + - BUCKET_NAME=BUCKET_NAME + - OAUTH_CLIENT_ID=CLIENT_IDw + - OAUTH_CLIENT_SECRET=OAUTH_CLIENT_SECRET + - MAIL_USERNAME=norval.hahn60@ethereal.email + - MAIL_PASSWORD=NupfGHXtsUdgx36VuK + - MAIL_FROM=norval.hahn60@ethereal.email + - MAIL_PORT=587 + - MAIL_SERVER=smtp.ethereal.email + - MAIL_FROM_NAME=Norval Hahn + - INVITATION_URL_SECRET_KEY=INVITATION_URL_SECRET_KEY_HERE + - INVITATION_URL_SECURITY_PASSWORD_SALT=INVITATION_URL_SECURITY_PASSWORD_SALT_HERE + - INVITATION_URL_MAX_AGE=172800 + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - web + - redis + + redis: + image: redis:7 + + dashboard: + build: . + command: celery --broker=redis://redis:6379/0 flower --port=5555 + ports: + - 5556:5555 + environment: + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - web + - redis + - worker volumes: db: diff --git a/main.py b/main.py index f4e4e9f..5b64750 100644 --- a/main.py +++ b/main.py @@ -7,7 +7,6 @@ from app.db.base import engine from app.db.base_class import Base -from scheduler.invitation import invitation_scheduler from app.routes import router as api_router @@ -38,5 +37,4 @@ async def lifespan(app: FastAPI): app.include_router(api_router) if __name__ == "__main__": - invitation_scheduler.start() uvicorn.run("main:app", host="0.0.0.0", port=8000) diff --git a/scheduler/invitation.py b/scheduler/invitation.py deleted file mode 100644 index aa2b8f8..0000000 --- a/scheduler/invitation.py +++ /dev/null @@ -1,19 +0,0 @@ -from apscheduler.schedulers.background import BackgroundScheduler -from datetime import datetime -from app.models.invitation import Invitation -from app.db.base import get_db - -invitation_scheduler = BackgroundScheduler() - -def invalidate_expired_invitations(): - db = get_db() - current_time = datetime.now() - expired_invitations = db.query(Invitation).filter(Invitation.expires_at < current_time).all() - - for invitation in expired_invitations: - db.delete(invitation) - - db.commit() - -invitation_scheduler.add_job(invalidate_expired_invitations, 'interval', minutes=60) # Run every hour -invitation_scheduler.start()