Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Validate XSRF token returned by server
Browse files Browse the repository at this point in the history
In same case this token can be login page and raise incorrect error in future login flow.
  • Loading branch information
smialy committed Jun 3, 2024
1 parent 7d0a3dd commit c0cc378
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 6 deletions.
Empty file.
Empty file added catalystwan/tests/__init__.py
Empty file.
Empty file.
Empty file.
Empty file.
38 changes: 33 additions & 5 deletions catalystwan/tests/test_vmanage_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from requests import Request

from catalystwan import USER_AGENT
from catalystwan.exceptions import CatalystwanException
from catalystwan.vmanage_auth import UnauthorizedAccessError, vManageAuth


Expand All @@ -28,7 +29,7 @@ def text(self) -> str: # TODO
return self._text


def mocked_requests_method(*args, **kwargs):
def mock_request_j_security_check(*args, **kwargs):
url_response = {
"https://1.1.1.1:1111/j_security_check": {
"admin": MockResponse(200, ""),
Expand All @@ -44,12 +45,24 @@ def mocked_requests_method(*args, **kwargs):
return MockResponse(404, "error")


def mock_valid_token(*args, **kw):
return MockResponse(200, "valid-token")


def mock_invalid_token_status(*args, **kw):
return MockResponse(503, "invalid-token")


def mock_invalid_token_format(*args, **kw):
return MockResponse(200, "<html>error</html>")


class TestvManageAuth(TestCase):
def setUp(self):
self.base_url = "https://1.1.1.1:1111"
self.password = str(uuid4())

@mock.patch("requests.post", side_effect=mocked_requests_method)
@mock.patch("requests.post", side_effect=mock_request_j_security_check)
def test_get_cookie(self, mock_post):
# Arrange
username = "admin"
Expand All @@ -69,7 +82,7 @@ def test_get_cookie(self, mock_post):
headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT},
)

@mock.patch("requests.post", side_effect=mocked_requests_method)
@mock.patch("requests.post", side_effect=mock_request_j_security_check)
def test_get_cookie_invalid_username(self, mock_post):
# Arrange
username = "invalid_username"
Expand All @@ -91,23 +104,38 @@ def test_get_cookie_invalid_username(self, mock_post):
)

@mock.patch("requests.cookies.RequestsCookieJar")
@mock.patch("requests.get", side_effect=mocked_requests_method)
@mock.patch("requests.get", side_effect=mock_valid_token)
def test_fetch_token(self, mock_get, cookies):
# Arrange
valid_url = "https://1.1.1.1:1111/dataservice/client/token"
auth = vManageAuth(self.base_url, "admin", self.password)

# Act
auth.fetch_token(cookies)
token = auth.fetch_token(cookies)

# Assert
self.assertEqual(token, "valid-token")
mock_get.assert_called_with(
url=valid_url,
verify=False,
headers={"Content-Type": "application/json", "User-Agent": USER_AGENT},
cookies=cookies,
)

@mock.patch("requests.cookies.RequestsCookieJar")
@mock.patch("requests.get", side_effect=mock_invalid_token_status)
def test_incorrect_xsrf_token_status(self, mock_get, cookies):
auth = vManageAuth("http://invalid.response", "admin", self.password)
with self.assertRaises(CatalystwanException):
auth.fetch_token(cookies)

@mock.patch("requests.cookies.RequestsCookieJar")
@mock.patch("requests.get", side_effect=mock_invalid_token_format)
def test_incorrect_xsrf_token_format(self, mock_get, cookies):
auth = vManageAuth("http://invalid.response", "admin", self.password)
with self.assertRaises(CatalystwanException):
auth.fetch_token(cookies)


if __name__ == "__main__":
unittest.main()
5 changes: 4 additions & 1 deletion catalystwan/vmanage_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ def fetch_token(self, cookies: RequestsCookieJar) -> str:
headers=headers,
)
self.logger.debug(self._auth_request_debug(response))
return response.text
token = response.text
if response.status_code != 200 or "<html>" in token:
raise CatalystwanException("Failed to get XSRF token")
return token

def __call__(self, prepared_request: PreparedRequest) -> PreparedRequest:
if self.expiration_time is None:
Expand Down

0 comments on commit c0cc378

Please sign in to comment.