-
Notifications
You must be signed in to change notification settings - Fork 324
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add function to verify an App Check token (#642)
* Sketch out initial private methods and service * Remove unnecessary notes * Fix some lint issues * Fix style guide issues * Update code structure * Add pyjwt version to requirments & update code based on comments * Add app_id key for verified claims dict * Add initial test * Add tests for token headers * Add decode token test and notes * Updating requirements for mocks and note in test * Add verify token test and decode test * Update pytest-mock requirements * Add tests for error messages * Update requirements for lifespan cache * update error message and test * Explicitly pass audience to jwt.decode and update key retrieval * Mock signing key * Update aud check logic and tests * Remove print statement * Update method doc string * Add test for decode_token error * Catch additional errors and add custom error messages for them * Mock out all the common errors * Updating error messages and tests per comments * Make jwks_client a class property * Add validation for the subject in the JWT payload * Update docs and error message strings
- Loading branch information
1 parent
0dd6303
commit 5b7ac05
Showing
4 changed files
with
428 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,3 +12,4 @@ apikey.txt | |
htmlcov/ | ||
.pytest_cache/ | ||
.vscode/ | ||
.venv/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
# Copyright 2022 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Firebase App Check module.""" | ||
|
||
from typing import Any, Dict | ||
import jwt | ||
from jwt import PyJWKClient, ExpiredSignatureError, InvalidTokenError | ||
from jwt import InvalidAudienceError, InvalidIssuerError, InvalidSignatureError | ||
from firebase_admin import _utils | ||
|
||
_APP_CHECK_ATTRIBUTE = '_app_check' | ||
|
||
def _get_app_check_service(app) -> Any: | ||
return _utils.get_app_service(app, _APP_CHECK_ATTRIBUTE, _AppCheckService) | ||
|
||
def verify_token(token: str, app=None) -> Dict[str, Any]: | ||
"""Verifies a Firebase App Check token. | ||
Args: | ||
token: A token from App Check. | ||
app: An App instance (optional). | ||
Returns: | ||
Dict[str, Any]: The token's decoded claims. | ||
Raises: | ||
ValueError: If the app's ``project_id`` is invalid or unspecified, | ||
or if the token's headers or payload are invalid. | ||
""" | ||
return _get_app_check_service(app).verify_token(token) | ||
|
||
class _AppCheckService: | ||
"""Service class that implements Firebase App Check functionality.""" | ||
|
||
_APP_CHECK_ISSUER = 'https://firebaseappcheck.googleapis.com/' | ||
_JWKS_URL = 'https://firebaseappcheck.googleapis.com/v1/jwks' | ||
_project_id = None | ||
_scoped_project_id = None | ||
_jwks_client = None | ||
|
||
def __init__(self, app): | ||
# Validate and store the project_id to validate the JWT claims | ||
self._project_id = app.project_id | ||
if not self._project_id: | ||
raise ValueError( | ||
'A project ID must be specified to access the App Check ' | ||
'service. Either set the projectId option, use service ' | ||
'account credentials, or set the ' | ||
'GOOGLE_CLOUD_PROJECT environment variable.') | ||
self._scoped_project_id = 'projects/' + app.project_id | ||
# Default lifespan is 300 seconds (5 minutes) so we change it to 21600 seconds (6 hours). | ||
self._jwks_client = PyJWKClient(self._JWKS_URL, lifespan=21600) | ||
|
||
|
||
def verify_token(self, token: str) -> Dict[str, Any]: | ||
"""Verifies a Firebase App Check token.""" | ||
_Validators.check_string("app check token", token) | ||
|
||
# Obtain the Firebase App Check Public Keys | ||
# Note: It is not recommended to hard code these keys as they rotate, | ||
# but you should cache them for up to 6 hours. | ||
signing_key = self._jwks_client.get_signing_key_from_jwt(token) | ||
self._has_valid_token_headers(jwt.get_unverified_header(token)) | ||
verified_claims = self._decode_and_verify(token, signing_key.key) | ||
|
||
verified_claims['app_id'] = verified_claims.get('sub') | ||
return verified_claims | ||
|
||
def _has_valid_token_headers(self, headers: Any) -> None: | ||
"""Checks whether the token has valid headers for App Check.""" | ||
# Ensure the token's header has type JWT | ||
if headers.get('typ') != 'JWT': | ||
raise ValueError("The provided App Check token has an incorrect type header") | ||
# Ensure the token's header uses the algorithm RS256 | ||
algorithm = headers.get('alg') | ||
if algorithm != 'RS256': | ||
raise ValueError( | ||
'The provided App Check token has an incorrect alg header. ' | ||
f'Expected RS256 but got {algorithm}.' | ||
) | ||
|
||
def _decode_and_verify(self, token: str, signing_key: str): | ||
"""Decodes and verifies the token from App Check.""" | ||
payload = {} | ||
try: | ||
payload = jwt.decode( | ||
token, | ||
signing_key, | ||
algorithms=["RS256"], | ||
audience=self._scoped_project_id | ||
) | ||
except InvalidSignatureError: | ||
raise ValueError( | ||
'The provided App Check token has an invalid signature.' | ||
) | ||
except InvalidAudienceError: | ||
raise ValueError( | ||
'The provided App Check token has an incorrect "aud" (audience) claim. ' | ||
f'Expected payload to include {self._scoped_project_id}.' | ||
) | ||
except InvalidIssuerError: | ||
raise ValueError( | ||
'The provided App Check token has an incorrect "iss" (issuer) claim. ' | ||
f'Expected claim to include {self._APP_CHECK_ISSUER}' | ||
) | ||
except ExpiredSignatureError: | ||
raise ValueError( | ||
'The provided App Check token has expired.' | ||
) | ||
except InvalidTokenError as exception: | ||
raise ValueError( | ||
f'Decoding App Check token failed. Error: {exception}' | ||
) | ||
|
||
audience = payload.get('aud') | ||
if not isinstance(audience, list) or self._scoped_project_id not in audience: | ||
raise ValueError('Firebase App Check token has incorrect "aud" (audience) claim.') | ||
if not payload.get('iss').startswith(self._APP_CHECK_ISSUER): | ||
raise ValueError('Token does not contain the correct "iss" (issuer).') | ||
_Validators.check_string( | ||
'The provided App Check token "sub" (subject) claim', | ||
payload.get('sub')) | ||
|
||
return payload | ||
|
||
class _Validators: | ||
"""A collection of data validation utilities. | ||
Methods provided in this class raise ``ValueErrors`` if any validations fail. | ||
""" | ||
|
||
@classmethod | ||
def check_string(cls, label: str, value: Any): | ||
"""Checks if the given value is a string.""" | ||
if value is None: | ||
raise ValueError('{0} "{1}" must be a non-empty string.'.format(label, value)) | ||
if not isinstance(value, str): | ||
raise ValueError('{0} "{1}" must be a string.'.format(label, value)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.