-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkeycloak_sso_expiration_middleware.py
89 lines (63 loc) · 2.73 KB
/
keycloak_sso_expiration_middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import logging
import jwt
import json
import requests
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth import logout, get_backends
from django.urls import reverse
from django.shortcuts import redirect
from django.http import HttpResponse
from social_django.models import UserSocialAuth
from social_core.backends.keycloak import KeycloakOAuth2
class KeycloakSSOExpirationMiddleware(MiddlewareMixin):
def __init__(self, get_response):
super().__init__(get_response)
self.keycloak = KeycloakOAuth2()
def _sso_expire(self, request):
logger = logging.getLogger('KeycloakSSOExpirationMiddleware')
logger.debug("Processing request..")
if request.user.is_anonymous:
return
user = request.user
# handle django 1.4 pickling bug
if hasattr(user, '_wrapped') and hasattr(user, '_setup'):
if user._wrapped.__class__ == object:
user._setup()
user = user._wrapped
try:
social_auth_user = user.social_auth.get(provider='keycloak')
except UserSocialAuth.DoesNotExist as error:
logger.debug('User does not exist in Keycloak Social Auth provider')
return
# It seems like extra_data used to be a dict, but is a string in newer versions.
extra_data = social_auth_user.extra_data
if isinstance(extra_data, str):
extra_data = json.loads(extra_data)
if self.is_social_auth_user_access_token_valid(extra_data['access_token']):
logger.debug('Access token is valid')
return
logger.debug('Access token is no longer valid')
try:
resp = self.keycloak.refresh_token(extra_data['refresh_token'])
extra_data['access_token'] = resp['access_token']
extra_data['refresh_token'] = resp['refresh_token']
social_auth_user.set_extra_data(extra_data)
social_auth_user.save()
logger.debug('Refreshed access token')
except requests.exceptions.HTTPError as error:
status_code = error.response.status_code
logger.debug('Got unexpected status code')
logger.debug(status_code)
# Maybe this should only be done on 401?
logout(request)
return redirect(reverse('login'))
def is_social_auth_user_access_token_valid(self, access_token: str) -> bool:
try:
payload = self.keycloak.user_data(access_token)
return True
except jwt.exceptions.ExpiredSignatureError:
return False
def process_request(self, request):
return self._sso_expire(request)
def process_response(self, request, response):
return response