forked from danny-avila/rag_api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmiddleware.py
57 lines (48 loc) · 1.92 KB
/
middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
from fastapi import Request
from datetime import datetime, timezone
from fastapi.responses import JSONResponse
from config import logger
import jwt
from jwt import PyJWTError
async def security_middleware(request: Request, call_next):
async def next_middleware_call():
return await call_next(request)
if request.url.path in {"/docs", "/openapi.json", "/health"}:
return await next_middleware_call()
jwt_secret = os.getenv("JWT_SECRET")
if not jwt_secret:
logger.warn("JWT_SECRET not found in environment variables")
return await next_middleware_call()
authorization = request.headers.get("Authorization")
if not authorization or not authorization.startswith("Bearer "):
logger.info(
f"Unauthorized request with missing or invalid Authorization header to: {request.url.path}"
)
return JSONResponse(
status_code=401,
content={"detail": "Missing or invalid Authorization header"},
)
token = authorization.split(" ")[1]
try:
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
exp_timestamp = payload.get("exp")
if exp_timestamp and datetime.now(tz=timezone.utc) > datetime.fromtimestamp(
exp_timestamp, tz=timezone.utc
):
logger.info(
f"Unauthorized request with expired token to: {request.url.path}"
)
return JSONResponse(
status_code=401, content={"detail": "Token has expired"}
)
request.state.user = payload
logger.debug(f"{request.url.path} - {payload}")
except PyJWTError as e:
logger.info(
f"Unauthorized request with invalid token to: {request.url.path}, reason: {str(e)}"
)
return JSONResponse(
status_code=401, content={"detail": f"Invalid token: {str(e)}"}
)
return await next_middleware_call()