-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flask_cfaccess.py
184 lines (132 loc) · 4.79 KB
/
flask_cfaccess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Zero Trust Access with Cloudflare Access for Flask applications."""
import typing as t
from functools import wraps
from importlib.metadata import PackageNotFoundError, version
import jwt
import requests
from flask import Flask, abort, current_app, g, request
from jwt.jwks_client import PyJWKClient
from werkzeug.local import LocalProxy
try:
from _version import version as __version__
except ImportError: # pragma: nocover
try:
__version__ = version("flask_cfaccess")
except PackageNotFoundError:
__version__ = "0.1-dev0"
class CfAccess:
"""Zero Trust Access with Cloudflare Access for Flask applications.
:param app: The Flask application instance. The application instance is
optional and can be passed by calling
``cfaccess.init_app(app)`` in your application factory\
function.
"""
def __init__(self, app: t.Optional[Flask] = None):
self.default_app = app
if app:
self.init_app(app)
def init_app(self, app: Flask):
"""Initialize the application.
:param app: The Flask application.instance.
"""
try:
team = app.config["CLOUDFLARE_ACCESS_AUD"]
team = app.config["CLOUDFLARE_ACCESS_TEAM"]
except KeyError:
raise RuntimeError(
"CLOUDFLARE_ACCESS_AUD and CLOUDFLARE_ACCESS_TEAM is required "
"for flask-cfaccess"
)
client = PyJWKClient(
f"https://{team}.cloudflareaccess.com/cdn-cgi/access/certs"
)
if not hasattr(app, "extensions"):
app.extensions = {}
app.extensions["cfaccess"] = (self, client)
def _get_app(self):
try:
return current_app._get_current_object()
except RuntimeError:
if self.default_app:
return self.default_app
raise
def authenticate(self, token=None):
"""Validate that a request is authenticated."""
if not token:
token = request.headers.get("Cf-Access-Jwt-Assertion")
if not token:
token = request.cookies.get("CF_Authorization")
if not token:
return False
app = self._get_app()
try:
cfaccess, jwks_client = app.extensions["cfaccess"]
except (AttributeError, IndexError, KeyError, TypeError):
raise RuntimeError(
"flask-cfaccess not configured for this application"
)
aud = app.config["CLOUDFLARE_ACCESS_AUD"]
key = jwks_client.get_signing_key_from_jwt(token)
try:
payload = jwt.decode(
token, key, audience=aud, algorithms=["RS256"]
)
except jwt.DecodeError:
return False
g._cfaccess_token = token
g._cfaccess_payload = payload
return True
def get_identity(self, token=None):
"""Get the user's identity."""
if not token:
token = getattr(g, "_cfaccess_token", None)
if not token:
self.authenticate()
token = getattr(g, "_cfaccess_token", None)
if not token:
return
team = self._get_app().config["CLOUDFLARE_ACCESS_TEAM"]
url = (
f"https://{team}.cloudflareaccess.com/cdn-cgi/access/get-identity"
)
res = requests.get(url, cookies={"CF_Authorization": token})
res.raise_for_status()
return res.json()
def login_required(self, f):
"""Decorate a view to require a valid identity."""
return login_required(f)
def validate_identity(self, identify):
"""Decorate a view to validate the user's identity."""
return validate_identity(identify)
def get_cfaccess():
try:
return current_app.extensions["cfaccess"][0]
except (AttributeError, IndexError, KeyError, TypeError):
raise RuntimeError(
"flask-cfaccess not configured for this application"
)
cfaccess = LocalProxy(get_cfaccess)
def login_required(f):
"""Decorate a view to require that the request is authenticated."""
@wraps(f)
def wrapper(*args, **kwargs):
if not cfaccess.authenticate():
abort(401)
return f(*args, **kwargs)
return wrapper
def get_identity():
"""Get the user's identity."""
return cfaccess.get_identity()
def validate_identity(identify):
"""Decorate a view to validate the user's identity."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not cfaccess.authenticate():
abort(401)
identity = cfaccess.get_identity()
if not identify(identity):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator