diff --git a/catalystwan/integration_tests/__init__.py b/catalystwan/integration_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalystwan/tests/__init__.py b/catalystwan/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalystwan/tests/templates/__init__.py b/catalystwan/tests/templates/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalystwan/tests/templates/definitions/__init__.py b/catalystwan/tests/templates/definitions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalystwan/tests/templates/definitions/basic/__init__.py b/catalystwan/tests/templates/definitions/basic/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalystwan/tests/test_vmanage_auth.py b/catalystwan/tests/test_vmanage_auth.py index 0924e888..c788dfdf 100644 --- a/catalystwan/tests/test_vmanage_auth.py +++ b/catalystwan/tests/test_vmanage_auth.py @@ -7,6 +7,7 @@ from requests import Request from catalystwan import USER_AGENT +from catalystwan.exceptions import CatalystwanException from catalystwan.vmanage_auth import UnauthorizedAccessError, vManageAuth @@ -28,7 +29,7 @@ def text(self) -> str: # TODO return self._text -def mocked_requests_method(*args, **kwargs): +def mock_request_j_security_check(*args, **kwargs): url_response = { "https://1.1.1.1:1111/j_security_check": { "admin": MockResponse(200, ""), @@ -44,12 +45,24 @@ def mocked_requests_method(*args, **kwargs): return MockResponse(404, "error") +def mock_valid_token(*args, **kw): + return MockResponse(200, "valid-token") + + +def mock_invalid_token_status(*args, **kw): + return MockResponse(503, "invalid-token") + + +def mock_invalid_token_format(*args, **kw): + return MockResponse(200, "error") + + class TestvManageAuth(TestCase): def setUp(self): self.base_url = "https://1.1.1.1:1111" self.password = str(uuid4()) - @mock.patch("requests.post", side_effect=mocked_requests_method) + @mock.patch("requests.post", side_effect=mock_request_j_security_check) def test_get_cookie(self, mock_post): # Arrange username = "admin" @@ -69,7 +82,7 @@ def test_get_cookie(self, mock_post): headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT}, ) - @mock.patch("requests.post", side_effect=mocked_requests_method) + @mock.patch("requests.post", side_effect=mock_request_j_security_check) def test_get_cookie_invalid_username(self, mock_post): # Arrange username = "invalid_username" @@ -91,16 +104,17 @@ def test_get_cookie_invalid_username(self, mock_post): ) @mock.patch("requests.cookies.RequestsCookieJar") - @mock.patch("requests.get", side_effect=mocked_requests_method) + @mock.patch("requests.get", side_effect=mock_valid_token) def test_fetch_token(self, mock_get, cookies): # Arrange valid_url = "https://1.1.1.1:1111/dataservice/client/token" auth = vManageAuth(self.base_url, "admin", self.password) # Act - auth.fetch_token(cookies) + token = auth.fetch_token(cookies) # Assert + self.assertEqual(token, "valid-token") mock_get.assert_called_with( url=valid_url, verify=False, @@ -108,6 +122,20 @@ def test_fetch_token(self, mock_get, cookies): cookies=cookies, ) + @mock.patch("requests.cookies.RequestsCookieJar") + @mock.patch("requests.get", side_effect=mock_invalid_token_status) + def test_incorrect_xsrf_token_status(self, mock_get, cookies): + auth = vManageAuth("http://invalid.response", "admin", self.password) + with self.assertRaises(CatalystwanException): + auth.fetch_token(cookies) + + @mock.patch("requests.cookies.RequestsCookieJar") + @mock.patch("requests.get", side_effect=mock_invalid_token_format) + def test_incorrect_xsrf_token_format(self, mock_get, cookies): + auth = vManageAuth("http://invalid.response", "admin", self.password) + with self.assertRaises(CatalystwanException): + auth.fetch_token(cookies) + if __name__ == "__main__": unittest.main() diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index 2b2bf44d..47ce6809 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -118,7 +118,10 @@ def fetch_token(self, cookies: RequestsCookieJar) -> str: headers=headers, ) self.logger.debug(self._auth_request_debug(response)) - return response.text + token = response.text + if response.status_code != 200 or "" in token: + raise CatalystwanException("Failed to get XSRF token") + return token def __call__(self, prepared_request: PreparedRequest) -> PreparedRequest: if self.expiration_time is None: