From a38d0cfab9b11901358949b15a7e3bac520a36d7 Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Mon, 12 Aug 2024 12:12:04 +0200 Subject: [PATCH 1/6] Add API GW login. Rework session related files. --- README.md | 24 +- catalystwan/api/administration.py | 6 +- catalystwan/api/tenant_management_api.py | 6 +- catalystwan/apigw_auth.py | 81 +++++++ catalystwan/endpoints/api_gateway.py | 23 ++ catalystwan/response.py | 5 + catalystwan/session.py | 212 ++++++++++-------- catalystwan/tests/test_administration.py | 5 +- catalystwan/tests/test_devices_api.py | 25 +-- catalystwan/tests/test_session.py | 99 ++------ .../tests/test_tenant_management_api.py | 3 +- catalystwan/tests/test_vmanage_auth.py | 55 ++--- catalystwan/vmanage_auth.py | 178 +++++++-------- catalystwan/workflows/tenant_migration.py | 14 +- 14 files changed, 395 insertions(+), 341 deletions(-) create mode 100644 catalystwan/apigw_auth.py create mode 100644 catalystwan/endpoints/api_gateway.py diff --git a/README.md b/README.md index b8d5d1d6..0ced7d6d 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ with create_manager_session(url=url, username=username, password=password) as se devices = session.api.devices.get() print(devices) ``` + **ManagerSession** extends [requests.Session](https://requests.readthedocs.io/en/latest/user/advanced/#session-objects) so all functionality from [requests](https://requests.readthedocs.io/en/latest/) library is avaiable to user, it also implements python [contextmanager](https://docs.python.org/3.8/library/contextlib.html#contextlib.contextmanager) and automatically frees server resources on exit.
@@ -47,13 +48,15 @@ It is possible to configure **ManagerSession** prior sending any request. ```python from catalystwan.session import ManagerSession +from catalystwan.vmanage_auth import vManageAuth url = "example.com" username = "admin" password = "password123" # configure session using constructor - nothing will be sent to target server yet -session = ManagerSession(url=url, username=username, password=password) +auth = vManageAuth(username, password) +session = ManagerSession(url=url, auth=auth) # login and send requests session.login() session.get("/dataservice/device") @@ -102,6 +105,25 @@ with create_manager_session(url=url, username=username, password=password, subdo
+
+ Login using Api Gateway (click to expand) + +```python +from catalystwan.session import create_apigw_session + +with create_apigw_session( + url="example.com", + client_id="client_id", + client_secret="client_secret", + org_name="Org-Name", + username="user", + mode="user", + token_duration=10, +) as session: + devices = session.api.devices.get() + print(devices) +``` +
## API usage examples diff --git a/catalystwan/api/administration.py b/catalystwan/api/administration.py index 14c79356..ead6f85f 100644 --- a/catalystwan/api/administration.py +++ b/catalystwan/api/administration.py @@ -100,7 +100,7 @@ def update(self, user_update_request: UserUpdateRequest): """ self._endpoints.update_user(user_update_request.username, user_update_request) - def update_password(self, username: str, new_password: str): + def update_password(self, username: str, new_password: str, current_user_password: str): """Updates exisiting user password Args: @@ -108,8 +108,8 @@ def update_password(self, username: str, new_password: str): new_password (str): New password for given user """ update_password_request = UserUpdateRequest( - username=username, password=new_password, current_user_password=self.session.password - ) # type: ignore + username=username, password=new_password, current_user_password=current_user_password + ) self._endpoints.update_password(username, update_password_request) def reset(self, username: str): diff --git a/catalystwan/api/tenant_management_api.py b/catalystwan/api/tenant_management_api.py index 97bd6123..c27b6d70 100644 --- a/catalystwan/api/tenant_management_api.py +++ b/catalystwan/api/tenant_management_api.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List from catalystwan.api.task_status_api import Task from catalystwan.endpoints.tenant_management import ( @@ -62,7 +62,7 @@ def update(self, tenant_update_request: TenantUpdateRequest) -> Tenant: tenant_id=tenant_update_request.tenant_id, tenant_update_request=tenant_update_request ) - def delete(self, tenant_id_list: List[str], password: Optional[str] = None) -> Task: + def delete(self, tenant_id_list: List[str], password: str) -> Task: """Deletes tenants on vManage Args: @@ -72,8 +72,6 @@ def delete(self, tenant_id_list: List[str], password: Optional[str] = None) -> T Returns: Task: Object representing tenant deletion process """ - if password is None: - password = self.session.password delete_request = TenantBulkDeleteRequest(tenant_id_list=tenant_id_list, password=password) task_id = self._endpoints.delete_tenant_async_bulk(delete_request).id return Task(self.session, task_id) diff --git a/catalystwan/apigw_auth.py b/catalystwan/apigw_auth.py new file mode 100644 index 00000000..0fbe50b0 --- /dev/null +++ b/catalystwan/apigw_auth.py @@ -0,0 +1,81 @@ +import logging +from typing import TYPE_CHECKING, Literal, Optional +from urllib.parse import urlparse + +from pydantic import BaseModel, Field, PositiveInt +from requests import PreparedRequest, post +from requests.auth import AuthBase + +from catalystwan.exceptions import CatalystwanException +from catalystwan.response import ManagerResponse + +if TYPE_CHECKING: + from catalystwan.session import ManagerSession + +LoginMode = Literal["machine", "user", "session"] + + +class ApiGwLogin(BaseModel): + client_id: str + client_secret: str + org_name: str + mode: Optional[LoginMode] = None + username: Optional[str] = None + session: Optional[str] = None + tenant_user: Optional[bool] = None + token_duration: PositiveInt = Field(default=10, description="in minutes") + + +class ApiGwAuth(AuthBase): + """Attaches ApiGateway Authentication to the given Requests object. + + 1. Get a bearer token by sending a POST request to the /apigw/login endpoint. + 2. Use the token in the Authorization header for subsequent requests. + """ + + def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None): + self.login = login + self.token = "" + self.logger = logger or logging.getLogger(__name__) + + def __call__(self, request: PreparedRequest) -> PreparedRequest: + self.handle_auth(request) + self.build_digest_header(request) + return request + + def handle_auth(self, request: PreparedRequest) -> None: + if self.token == "": + self.authenticate(request) + + def authenticate(self, request: PreparedRequest): + base_url = f"{str(urlparse(request.url).scheme)}://{str(urlparse(request.url).netloc)}" + self.token = self.get_token(base_url, self.login) + + def build_digest_header(self, request: PreparedRequest) -> None: + header = { + "sdwan-org": self.login.org_name, + "Authorization": f"Bearer {self.token}", + } + request.headers.update(header) + + @staticmethod + def get_token(base_url: str, apigw_login: ApiGwLogin) -> str: + response = post( + url=f"{base_url}/apigw/login", + verify=False, + json=apigw_login.model_dump(exclude_none=True), + timeout=10, + ) + token = response.json().get("token", "") + if not token or not isinstance(token, str): + raise CatalystwanException("Failed to get bearer token") + return token + + def __str__(self) -> str: + return f"ApiGatewayAuth(mode={self.login.mode})" + + def logout(self, session: "ManagerSession") -> Optional[ManagerResponse]: + return None + + def clear_tokens_and_cookies(self) -> None: + self.token = "" diff --git a/catalystwan/endpoints/api_gateway.py b/catalystwan/endpoints/api_gateway.py new file mode 100644 index 00000000..b89a6e25 --- /dev/null +++ b/catalystwan/endpoints/api_gateway.py @@ -0,0 +1,23 @@ +from pydantic import BaseModel + +from catalystwan.endpoints import APIEndpoints, post + + +class OnBoardClient(BaseModel): + client_id: str + client_secret: str + client_name: str + + +class ApiGateway(APIEndpoints): + @post("/apigw/config/reload") + def configuration_reload(self) -> None: + """After launching the API Gateway, SSP can use the API + and bearer authentication header with provision access token obtained + in the step above. It reloads the configuration from S3 bucket, Secrets Manager + and reset the RDS connection pool.""" + ... + + @post("/apigw/client/registration") + def on_board_client(self, payload: OnBoardClient) -> None: + ... diff --git a/catalystwan/response.py b/catalystwan/response.py index 442185ae..af3c3510 100644 --- a/catalystwan/response.py +++ b/catalystwan/response.py @@ -139,6 +139,7 @@ class ManagerResponse(Response, APIEndpointClientResponse): def __init__(self, response: Response): self.__dict__.update(response.__dict__) self.jsessionid_expired = self._detect_expired_jsessionid() + self.api_gw_unauthorized = self._detect_apigw_unauthorized() try: self.payload = JsonPayload(response.json()) except JSONDecodeError: @@ -163,6 +164,10 @@ def _parse_set_cookie_from_headers(self) -> RequestsCookieJar: jar.update(parse_cookies_to_dict(cookies_string)) return jar + def _detect_apigw_unauthorized(self) -> bool: + """Determines if server sent unauthorized response""" + return self.status_code == 401 and self.json().get("message", "") == "failed to validate user" + def info(self, history: bool = False) -> str: """Returns human readable string containing Request-Response contents Args: diff --git a/catalystwan/session.py b/catalystwan/session.py index 9aec4f05..c3aa626e 100644 --- a/catalystwan/session.py +++ b/catalystwan/session.py @@ -7,15 +7,15 @@ from functools import cached_property from pathlib import Path from time import monotonic, sleep -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from urllib.parse import urljoin, urlparse, urlunparse from packaging.version import Version # type: ignore from requests import PreparedRequest, Request, Response, Session, get, head -from requests.auth import AuthBase from requests.exceptions import ConnectionError, HTTPError, RequestException from catalystwan import USER_AGENT +from catalystwan.apigw_auth import ApiGwAuth, ApiGwLogin, LoginMode from catalystwan.endpoints import APIEndpointClient from catalystwan.endpoints.client import AboutInfo, ServerInfo from catalystwan.exceptions import ( @@ -80,6 +80,73 @@ def determine_session_type( return SessionType.NOT_DEFINED +def create_base_url(url: str, port: Optional[int] = None) -> str: + """Creates base url based on ip address or domain and port if provided. + + Returns: + str: Base url shared for every request. + """ + parsed_url = urlparse(url) + netloc: str = parsed_url.netloc or parsed_url.path + scheme: str = parsed_url.scheme or "https" + base_url = urlunparse((scheme, netloc, "", None, None, None)) + if port: + return f"{base_url}:{port}" # noqa: E231 + return base_url + + +def create_apigw_session( + url: str, + client_id: str, + client_secret: str, + org_name: str, + subdomain: Optional[str] = None, + port: Optional[int] = None, + mode: Optional[LoginMode] = None, + username: Optional[str] = None, + session: Optional[str] = None, + tenant_user: Optional[bool] = None, + token_duration: int = 10, + logger: Optional[logging.Logger] = None, +) -> ManagerSession: + """Factory method that creates session object and performs login according to parameters + + Args: + url (str): IP address or domain name + client_id (str): client id + client_secret (str): client secret + org_name (str): organization name + subdomain: subdomain specifying to which view switch when creating provider as a tenant session, + works only on provider user mode + port (int): port + mode (LoginMode): login mode + username (str): username + session (str): session + tenant_user (bool): tenant user + token_duration (int): token duration + logger: override default module logger + + Returns: + ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager. + """ + auth = ApiGwAuth( + login=ApiGwLogin( + client_id=client_id, + client_secret=client_secret, + org_name=org_name, + mode=mode, + username=username, + session=session, + tenant_user=tenant_user, + token_duration=token_duration, + ), + logger=logger, + ) + session_ = ManagerSession(base_url=create_base_url(url, port), auth=auth, subdomain=subdomain, logger=logger) + session_.state = ManagerSessionState.LOGIN + return session_ + + def create_manager_session( url: str, username: str, @@ -102,13 +169,14 @@ def create_manager_session( Returns: ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager. """ - session = ManagerSession(url=url, username=username, password=password, port=port, subdomain=subdomain) - - if logger: - session.logger = logger - + auth = vManageAuth(username, password, logger=logger) + session = ManagerSession( + base_url=create_base_url(url, port), + auth=auth, + subdomain=subdomain, + logger=logger, + ) session.state = ManagerSessionState.LOGIN - session.on_session_create_hook() return session @@ -135,52 +203,55 @@ class ManagerSession(ManagerResponseAdapter, APIEndpointClient): Defines methods and handles session connectivity available for provider, provider as tenant, and tenant. Args: - url: IP address or domain name, i.e. '10.0.1.200' or 'example.com' - port: port - username: username - password: password + base_url: IP address or domain name, i.e. '10.0.1.200' or 'example.com' + auth: authentication object - vManage or API Gateway + subdomain: subdomain specifying to which view switch when creating provider as a tenant session, + works only on provider user mode + logger: override default module logger Attributes: enable_relogin (bool): defaults to True, in case that session is not properly logged-in, session will try to relogin and try the same request again - """ + api: APIContainer: container for API methods + endpoints: APIEndpointContainter: container for API endpoints + state: ManagerSessionState: current state of the session can be used to control session flow + response_trace: Callable: function that logs response and request details + server_name: str: server name + platform_version: str: platform version + api_version: Version: API version + restart_timeout: int: restart timeout in seconds + session_type: SessionType: type of session + verify: bool: verify SSL certificate - on_session_create_hook: ClassVar[Callable[[ManagerSession], Any]] = lambda *args: None + """ def __init__( self, - url: str, - username: str, - password: str, - verify: bool = False, - port: Optional[int] = None, + base_url: str, + auth: Union[vManageAuth, ApiGwAuth], subdomain: Optional[str] = None, - auth: Optional[AuthBase] = None, - validate_responses: bool = True, - ): - self.url = url - self.port = port - self.base_url = self.__create_base_url() - self.username = username - self.password = password + logger: Optional[logging.Logger] = None, + ) -> None: + self.base_url = base_url self.subdomain = subdomain self._session_type = SessionType.NOT_DEFINED self.server_name: Optional[str] = None - self.logger = logging.getLogger(__name__) + self.logger = logger or logging.getLogger(__name__) self.enable_relogin: bool = True self.response_trace: Callable[ [Optional[Response], Union[Request, PreparedRequest, None]], str ] = response_history_debug super(ManagerSession, self).__init__() + self.verify = False self.headers.update({"User-Agent": USER_AGENT}) - self.__prepare_session(verify, auth) + self._auth = auth self._platform_version: str = "" - self._api_version: Version - self._state: ManagerSessionState = ManagerSessionState.OPERATIVE + self._api_version: Version = NullVersion # type: ignore self.restart_timeout: int = 1200 self.polling_requests_timeout: int = 10 self.request_timeout: Optional[int] = None - self._validate_responses = validate_responses + self._validate_responses = True + self._state: ManagerSessionState = ManagerSessionState.OPERATIVE @cached_property def api(self) -> APIContainer: @@ -242,11 +313,9 @@ def login(self) -> ManagerSession: Returns: ManagerSession: (self) """ - self.cookies.clear_session_cookies() - self.auth = vManageAuth(self.base_url, self.username, self.password, verify=False) - self.auth.logger = self.logger - + self._auth.clear_tokens_and_cookies() + self.auth = self._auth if self.subdomain: tenant_id = self.get_tenant_id() vsession_id = self.get_virtual_session_id(tenant_id) @@ -275,12 +344,16 @@ def login(self) -> ManagerSession: ) self.logger.info( - f"Logged to vManage({self.platform_version}) as {self.username}. The session type is {self.session_type}" + f"Logged to vManage({self.platform_version}) as {self.auth}. The session type is {self.session_type}" ) - if jsessionid := self.auth.set_cookie.get("JSESSIONID"): - self.cookies.set("JSESSIONID", jsessionid) + self._set_jsessionid(self.auth) return self + def _set_jsessionid(self, auth: Union[vManageAuth, ApiGwAuth]) -> None: + if isinstance(auth, vManageAuth): + if jsessionid := auth.jsessionid: + self.cookies.set("JSESSIONID", jsessionid) + def wait_server_ready(self, timeout: int, poll_period: int = 10) -> None: """Waits until server is ready for API requests with given timeout in seconds""" @@ -356,6 +429,11 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse: self.state = ManagerSessionState.LOGIN return self.request(method, url, *args, **_kwargs) + if self.enable_relogin and response.api_gw_unauthorized and self.state == ManagerSessionState.OPERATIVE: + self.logger.warning("Logging to API GW session. Reason: unauthorized detected in response headers") + self.state = ManagerSessionState.LOGIN + return self.request(method, url, *args, **_kwargs) + if response.request.url and "passwordReset.html" in response.request.url: raise DefaultPasswordError("Password must be changed to use this session.") @@ -371,20 +449,6 @@ def get_full_url(self, url_path: str) -> str: """Returns base API url plus given url path.""" return urljoin(self.base_url, url_path) - def __create_base_url(self) -> str: - """Creates base url based on ip address or domain and port if provided. - - Returns: - str: Base url shared for every request. - """ - url = urlparse(self.url) - netloc: str = url.netloc or url.path - scheme: str = url.scheme or "https" - base_url = urlunparse((scheme, netloc, "", None, None, None)) - if self.port: - return f"{base_url}:{self.port}" # noqa: E231 - return base_url - def about(self) -> AboutInfo: return self.endpoints.client.about() @@ -448,23 +512,7 @@ def get_virtual_session_id(self, tenant_id: str) -> str: return response.json()["VSessionId"] def logout(self) -> Optional[ManagerResponse]: - response = None - if isinstance((version := self.api_version), NullVersion): - self.logger.warning("Cannot perform logout operation without known api_version.") - return response - else: - # disable automatic relogin before performing logout request - _relogin = self.enable_relogin - try: - self.enable_relogin = False - if version >= Version("20.12"): - response = self.post("/logout") - else: - response = self.get("/logout") - finally: - # restore original setting after performing logout request - self.enable_relogin = _relogin - return response + return self._auth.logout(self) def close(self) -> None: """Closes the ManagerSession. @@ -480,10 +528,6 @@ def close(self) -> None: self.logout() super().close() - def __prepare_session(self, verify: bool, auth: Optional[AuthBase]) -> None: - self.auth = auth - self.verify = verify - @property def session_type(self) -> SessionType: return self._session_type @@ -510,22 +554,4 @@ def validate_responses(self, value: bool): self._validate_responses = value def __str__(self) -> str: - return f"{self.username}@{self.base_url}" - - def __repr__(self): - return ( - f"{self.__class__.__name__}('{self.url}', '{self.username}', '{self.password}', port={self.port}, " - f"subdomain='{self.subdomain}')" - ) - - def __eq__(self, other): - if isinstance(other, ManagerSession): - comparison_list = [ - self.url == other.url, - self.username == other.username, - self.password == other.password, - self.port == other.port, - str(self.subdomain) == str(other.subdomain), - ] - return True if all(comparison_list) else False - return False + return f"ManagerSession(session_type={self.session_type}, auth={self._auth})" diff --git a/catalystwan/tests/test_administration.py b/catalystwan/tests/test_administration.py index de94636f..227168d6 100644 --- a/catalystwan/tests/test_administration.py +++ b/catalystwan/tests/test_administration.py @@ -122,13 +122,14 @@ def test_update_password(self): # Arrange username = "new_user" new_password = "PaSsWoRd" # pragma: allowlist secret + current_password = "32313123213" # pragma: allowlist secret self.api._endpoints.update_password = MagicMock() # Act - self.api.update_password(username, new_password) + self.api.update_password(username, new_password, current_password) # Assert self.api._endpoints.update_password.assert_called_once_with( username, - UserUpdateRequest(username=username, password=new_password, current_user_password=self.session.password), + UserUpdateRequest(username=username, password=new_password, current_user_password=current_password), ) def test_reset(self): diff --git a/catalystwan/tests/test_devices_api.py b/catalystwan/tests/test_devices_api.py index aaebfe40..90d9963d 100644 --- a/catalystwan/tests/test_devices_api.py +++ b/catalystwan/tests/test_devices_api.py @@ -24,6 +24,7 @@ def __init__(self, json, headers={}, cookies={}): self._json = json self.headers = headers self.cookies = cookies + self.status_code = 200 def json(self): return self._json @@ -118,20 +119,6 @@ def setUp(self) -> None: self.ips_list = [device["deviceId"] for device in self.devices] self.list_all_devices_resp = DataSequence(DeviceData, [DeviceData.model_validate(dev) for dev in self.devices]) - @patch.object(DevicesAPI, "get") - def test_controllers(self, mock_devices): - # # Arrange - # MockDevices = Mock() - # mock_devices.return_value = MockDevices - # session = Mock() - # test_object = DevicesAPI(session) - # test_object.devices = self.devices_dataclass - # # Act - # answer = test_object.get().filter(personality=[Personality.VMANAGE, Personality.VSMART]) - # # Assert - # self.assertEqual(answer, self.controllers_dataclass) - pass # TODO fix after updating .filter() - @patch("catalystwan.response.ManagerResponse") @patch("catalystwan.session.ManagerSession") def test_orchestrators(self, mock_session, mock_response): @@ -635,12 +622,6 @@ def test_get_device_wan_interfaces_empty(self, mock_session): # Assert self.assertEqual(answer, []) - def test_get_colors(self): - pass # TODO fix method before test - - def test_enable_data_stream(self): - pass # TODO fix method before test - @patch("catalystwan.session.ManagerSession") def test_get_bfd_sessions(self, mock_session): # Arrange @@ -691,7 +672,3 @@ def test_wait_for_device_state(self, mock_session): answer = DeviceStateAPI(mock_session).wait_for_device_state(device_id="1.1.1.1") # Assert self.assertTrue(answer) - - @patch("catalystwan.session.ManagerSession") - def test_wait_for_device_state_unreachable(self, mock_session): - pass # TODO fix method before test diff --git a/catalystwan/tests/test_session.py b/catalystwan/tests/test_session.py index 0b821670..4a32f46e 100644 --- a/catalystwan/tests/test_session.py +++ b/catalystwan/tests/test_session.py @@ -3,29 +3,30 @@ import unittest from typing import Optional from unittest.mock import patch -from uuid import uuid4 -import pytest # type: ignore from parameterized import parameterized # type: ignore from requests import HTTPError, Request, RequestException, Response from catalystwan.exceptions import CatalystwanException, ManagerHTTPError, ManagerRequestException -from catalystwan.session import ManagerSession +from catalystwan.session import ManagerSession, create_base_url +from catalystwan.vmanage_auth import vManageAuth -@pytest.mark.skip(reason="Session is not mocked property (#149)") class TestSession(unittest.TestCase): def setUp(self): - self.url = "example.com" + self.url = "https://example.com:80" self.username = "admin" - self.password = str(uuid4()) + self.password = "admin" # pragma: allowlist secret def test_session_str(self): # Arrange, Act - session = ManagerSession(self.url, self.username, self.password, port=111) + session = ManagerSession(self.url, auth=vManageAuth(self.username, self.password)) # Assert - self.assertEqual(str(session), "admin@https://example.com:111") + self.assertEqual( + str(session), + "ManagerSession(session_type=SessionType.NOT_DEFINED, " "auth=vManageAuth(username=admin))", + ) @parameterized.expand( [ @@ -39,57 +40,9 @@ def test_session_str(self): ) def test_base_url(self, port: Optional[int], user_url: str, expected_url: str): # Arrange, Act - session = ManagerSession(user_url, self.username, self.password, port=port) + base_url = create_base_url(user_url, port=port) # Assert - self.assertEqual(session.base_url, expected_url) - - def test_session_repr(self): - # Arrange, Act - session = ManagerSession("domain.com", "user1", "$password", port=111) - session_str = "ManagerSession('domain.com', 'user1', '$password', port=111, subdomain='None')" - # Assert - self.assertEqual(repr(session), session_str) - - def test_session_repr_different_sessions(self): - # Arrange, Act - session = ManagerSession("domain.com", "user1", "$password", port=111) - session_str = "ManagerSession('not.domain.com', 'different_user', '$password', port=111, subdomain='None')" - # Assert - self.assertNotEqual(repr(session), session_str) - - @patch("catalystwan.session.Session.__repr__") - def test_session_eval_repr(self, mock_repr): - # Arrange, Act - mock_repr.return_value = "ManagerSession('domain.com', 'user1', '$password', port=111, subdomain='None')" - session = ManagerSession("domain.com", "user1", "$password", port=111) - # Assert - self.assertEqual(eval(mock_repr()), session) - - @patch("catalystwan.session.ManagerSession.check_vmanage_server_connection") - @patch("catalystwan.session.Session.__repr__") - def test_session_eval_repr_different_sessions(self, mock_repr, mock_check_connection): - # Arrange, Act - mock_check_connection.return_value = True - mock_repr.return_value = "ManagerSession('domain.com', 'user1', '$password', port=111, subdomain='None')" - session = ManagerSession("not.domain.com", "different_user", "$password", port=111) - # Assert - self.assertNotEqual(eval(mock_repr()), session) - - def test_session_eq(self): - # Arrange, Act - session_1 = ManagerSession("domain.com", "user1", "$password", port=111) - session_2 = ManagerSession("domain.com", "user1", "$password", port=111) - # Assert - self.assertEqual(session_1, session_2) - - @patch("catalystwan.session.ManagerSession.check_vmanage_server_connection") - def test_session_eq_different_sessions(self, mock_check_connection): - # Arrange, Act - mock_check_connection.return_value = True - session_1 = ManagerSession("domain.com", "user1", "$password", port=111) - session_2 = ManagerSession("not.domain.com", "different_user", "$password", port=111) - # Assert - self.assertNotEqual(session_1, session_2) + self.assertEqual(base_url, expected_url) @parameterized.expand( [ @@ -99,35 +52,21 @@ def test_session_eq_different_sessions(self, mock_check_connection): (None, "devices", "https://example.com/devices"), ] ) - def test_get_full_url(self, port: Optional[int], url: str, full_url: str): + def test_get_full_url(self, port: Optional[int], url: str, expected_url: str): # Arrange, Act - session = ManagerSession(self.url, self.username, self.password, port=port) + base_url = create_base_url("https://example.com", port=port) + session = ManagerSession(base_url=base_url, auth=vManageAuth(base_url, self.username, self.password)) # Assert - self.assertEqual(session.get_full_url(url), full_url) - - @patch("catalystwan.session.head") - def test_check_vmanage_server_with_port(self, mock_head): - # Arrange, Act - mock_head.return_value = None - session = ManagerSession("domain.com", "user1", "$password", port=111) - answer = session.check_vmanage_server_connection() - # Assert - self.assertEqual(answer, True) - - @patch("catalystwan.session.head") - def test_check_vmanage_server_no_port(self, mock_requests): - # Arrange, Act - mock_requests.return_value = None - session = ManagerSession("domain.com", "user1", "$password") - answer = session.check_vmanage_server_connection() - # Assert - self.assertEqual(answer, True) + self.assertEqual(session.get_full_url(url), expected_url) class TestSessionExceptions(unittest.TestCase): def setUp(self): - self.session = ManagerSession(url="domain.com", username="user", password="<>") + self.session = ManagerSession( + base_url="https://domain.com:9443", + auth=vManageAuth(username="admin", password="admin"), + ) response = Response() response.json = lambda: { "error": { diff --git a/catalystwan/tests/test_tenant_management_api.py b/catalystwan/tests/test_tenant_management_api.py index 5841649f..cee182e0 100644 --- a/catalystwan/tests/test_tenant_management_api.py +++ b/catalystwan/tests/test_tenant_management_api.py @@ -104,8 +104,7 @@ def test_delete(self): def test_delete_auto_password(self): tenant_id_list = ["1"] - self.session.password = "p4s$w0rD" # pragma: allowlist secret - task = self.api.delete(tenant_id_list) + task = self.api.delete(tenant_id_list, password="test") self.assertIsInstance(task, Task) def test_get_statuses(self): diff --git a/catalystwan/tests/test_vmanage_auth.py b/catalystwan/tests/test_vmanage_auth.py index c788dfdf..8d9f5682 100644 --- a/catalystwan/tests/test_vmanage_auth.py +++ b/catalystwan/tests/test_vmanage_auth.py @@ -5,6 +5,7 @@ from uuid import uuid4 from requests import Request +from requests.cookies import RequestsCookieJar from catalystwan import USER_AGENT from catalystwan.exceptions import CatalystwanException @@ -12,14 +13,12 @@ class MockResponse: - def __init__(self, status_code: int, text: str): + def __init__(self, status_code: int, text: str, cookies: dict): self._status_code = status_code self._text = text + self.cookies = cookies self.request = Request() - def cookies(self) -> str: # TODO - return "JSESSIONID=xyz" - @property def status_code(self) -> int: return self._status_code @@ -32,8 +31,8 @@ def text(self) -> str: # TODO def mock_request_j_security_check(*args, **kwargs): url_response = { "https://1.1.1.1:1111/j_security_check": { - "admin": MockResponse(200, ""), - "invalid_username": MockResponse(200, "error"), + "admin": MockResponse(200, "", {"JSESSIONID": "xyz"}), + "invalid_username": MockResponse(200, "error", {}), } } @@ -42,19 +41,19 @@ def mock_request_j_security_check(*args, **kwargs): if full_url in url_response: return url_response[full_url][data["j_username"]] - return MockResponse(404, "error") + return MockResponse(404, "error", {}) def mock_valid_token(*args, **kw): - return MockResponse(200, "valid-token") + return MockResponse(200, "valid-token", {}) def mock_invalid_token_status(*args, **kw): - return MockResponse(503, "invalid-token") + return MockResponse(503, "invalid-token", {}) def mock_invalid_token_format(*args, **kw): - return MockResponse(200, "error") + return MockResponse(200, "error", {}) class TestvManageAuth(TestCase): @@ -62,7 +61,7 @@ def setUp(self): self.base_url = "https://1.1.1.1:1111" self.password = str(uuid4()) - @mock.patch("requests.post", side_effect=mock_request_j_security_check) + @mock.patch("catalystwan.vmanage_auth.post", side_effect=mock_request_j_security_check) def test_get_cookie(self, mock_post): # Arrange username = "admin" @@ -70,9 +69,8 @@ def test_get_cookie(self, mock_post): "j_username": username, "j_password": self.password, } - auth = vManageAuth(self.base_url, username, self.password) # Act - auth.get_cookie() + vManageAuth.get_jsessionid(self.base_url, username, self.password) # Assert mock_post.assert_called_with( @@ -82,7 +80,7 @@ def test_get_cookie(self, mock_post): headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT}, ) - @mock.patch("requests.post", side_effect=mock_request_j_security_check) + @mock.patch("catalystwan.vmanage_auth.post", side_effect=mock_request_j_security_check) def test_get_cookie_invalid_username(self, mock_post): # Arrange username = "invalid_username" @@ -90,10 +88,9 @@ def test_get_cookie_invalid_username(self, mock_post): "j_username": username, "j_password": self.password, } - auth = vManageAuth(self.base_url, username, self.password) # Act with self.assertRaises(UnauthorizedAccessError): - auth.get_cookie() + vManageAuth.get_jsessionid(self.base_url, username, self.password) # Assert mock_post.assert_called_with( @@ -103,18 +100,18 @@ def test_get_cookie_invalid_username(self, mock_post): headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT}, ) - @mock.patch("requests.cookies.RequestsCookieJar") - @mock.patch("requests.get", side_effect=mock_valid_token) - def test_fetch_token(self, mock_get, cookies): + @mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_valid_token) + def test_fetch_token(self, mock_get): # Arrange valid_url = "https://1.1.1.1:1111/dataservice/client/token" - auth = vManageAuth(self.base_url, "admin", self.password) # Act - token = auth.fetch_token(cookies) + token = vManageAuth.get_xsrftoken(self.base_url, "xyz") # Assert self.assertEqual(token, "valid-token") + cookies = RequestsCookieJar() + cookies.set("JSESSIONID", "xyz") mock_get.assert_called_with( url=valid_url, verify=False, @@ -122,19 +119,15 @@ def test_fetch_token(self, mock_get, cookies): cookies=cookies, ) - @mock.patch("requests.cookies.RequestsCookieJar") - @mock.patch("requests.get", side_effect=mock_invalid_token_status) - def test_incorrect_xsrf_token_status(self, mock_get, cookies): - auth = vManageAuth("http://invalid.response", "admin", self.password) + @mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_status) + def test_incorrect_xsrf_token_status(self, mock_get): with self.assertRaises(CatalystwanException): - auth.fetch_token(cookies) + vManageAuth.get_xsrftoken(self.base_url, "xyz") - @mock.patch("requests.cookies.RequestsCookieJar") - @mock.patch("requests.get", side_effect=mock_invalid_token_format) - def test_incorrect_xsrf_token_format(self, mock_get, cookies): - auth = vManageAuth("http://invalid.response", "admin", self.password) + @mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_format) + def test_incorrect_xsrf_token_format(self, mock_get): with self.assertRaises(CatalystwanException): - auth.fetch_token(cookies) + vManageAuth.get_xsrftoken(self.base_url, "xyz") if __name__ == "__main__": diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index 47ce6809..a648f5d1 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -1,16 +1,21 @@ # Copyright 2022 Cisco Systems, Inc. and its affiliates import logging -from typing import Optional -from urllib.parse import urljoin +from typing import TYPE_CHECKING, Optional +from urllib.parse import urlparse -import requests -from requests import PreparedRequest, Response +from packaging.version import Version +from requests import PreparedRequest, Response, get, post from requests.auth import AuthBase from requests.cookies import RequestsCookieJar -from catalystwan import USER_AGENT, with_proc_info_header +from catalystwan import USER_AGENT from catalystwan.exceptions import CatalystwanException +from catalystwan.response import ManagerResponse +from catalystwan.version import NullVersion + +if TYPE_CHECKING: + from catalystwan.session import ManagerSession class UnauthorizedAccessError(CatalystwanException): @@ -44,100 +49,91 @@ class vManageAuth(AuthBase): The following are typical steps for a user to consume the API: 1. Log in with a user name and password to establish a session. 2. Get a cross-site request forgery prevention token, which is required for most POST operations. - - Attributes: - base_url (str): url (with port if applicable) f.e. https://1.1.1.1:1111 - username (str): vManage username - password (str): vManage user's password - verify (bool): Controls whether we verify the server's TLS certificate. - Defaults to True. - expiration_time (int): Expiration token time in seconds. - Defaults to None (unlimited). - token (str): Access token - """ - def __init__(self, base_url: str, username: str, password: str, verify: bool = False): - self.base_url = base_url + def __init__(self, username: str, password: str, logger: Optional[logging.Logger] = None): self.username = username self.password = password - self.verify = verify # TODO Handle `True` parameter - self.expiration_time: Optional[int] = None # Unlimited - self.set_cookie = RequestsCookieJar() - self.token: str = "" - self.logger = logging.getLogger(__name__) - - def get_cookie(self) -> RequestsCookieJar: - """Check whether a user is successfully authenticated. - - If a user is successfully authenticated, the response body is empty and a valid session cookie is set. - The response has entry in headers named `set-cookie` equal to JESSIONID={session hash}. - If a user is un-authenticated, the response body contains a html login page with tag in it. - - Raises: - UnauthorizedAccessError: _description_ - - Returns: - RequestsCookieJar: _description_ - """ + self.jsessionid: str = "" + self.xsrftoken: str = "" + self.logger = logger or logging.getLogger(__name__) + self._cookie: RequestsCookieJar = RequestsCookieJar() + + def __call__(self, request: PreparedRequest) -> PreparedRequest: + self.handle_auth(request) + self.build_digest_header(request) + return request + + def handle_auth(self, request: PreparedRequest): + cookie = request.headers.get("Cookie") + wrong_cookie = cookie is None or (cookie is not None and "JSESSION" not in cookie) + if self.jsessionid is None or self.xsrftoken is None or wrong_cookie: + self.authenticate(request) + + @staticmethod + def get_jsessionid(base_url: str, username: str, password: str) -> str: security_payload = { - "j_username": self.username, - "j_password": self.password, + "j_username": username, + "j_password": password, } - full_url = urljoin(self.base_url, "/j_security_check") + url = base_url + "/j_security_check" headers = {"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT} - response = requests.post( - url=full_url, - data=security_payload, - verify=self.verify, - headers=headers, - ) - self.logger.debug(self._auth_request_debug(response, include_reponse_text=True)) - if response.text != "": - raise UnauthorizedAccessError(self.username, self.password) - return response.cookies - - def fetch_token(self, cookies: RequestsCookieJar) -> str: - """If it is required, fetches vManage REST API token. - - The XSRF token is in the response body. - XSRF token along with the JESSIONID cookie is used for ongoing API requests. - - Args: - cookies (RequestsCookieJar): The JESSIONID={session hash} cookie is required to authenticate. - - Returns: - str: Valid token. - """ - full_url = urljoin(self.base_url, "/dataservice/client/token") + response: Response = post(url=url, headers=headers, data=security_payload, verify=False) + jsessionid = response.cookies.get("JSESSIONID", "") + if response.text != "" or not isinstance(jsessionid, str) or jsessionid == "": + raise UnauthorizedAccessError(username, password) + return jsessionid + + @staticmethod + def get_xsrftoken(base_url: str, jsessionid: str) -> str: + url = base_url + "/dataservice/client/token" headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT} - response = requests.get( - url=full_url, - cookies=cookies, - verify=self.verify, + cookie = RequestsCookieJar() + cookie.set("JSESSIONID", jsessionid) + response: Response = get( + url=url, + cookies=cookie, headers=headers, + verify=False, ) - self.logger.debug(self._auth_request_debug(response)) - token = response.text - if response.status_code != 200 or "" in token: + if response.status_code != 200 or "" in response.text: raise CatalystwanException("Failed to get XSRF token") - return token - - def __call__(self, prepared_request: PreparedRequest) -> PreparedRequest: - if self.expiration_time is None: - if self.token == "": - self.set_cookie = self.get_cookie() - self.token = self.fetch_token(self.set_cookie) - - prepared_request.prepare_cookies(self.set_cookie) - prepared_request.headers.update({"x-xsrf-token": self.token}) - return prepared_request - - @with_proc_info_header - def _auth_request_debug(self, response: Response, include_reponse_text: bool = False) -> str: - msg = ( - f"Authenticating: {self.username} {response.request.method} {response.request.url} <{response.status_code}>" - ) - if include_reponse_text and response.text: - msg += f" response.text: {response.text}" - return msg + return response.text + + def authenticate(self, request: PreparedRequest): + base_url = f"{str(urlparse(request.url).scheme)}://{str(urlparse(request.url).netloc)}" + self.jsessionid = self.get_jsessionid(base_url, self.username, self.password) + self._cookie = RequestsCookieJar() + self._cookie.set("JSESSIONID", self.jsessionid) + self.xsrftoken = self.get_xsrftoken(base_url, self.jsessionid) + + def build_digest_header(self, request: PreparedRequest) -> None: + request.headers["x-xsrf-token"] = self.xsrftoken + request.prepare_cookies(self._cookie) + + def logout(self, session: "ManagerSession") -> Optional[ManagerResponse]: + response = None + if isinstance((version := session.api_version), NullVersion): + session.logger.warning("Cannot perform logout operation without known api_version.") + return response + else: + # disable automatic relogin before performing logout request + _relogin = session.enable_relogin + try: + session.enable_relogin = False + if version >= Version("20.12"): + response = session.post("/logout") + else: + response = session.get("/logout") + finally: + # restore original setting after performing logout request + session.enable_relogin = _relogin + return response + + def __str__(self) -> str: + return f"vManageAuth(username={self.username})" + + def clear_tokens_and_cookies(self) -> None: + self.jsessionid = "" + self.xsrftoken = "" + self._cookie = RequestsCookieJar() diff --git a/catalystwan/workflows/tenant_migration.py b/catalystwan/workflows/tenant_migration.py index 42cf6204..67808f2c 100644 --- a/catalystwan/workflows/tenant_migration.py +++ b/catalystwan/workflows/tenant_migration.py @@ -13,7 +13,7 @@ from catalystwan.endpoints.troubleshooting_tools.device_connectivity import NPingRequest from catalystwan.exceptions import TenantMigrationPreconditionsError from catalystwan.models.tenant import TenantExport -from catalystwan.session import ManagerSession, create_manager_session +from catalystwan.session import ManagerSession, ManagerSessionState from catalystwan.utils.personality import Personality from catalystwan.utils.session_type import SessionType @@ -127,15 +127,9 @@ def migration_preconditions_check( logger.info("Checking if migrated devices can reach target validator...") conn_check = False if origin_session.session_type == SessionType.PROVIDER: - with create_manager_session( - url=origin_session.url, - username=origin_session.username, - password=origin_session.password, - port=origin_session.port, - subdomain=tenant.subdomain, - logger=origin_session.logger, - ) as provider_as_tenant_session: - conn_check = check_control_connectivity_from_edge_devices(provider_as_tenant_session, validator) + origin_session.subdomain = tenant.subdomain + origin_session.state = ManagerSessionState.LOGIN + conn_check = check_control_connectivity_from_edge_devices(origin_session, validator) else: conn_check = check_control_connectivity_from_edge_devices(origin_session, validator) if not conn_check: From 8f0eda93c3ed7ac02a6218c436f430c83baab549 Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Mon, 12 Aug 2024 12:29:23 +0200 Subject: [PATCH 2/6] Bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8af377cd..b8664df9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "catalystwan" -version = "0.34.0" +version = "0.35.0" description = "Cisco Catalyst WAN SDK for Python" authors = ["kagorski "] readme = "README.md" From 27d2d582403d0a29eaf099e3ef48c598a4c369b0 Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Mon, 12 Aug 2024 13:04:30 +0200 Subject: [PATCH 3/6] Perfom login action before check_control_connectivity_from_edge_devices() --- catalystwan/workflows/tenant_migration.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/catalystwan/workflows/tenant_migration.py b/catalystwan/workflows/tenant_migration.py index 67808f2c..1d23253f 100644 --- a/catalystwan/workflows/tenant_migration.py +++ b/catalystwan/workflows/tenant_migration.py @@ -128,8 +128,9 @@ def migration_preconditions_check( conn_check = False if origin_session.session_type == SessionType.PROVIDER: origin_session.subdomain = tenant.subdomain - origin_session.state = ManagerSessionState.LOGIN - conn_check = check_control_connectivity_from_edge_devices(origin_session, validator) + origin_session.subdomain = tenant.subdomain + with origin_session.login() as provider_as_tenant_session: + conn_check = check_control_connectivity_from_edge_devices(provider_as_tenant_session, validator) else: conn_check = check_control_connectivity_from_edge_devices(origin_session, validator) if not conn_check: From 67d5a2d6679c988d1683e4ca07966b86fe1e8dd4 Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Mon, 12 Aug 2024 13:22:43 +0200 Subject: [PATCH 4/6] fix --- catalystwan/vmanage_auth.py | 2 +- catalystwan/workflows/tenant_migration.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index a648f5d1..b81cec02 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Optional from urllib.parse import urlparse -from packaging.version import Version +from packaging.version import Version # type: ignore from requests import PreparedRequest, Response, get, post from requests.auth import AuthBase from requests.cookies import RequestsCookieJar diff --git a/catalystwan/workflows/tenant_migration.py b/catalystwan/workflows/tenant_migration.py index 1d23253f..ded38096 100644 --- a/catalystwan/workflows/tenant_migration.py +++ b/catalystwan/workflows/tenant_migration.py @@ -13,7 +13,7 @@ from catalystwan.endpoints.troubleshooting_tools.device_connectivity import NPingRequest from catalystwan.exceptions import TenantMigrationPreconditionsError from catalystwan.models.tenant import TenantExport -from catalystwan.session import ManagerSession, ManagerSessionState +from catalystwan.session import ManagerSession from catalystwan.utils.personality import Personality from catalystwan.utils.session_type import SessionType @@ -128,7 +128,7 @@ def migration_preconditions_check( conn_check = False if origin_session.session_type == SessionType.PROVIDER: origin_session.subdomain = tenant.subdomain - origin_session.subdomain = tenant.subdomain + origin_session.logout() with origin_session.login() as provider_as_tenant_session: conn_check = check_control_connectivity_from_edge_devices(provider_as_tenant_session, validator) else: From e8a2a587293e84a173a7b91f1fd1ade5b9798ecc Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Mon, 12 Aug 2024 13:46:52 +0200 Subject: [PATCH 5/6] implement copy protocol --- catalystwan/session.py | 8 ++++++++ catalystwan/workflows/tenant_migration.py | 7 ++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/catalystwan/session.py b/catalystwan/session.py index c3aa626e..a73e1d6f 100644 --- a/catalystwan/session.py +++ b/catalystwan/session.py @@ -553,5 +553,13 @@ def validate_responses(self) -> bool: def validate_responses(self, value: bool): self._validate_responses = value + def __copy__(self) -> ManagerSession: + return ManagerSession( + base_url=self.base_url, + auth=self._auth, + subdomain=self.subdomain, + logger=self.logger, + ) + def __str__(self) -> str: return f"ManagerSession(session_type={self.session_type}, auth={self._auth})" diff --git a/catalystwan/workflows/tenant_migration.py b/catalystwan/workflows/tenant_migration.py index ded38096..27fdb259 100644 --- a/catalystwan/workflows/tenant_migration.py +++ b/catalystwan/workflows/tenant_migration.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +from copy import copy from datetime import datetime from pathlib import Path from typing import Dict, List @@ -127,9 +128,9 @@ def migration_preconditions_check( logger.info("Checking if migrated devices can reach target validator...") conn_check = False if origin_session.session_type == SessionType.PROVIDER: - origin_session.subdomain = tenant.subdomain - origin_session.logout() - with origin_session.login() as provider_as_tenant_session: + as_tenant = copy(origin_session) + as_tenant.subdomain = tenant.subdomain + with as_tenant.login() as provider_as_tenant_session: conn_check = check_control_connectivity_from_edge_devices(provider_as_tenant_session, validator) else: conn_check = check_control_connectivity_from_edge_devices(origin_session, validator) From dbd6c6fa4bcf6f54fefc616f66800147aede0409 Mon Sep 17 00:00:00 2001 From: Jakub Krajewski Date: Tue, 13 Aug 2024 12:35:49 +0200 Subject: [PATCH 6/6] Enchance apigwauth get_token() --- catalystwan/apigw_auth.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/catalystwan/apigw_auth.py b/catalystwan/apigw_auth.py index 0fbe50b0..61d03486 100644 --- a/catalystwan/apigw_auth.py +++ b/catalystwan/apigw_auth.py @@ -3,8 +3,9 @@ from urllib.parse import urlparse from pydantic import BaseModel, Field, PositiveInt -from requests import PreparedRequest, post +from requests import HTTPError, PreparedRequest, post from requests.auth import AuthBase +from requests.exceptions import JSONDecodeError from catalystwan.exceptions import CatalystwanException from catalystwan.response import ManagerResponse @@ -48,7 +49,9 @@ def handle_auth(self, request: PreparedRequest) -> None: self.authenticate(request) def authenticate(self, request: PreparedRequest): - base_url = f"{str(urlparse(request.url).scheme)}://{str(urlparse(request.url).netloc)}" + assert request.url is not None + url = urlparse(request.url) + base_url = f"{url.scheme}://{url.netloc}" self.token = self.get_token(base_url, self.login) def build_digest_header(self, request: PreparedRequest) -> None: @@ -60,15 +63,24 @@ def build_digest_header(self, request: PreparedRequest) -> None: @staticmethod def get_token(base_url: str, apigw_login: ApiGwLogin) -> str: - response = post( - url=f"{base_url}/apigw/login", - verify=False, - json=apigw_login.model_dump(exclude_none=True), - timeout=10, - ) - token = response.json().get("token", "") - if not token or not isinstance(token, str): - raise CatalystwanException("Failed to get bearer token") + try: + response = post( + url=f"{base_url}/apigw/login", + verify=False, + json=apigw_login.model_dump(exclude_none=True), + timeout=10, + ) + response.raise_for_status() + token = response.json()["token"] + except JSONDecodeError: + raise CatalystwanException(f"Incorrect response type from ApiGateway login request, ({response.text})") + except HTTPError as ex: + raise CatalystwanException(f"Problem with connection to ApiGateway login endpoint, ({ex})") + except KeyError as ex: + raise CatalystwanException(f"Not found token in login response from ApiGateway, ({ex})") + else: + if not token or not isinstance(token, str): + raise CatalystwanException("Failed to get bearer token") return token def __str__(self) -> str: