This repository has been archived by the owner on Nov 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #795 from cisco-open/apigw
Add API GW login. Refactor session related files.
- Loading branch information
Showing
15 changed files
with
416 additions
and
340 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import logging | ||
from typing import TYPE_CHECKING, Literal, Optional | ||
from urllib.parse import urlparse | ||
|
||
from pydantic import BaseModel, Field, PositiveInt | ||
from requests import HTTPError, PreparedRequest, post | ||
from requests.auth import AuthBase | ||
from requests.exceptions import JSONDecodeError | ||
|
||
from catalystwan.exceptions import CatalystwanException | ||
from catalystwan.response import ManagerResponse | ||
|
||
if TYPE_CHECKING: | ||
from catalystwan.session import ManagerSession | ||
|
||
LoginMode = Literal["machine", "user", "session"] | ||
|
||
|
||
class ApiGwLogin(BaseModel): | ||
client_id: str | ||
client_secret: str | ||
org_name: str | ||
mode: Optional[LoginMode] = None | ||
username: Optional[str] = None | ||
session: Optional[str] = None | ||
tenant_user: Optional[bool] = None | ||
token_duration: PositiveInt = Field(default=10, description="in minutes") | ||
|
||
|
||
class ApiGwAuth(AuthBase): | ||
"""Attaches ApiGateway Authentication to the given Requests object. | ||
1. Get a bearer token by sending a POST request to the /apigw/login endpoint. | ||
2. Use the token in the Authorization header for subsequent requests. | ||
""" | ||
|
||
def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None): | ||
self.login = login | ||
self.token = "" | ||
self.logger = logger or logging.getLogger(__name__) | ||
|
||
def __call__(self, request: PreparedRequest) -> PreparedRequest: | ||
self.handle_auth(request) | ||
self.build_digest_header(request) | ||
return request | ||
|
||
def handle_auth(self, request: PreparedRequest) -> None: | ||
if self.token == "": | ||
self.authenticate(request) | ||
|
||
def authenticate(self, request: PreparedRequest): | ||
assert request.url is not None | ||
url = urlparse(request.url) | ||
base_url = f"{url.scheme}://{url.netloc}" | ||
self.token = self.get_token(base_url, self.login) | ||
|
||
def build_digest_header(self, request: PreparedRequest) -> None: | ||
header = { | ||
"sdwan-org": self.login.org_name, | ||
"Authorization": f"Bearer {self.token}", | ||
} | ||
request.headers.update(header) | ||
|
||
@staticmethod | ||
def get_token(base_url: str, apigw_login: ApiGwLogin) -> str: | ||
try: | ||
response = post( | ||
url=f"{base_url}/apigw/login", | ||
verify=False, | ||
json=apigw_login.model_dump(exclude_none=True), | ||
timeout=10, | ||
) | ||
response.raise_for_status() | ||
token = response.json()["token"] | ||
except JSONDecodeError: | ||
raise CatalystwanException(f"Incorrect response type from ApiGateway login request, ({response.text})") | ||
except HTTPError as ex: | ||
raise CatalystwanException(f"Problem with connection to ApiGateway login endpoint, ({ex})") | ||
except KeyError as ex: | ||
raise CatalystwanException(f"Not found token in login response from ApiGateway, ({ex})") | ||
else: | ||
if not token or not isinstance(token, str): | ||
raise CatalystwanException("Failed to get bearer token") | ||
return token | ||
|
||
def __str__(self) -> str: | ||
return f"ApiGatewayAuth(mode={self.login.mode})" | ||
|
||
def logout(self, session: "ManagerSession") -> Optional[ManagerResponse]: | ||
return None | ||
|
||
def clear_tokens_and_cookies(self) -> None: | ||
self.token = "" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from pydantic import BaseModel | ||
|
||
from catalystwan.endpoints import APIEndpoints, post | ||
|
||
|
||
class OnBoardClient(BaseModel): | ||
client_id: str | ||
client_secret: str | ||
client_name: str | ||
|
||
|
||
class ApiGateway(APIEndpoints): | ||
@post("/apigw/config/reload") | ||
def configuration_reload(self) -> None: | ||
"""After launching the API Gateway, SSP can use the API | ||
and bearer authentication header with provision access token obtained | ||
in the step above. It reloads the configuration from S3 bucket, Secrets Manager | ||
and reset the RDS connection pool.""" | ||
... | ||
|
||
@post("/apigw/client/registration") | ||
def on_board_client(self, payload: OnBoardClient) -> None: | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.