From 56a509e51f7482563cfcf690836f92f8bae411a6 Mon Sep 17 00:00:00 2001 From: Mauro Valota <7254293+valmoz@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:10:03 +0200 Subject: [PATCH] feat: added device code manager --- fattureincloud_python_sdk/oauth2/oauth2.py | 138 ++++++++++--- test/test_oauth2.py | 214 ++++++++++++++++++--- 2 files changed, 289 insertions(+), 63 deletions(-) diff --git a/fattureincloud_python_sdk/oauth2/oauth2.py b/fattureincloud_python_sdk/oauth2/oauth2.py index c5798ef2..7ae1595b 100644 --- a/fattureincloud_python_sdk/oauth2/oauth2.py +++ b/fattureincloud_python_sdk/oauth2/oauth2.py @@ -1,4 +1,4 @@ -from typing import Iterable +from typing import Dict, Iterable from urllib.parse import urlencode, urlparse, parse_qs import urllib3 import json @@ -6,19 +6,47 @@ from fattureincloud_python_sdk.oauth2.scopes import Scope -class OAuth2AuthorizationCodeManager: +class OAuth2Manager: def __init__( self, client_id: str, - client_secret: str, - redirect_uri: str, base_uri: str = "https://api-v2.fattureincloud.it", ): self._http = urllib3.PoolManager() self.client_id = client_id + self.base_uri = base_uri + + def execute_post(self, uri: str, data: Dict[str, str]): + body = json.dumps(data).encode("utf-8") + + resp = self._http.request( + "POST", uri, body=body, headers={"Content-Type": "application/json"} + ) + res = json.loads(resp.data.decode("utf-8")) + if resp.status != 200: + raise OAuth2Error( + resp.status, res["error"], res["error_description"] + ) + return res + + @staticmethod + def _get_scope_str(scopes: Iterable[Scope]): + if scopes is None or len(scopes) == 0: + return " " + + return " ".join(map(lambda x: x.value, scopes)) + +class OAuth2AuthorizationCodeManager(OAuth2Manager): + def __init__( + self, + client_id: str, + client_secret: str, + redirect_uri: str, + base_uri: str = "https://api-v2.fattureincloud.it", + ): + OAuth2Manager.__init__(self, client_id, base_uri) self.client_secret = client_secret self.redirect_uri = redirect_uri - self.base_uri = base_uri def get_authorization_url(self, scopes: Iterable[Scope], state: str = None): authorization_uri = "{}/oauth/authorize".format(self.base_uri) @@ -53,17 +81,9 @@ def fetch_token(self, code: str): "code": code, } - body = json.dumps(data).encode("utf-8") + res = self.execute_post(token_uri, data) - resp = self._http.request( - "POST", token_uri, body=body, headers={"Content-Type": "application/json"} - ) - res = json.loads(resp.data.decode("utf-8")) - if resp.status != 200: - raise OAuth2AuthorizationCodeError( - resp.status, res["error"], res["error_description"] - ) - return OAuth2AuthorizationCodeTokenResponse( + return OAuth2TokenResponse( res["token_type"], res["access_token"], res["refresh_token"], @@ -80,38 +100,94 @@ def refresh_token(self, refresh_token: str): "refresh_token": refresh_token, } - body = json.dumps(data).encode("utf-8") + res = self.execute_post(token_uri, data) - resp = self._http.request( - "POST", token_uri, body=body, headers={"Content-Type": "application/json"} + return OAuth2TokenResponse( + res["token_type"], + res["access_token"], + res["refresh_token"], + res["expires_in"], ) - res = json.loads(resp.data.decode("utf-8")) - if resp.status != 200: - raise OAuth2AuthorizationCodeError( - resp.status, res["error"], res["error_description"] - ) - return OAuth2AuthorizationCodeTokenResponse( + +class OAuth2DeviceCodeManager(OAuth2Manager): + def __init__( + self, + client_id: str, + base_uri: str = "https://api-v2.fattureincloud.it", + ): + OAuth2Manager.__init__(self, client_id, base_uri) + + def get_device_code(self, scopes: Iterable[Scope]): + token_uri = "{}/oauth/device".format(self.base_uri) + scope = OAuth2DeviceCodeManager._get_scope_str(scopes) + + data = { + "client_id": self.client_id, + "scope": scope, + } + + res = self.execute_post(token_uri, data)["data"] + return OAuth2DeviceCodeResponse( + res["device_code"], + res["user_code"], + res["scope"], + res["verification_uri"], + res["interval"], + res["expires_in"], + ) + + def fetch_token(self, code: str): + token_uri = "{}/oauth/token".format(self.base_uri) + data = { + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "client_id": self.client_id, + "device_code": code, + } + + res = self.execute_post(token_uri, data) + + return OAuth2TokenResponse( res["token_type"], res["access_token"], res["refresh_token"], res["expires_in"], ) - @staticmethod - def _get_scope_str(scopes: Iterable[Scope]): - if scopes is None or len(scopes) == 0: - return " " + def refresh_token(self, refresh_token: str): + token_uri = "{}/oauth/token".format(self.base_uri) - return " ".join(map(lambda x: x.value, scopes)) + data = { + "grant_type": "refresh_token", + "client_id": self.client_id, + "refresh_token": refresh_token, + } + + res = self.execute_post(token_uri, data) + return OAuth2TokenResponse( + res["token_type"], + res["access_token"], + res["refresh_token"], + res["expires_in"], + ) class OAuth2AuthorizationCodeParams: def __init__(self, authorization_code: str, state: str): self.authorization_code = authorization_code self.state = state +class OAuth2DeviceCodeResponse: + def __init__( + self, device_code: str, user_code: str, scope: Dict[str, str], verification_uri: str, interval: int, expires_in: int + ): + self.device_code = device_code + self.user_code = user_code + self.scope = scope + self.verification_uri = verification_uri + self.interval = interval + self.expires_in = expires_in -class OAuth2AuthorizationCodeTokenResponse: +class OAuth2TokenResponse: def __init__( self, token_type: str, access_token: str, refresh_token: str, expires_in: int ): @@ -121,7 +197,7 @@ def __init__( self.expires_in = expires_in -class OAuth2AuthorizationCodeError(Exception): +class OAuth2Error(Exception): def __init__(self, status: int, error: str, error_description: str): self.status = status self.error = error diff --git a/test/test_oauth2.py b/test/test_oauth2.py index bbbb00e5..2a4bf145 100644 --- a/test/test_oauth2.py +++ b/test/test_oauth2.py @@ -3,8 +3,10 @@ from fattureincloud_python_sdk.oauth2.oauth2 import ( OAuth2AuthorizationCodeParams, - OAuth2AuthorizationCodeTokenResponse, - OAuth2AuthorizationCodeError, + OAuth2DeviceCodeManager, + OAuth2DeviceCodeResponse, + OAuth2TokenResponse, + OAuth2Error, OAuth2AuthorizationCodeManager, ) from fattureincloud_python_sdk.oauth2.scopes import Scope @@ -14,9 +16,12 @@ class TestOAuth2(unittest.TestCase): """OAuth2 unit test stubs""" def setUp(self): - self.oa2 = OAuth2AuthorizationCodeManager( + self.oa2auth = OAuth2AuthorizationCodeManager( "CLIENT_ID", "CLIENT_SECRET", "http://localhost:3000/redirect" ) + self.oa2device = OAuth2DeviceCodeManager( + "CLIENT_ID" + ) pass def tearDown(self): @@ -27,8 +32,23 @@ def testOAuth2AuthorizationCodeParams(self): assert params.authorization_code == "EXAMPLE_CODE" assert params.state == "EXAMPLE_STATE" - def testOAuth2AuthorizationCodeTokenResponse(self): - params = OAuth2AuthorizationCodeTokenResponse( + def testOAuth2DeviceCodeResponse(self): + scope = {} + scope["situation"] = "r" + scope["settings"] = "a" + + params = OAuth2DeviceCodeResponse( + "PAPAYA", "TEDDY-BEAR", scope, "https://fattureincloud.it/connetti", 5, 300 + ) + assert params.device_code == "PAPAYA" + assert params.user_code == "TEDDY-BEAR" + assert params.scope == scope + assert params.verification_uri == "https://fattureincloud.it/connetti" + assert params.interval == 5 + assert params.expires_in == 300 + + def testOAuth2TokenResponse(self): + params = OAuth2TokenResponse( "bearer", "EXAMPLE_ACCESS_TOKEN", "EXAMPLE_REFRESH_TOKEN", 86400 ) assert params.token_type == "bearer" @@ -36,8 +56,8 @@ def testOAuth2AuthorizationCodeTokenResponse(self): assert params.refresh_token == "EXAMPLE_REFRESH_TOKEN" assert params.expires_in == 86400 - def testOAuth2AuthorizationCodeError(self): - err = OAuth2AuthorizationCodeError( + def testOAuth2Error(self): + err = OAuth2Error( 418, "I'm a teapot", "I'm a teapot, but a really evil one." ) assert err.status == 418 @@ -45,44 +65,43 @@ def testOAuth2AuthorizationCodeError(self): assert err.error_description == "I'm a teapot, but a really evil one." def testOAuth2AuthorizationCode(self): - assert self.oa2.client_id == "CLIENT_ID" - assert self.oa2.client_secret == "CLIENT_SECRET" - assert self.oa2.redirect_uri == "http://localhost:3000/redirect" - assert self.oa2.base_uri == "https://api-v2.fattureincloud.it" + assert self.oa2auth.client_id == "CLIENT_ID" + assert self.oa2auth.client_secret == "CLIENT_SECRET" + assert self.oa2auth.redirect_uri == "http://localhost:3000/redirect" + assert self.oa2auth.base_uri == "https://api-v2.fattureincloud.it" - def testGetScopeStr(self): + def testOAuth2AuthorizationCodeGetScopeStr(self): scopes = [Scope.SETTINGS_ALL, Scope.ISSUED_DOCUMENTS_INVOICES_READ] scopes_str = OAuth2AuthorizationCodeManager._get_scope_str(scopes) assert scopes_str == "settings:a issued_documents.invoices:r" - def testGetAuthorizationUrl(self): + def testOAuth2AuthorizationCodeGetAuthorizationUrl(self): scopes = [Scope.SETTINGS_ALL, Scope.ISSUED_DOCUMENTS_INVOICES_READ] - url = self.oa2.get_authorization_url(scopes, "EXAMPLE_STATE") + url = self.oa2auth.get_authorization_url(scopes, "EXAMPLE_STATE") assert ( url == "https://api-v2.fattureincloud.it/oauth/authorize?response_type=code&client_id=CLIENT_ID&redirect_uri=http%3A%2F%2Flocalhost%3A3000%2Fredirect&scope=settings%3Aa+issued_documents.invoices%3Ar&state=EXAMPLE_STATE" ) - def testGetParamsFromUrl(self): + def testOAuth2AuthorizationCodeGetParamsFromUrl(self): url = "http://localhost:3000/redirect?state=EXAMPLE_STATE&code=c%2FEXAMPLE_CODE" - params = self.oa2.get_params_from_url(url) + params = self.oa2auth.get_params_from_url(url) assert params.authorization_code == "c/EXAMPLE_CODE" assert params.state == "EXAMPLE_STATE" - def testFetchToken(self): + def testOAuth2AuthorizationCodeFetchToken(self): resp = unittest.mock.MagicMock( status=200, data=b'{"token_type": "bearer", "access_token": "a/ACCESS_TOKEN", "refresh_token": "r/REFRESH_TOKEN", "expires_in": 86400}', reason="OK", ) - self.oa2._http.request = unittest.mock.MagicMock(return_value=resp) + self.oa2auth._http.request = unittest.mock.MagicMock(return_value=resp) - result = self.oa2.fetch_token("EXAMPLE_CODE") - print(result) + result = self.oa2auth.fetch_token("EXAMPLE_CODE") assert result.token_type == "bearer" assert result.access_token == "a/ACCESS_TOKEN" assert result.refresh_token == "r/REFRESH_TOKEN" @@ -90,7 +109,7 @@ def testFetchToken(self): exp_body = b'{"grant_type": "authorization_code", "client_id": "CLIENT_ID", "client_secret": "CLIENT_SECRET", "redirect_uri": "http://localhost:3000/redirect", "code": "EXAMPLE_CODE"}' - self.oa2._http.request.assert_called_once_with( + self.oa2auth._http.request.assert_called_once_with( "POST", "https://api-v2.fattureincloud.it/oauth/token", body=exp_body, @@ -103,25 +122,24 @@ def testFetchToken(self): reason="I'm a teapot", ) - self.oa2._http.request = unittest.mock.MagicMock(return_value=resp) + self.oa2auth._http.request = unittest.mock.MagicMock(return_value=resp) - with self.assertRaises(OAuth2AuthorizationCodeError) as context: - self.oa2.fetch_token("EXAMPLE_ERR_CODE") + with self.assertRaises(OAuth2Error) as context: + self.oa2auth.fetch_token("EXAMPLE_ERR_CODE") assert "An error occurred while retrieving token: I'm a teapot" == "{0}".format( context.exception ) - def testRefreshToken(self): + def testOAuth2AuthorizationCodeRefreshToken(self): resp = unittest.mock.MagicMock( status=200, data=b'{"token_type": "bearer", "access_token": "a/ACCESS_TOKEN", "refresh_token": "r/REFRESH_TOKEN", "expires_in": 86400}', reason="OK", ) - self.oa2._http.request = unittest.mock.MagicMock(return_value=resp) + self.oa2auth._http.request = unittest.mock.MagicMock(return_value=resp) - result = self.oa2.refresh_token("r/RT") - print(result) + result = self.oa2auth.refresh_token("r/RT") assert result.token_type == "bearer" assert result.access_token == "a/ACCESS_TOKEN" assert result.refresh_token == "r/REFRESH_TOKEN" @@ -129,7 +147,139 @@ def testRefreshToken(self): exp_body = b'{"grant_type": "refresh_token", "client_id": "CLIENT_ID", "client_secret": "CLIENT_SECRET", "refresh_token": "r/RT"}' - self.oa2._http.request.assert_called_once_with( + self.oa2auth._http.request.assert_called_once_with( + "POST", + "https://api-v2.fattureincloud.it/oauth/token", + body=exp_body, + headers={"Content-Type": "application/json"}, + ) + + resp = unittest.mock.MagicMock( + status=418, + data=b'{"error": "I\'m a teapot", "error_description": "I\'m a teapot"}', + reason="I'm a teapot", + ) + + self.oa2auth._http.request = unittest.mock.MagicMock(return_value=resp) + + with self.assertRaises(OAuth2Error) as context: + self.oa2auth.refresh_token("r/ERR_RT") + assert "An error occurred while retrieving token: I'm a teapot" == "{0}".format( + context.exception + ) + + def testOAuth2DeviceCode(self): + assert self.oa2device.client_id == "CLIENT_ID" + assert self.oa2device.base_uri == "https://api-v2.fattureincloud.it" + + def testOAuth2DeviceCodeGetScopeStr(self): + scopes = [Scope.SETTINGS_ALL, Scope.ISSUED_DOCUMENTS_INVOICES_READ] + scopes_str = OAuth2DeviceCodeManager._get_scope_str(scopes) + + assert scopes_str == "settings:a issued_documents.invoices:r" + + def testOAuth2DeviceCodeGetDeviceCode(self): + scopes = [Scope.SITUATION_READ, Scope.SETTINGS_ALL] + + scope = {} + scope["situation"] = "r" + scope["settings"] = "a" + + resp = unittest.mock.MagicMock( + status=200, + data=b'{"data":{"device_code":"PAPAYA","user_code":"TEDDY-BEAR","scope":{"situation":"r","settings":"a"},"verification_uri":"https://fattureincloud.it/connetti","interval":5,"expires_in":300}}', + reason="OK", + ) + + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) + + result = self.oa2device.get_device_code(scopes) + assert result.device_code == "PAPAYA" + assert result.user_code == "TEDDY-BEAR" + assert result.scope == scope + assert result.verification_uri == "https://fattureincloud.it/connetti" + assert result.interval == 5 + assert result.expires_in == 300 + + exp_body = b'{"client_id": "CLIENT_ID", "scope": "situation:r settings:a"}' + + self.oa2device._http.request.assert_called_once_with( + "POST", + "https://api-v2.fattureincloud.it/oauth/device", + body=exp_body, + headers={"Content-Type": "application/json"}, + ) + + resp = unittest.mock.MagicMock( + status=418, + data=b'{"error": "I\'m a teapot", "error_description": "I\'m a teapot"}', + reason="I'm a teapot", + ) + + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) + + with self.assertRaises(OAuth2Error) as context: + self.oa2device.get_device_code(scopes) + assert "An error occurred while retrieving token: I'm a teapot" == "{0}".format( + context.exception + ) + + def testOAuth2DeviceCodeFetchToken(self): + resp = unittest.mock.MagicMock( + status=200, + data=b'{"token_type": "bearer", "access_token": "a/ACCESS_TOKEN", "refresh_token": "r/REFRESH_TOKEN", "expires_in": 86400}', + reason="OK", + ) + + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) + + result = self.oa2device.fetch_token("EXAMPLE_CODE") + assert result.token_type == "bearer" + assert result.access_token == "a/ACCESS_TOKEN" + assert result.refresh_token == "r/REFRESH_TOKEN" + assert result.expires_in == 86400 + + exp_body = b'{"grant_type": "urn:ietf:params:oauth:grant-type:device_code", "client_id": "CLIENT_ID", "device_code": "EXAMPLE_CODE"}' + + self.oa2device._http.request.assert_called_once_with( + "POST", + "https://api-v2.fattureincloud.it/oauth/token", + body=exp_body, + headers={"Content-Type": "application/json"}, + ) + + resp = unittest.mock.MagicMock( + status=418, + data=b'{"error": "I\'m a teapot", "error_description": "I\'m a teapot"}', + reason="I'm a teapot", + ) + + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) + + with self.assertRaises(OAuth2Error) as context: + self.oa2device.fetch_token("EXAMPLE_ERR_CODE") + assert "An error occurred while retrieving token: I'm a teapot" == "{0}".format( + context.exception + ) + + def testOAuth2DeviceCodeRefreshToken(self): + resp = unittest.mock.MagicMock( + status=200, + data=b'{"token_type": "bearer", "access_token": "a/ACCESS_TOKEN", "refresh_token": "r/REFRESH_TOKEN", "expires_in": 86400}', + reason="OK", + ) + + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) + + result = self.oa2device.refresh_token("r/RT") + assert result.token_type == "bearer" + assert result.access_token == "a/ACCESS_TOKEN" + assert result.refresh_token == "r/REFRESH_TOKEN" + assert result.expires_in == 86400 + + exp_body = b'{"grant_type": "refresh_token", "client_id": "CLIENT_ID", "refresh_token": "r/RT"}' + + self.oa2device._http.request.assert_called_once_with( "POST", "https://api-v2.fattureincloud.it/oauth/token", body=exp_body, @@ -142,10 +292,10 @@ def testRefreshToken(self): reason="I'm a teapot", ) - self.oa2._http.request = unittest.mock.MagicMock(return_value=resp) + self.oa2device._http.request = unittest.mock.MagicMock(return_value=resp) - with self.assertRaises(OAuth2AuthorizationCodeError) as context: - self.oa2.refresh_token("r/ERR_RT") + with self.assertRaises(OAuth2Error) as context: + self.oa2device.refresh_token("r/ERR_RT") assert "An error occurred while retrieving token: I'm a teapot" == "{0}".format( context.exception )