Skip to content

Commit

Permalink
feat: added device code manager
Browse files Browse the repository at this point in the history
  • Loading branch information
valmoz authored Jun 17, 2024
1 parent f50f2aa commit 56a509e
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 63 deletions.
138 changes: 107 additions & 31 deletions fattureincloud_python_sdk/oauth2/oauth2.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,52 @@
from typing import Iterable
from typing import Dict, Iterable
from urllib.parse import urlencode, urlparse, parse_qs
import urllib3
import json

from fattureincloud_python_sdk.oauth2.scopes import Scope


class OAuth2AuthorizationCodeManager:
class OAuth2Manager:
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
base_uri: str = "https://api-v2.fattureincloud.it",
):
self._http = urllib3.PoolManager()
self.client_id = client_id
self.base_uri = base_uri

def execute_post(self, uri: str, data: Dict[str, str]):
body = json.dumps(data).encode("utf-8")

resp = self._http.request(
"POST", uri, body=body, headers={"Content-Type": "application/json"}
)
res = json.loads(resp.data.decode("utf-8"))
if resp.status != 200:
raise OAuth2Error(
resp.status, res["error"], res["error_description"]
)
return res

@staticmethod
def _get_scope_str(scopes: Iterable[Scope]):
if scopes is None or len(scopes) == 0:
return " "

return " ".join(map(lambda x: x.value, scopes))

class OAuth2AuthorizationCodeManager(OAuth2Manager):
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
base_uri: str = "https://api-v2.fattureincloud.it",
):
OAuth2Manager.__init__(self, client_id, base_uri)
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.base_uri = base_uri

def get_authorization_url(self, scopes: Iterable[Scope], state: str = None):
authorization_uri = "{}/oauth/authorize".format(self.base_uri)
Expand Down Expand Up @@ -53,17 +81,9 @@ def fetch_token(self, code: str):
"code": code,
}

body = json.dumps(data).encode("utf-8")
res = self.execute_post(token_uri, data)

resp = self._http.request(
"POST", token_uri, body=body, headers={"Content-Type": "application/json"}
)
res = json.loads(resp.data.decode("utf-8"))
if resp.status != 200:
raise OAuth2AuthorizationCodeError(
resp.status, res["error"], res["error_description"]
)
return OAuth2AuthorizationCodeTokenResponse(
return OAuth2TokenResponse(
res["token_type"],
res["access_token"],
res["refresh_token"],
Expand All @@ -80,38 +100,94 @@ def refresh_token(self, refresh_token: str):
"refresh_token": refresh_token,
}

body = json.dumps(data).encode("utf-8")
res = self.execute_post(token_uri, data)

resp = self._http.request(
"POST", token_uri, body=body, headers={"Content-Type": "application/json"}
return OAuth2TokenResponse(
res["token_type"],
res["access_token"],
res["refresh_token"],
res["expires_in"],
)
res = json.loads(resp.data.decode("utf-8"))
if resp.status != 200:
raise OAuth2AuthorizationCodeError(
resp.status, res["error"], res["error_description"]
)
return OAuth2AuthorizationCodeTokenResponse(

class OAuth2DeviceCodeManager(OAuth2Manager):
def __init__(
self,
client_id: str,
base_uri: str = "https://api-v2.fattureincloud.it",
):
OAuth2Manager.__init__(self, client_id, base_uri)

def get_device_code(self, scopes: Iterable[Scope]):
token_uri = "{}/oauth/device".format(self.base_uri)
scope = OAuth2DeviceCodeManager._get_scope_str(scopes)

data = {
"client_id": self.client_id,
"scope": scope,
}

res = self.execute_post(token_uri, data)["data"]
return OAuth2DeviceCodeResponse(
res["device_code"],
res["user_code"],
res["scope"],
res["verification_uri"],
res["interval"],
res["expires_in"],
)

def fetch_token(self, code: str):
token_uri = "{}/oauth/token".format(self.base_uri)
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": self.client_id,
"device_code": code,
}

res = self.execute_post(token_uri, data)

return OAuth2TokenResponse(
res["token_type"],
res["access_token"],
res["refresh_token"],
res["expires_in"],
)

@staticmethod
def _get_scope_str(scopes: Iterable[Scope]):
if scopes is None or len(scopes) == 0:
return " "
def refresh_token(self, refresh_token: str):
token_uri = "{}/oauth/token".format(self.base_uri)

return " ".join(map(lambda x: x.value, scopes))
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"refresh_token": refresh_token,
}

res = self.execute_post(token_uri, data)

return OAuth2TokenResponse(
res["token_type"],
res["access_token"],
res["refresh_token"],
res["expires_in"],
)

class OAuth2AuthorizationCodeParams:
def __init__(self, authorization_code: str, state: str):
self.authorization_code = authorization_code
self.state = state

class OAuth2DeviceCodeResponse:
def __init__(
self, device_code: str, user_code: str, scope: Dict[str, str], verification_uri: str, interval: int, expires_in: int
):
self.device_code = device_code
self.user_code = user_code
self.scope = scope
self.verification_uri = verification_uri
self.interval = interval
self.expires_in = expires_in

class OAuth2AuthorizationCodeTokenResponse:
class OAuth2TokenResponse:
def __init__(
self, token_type: str, access_token: str, refresh_token: str, expires_in: int
):
Expand All @@ -121,7 +197,7 @@ def __init__(
self.expires_in = expires_in


class OAuth2AuthorizationCodeError(Exception):
class OAuth2Error(Exception):
def __init__(self, status: int, error: str, error_description: str):
self.status = status
self.error = error
Expand Down
Loading

0 comments on commit 56a509e

Please sign in to comment.