diff --git a/config/config-template-zap-long.yaml b/config/config-template-zap-long.yaml index 9c286b9..79051f5 100644 --- a/config/config-template-zap-long.yaml +++ b/config/config-template-zap-long.yaml @@ -15,6 +15,10 @@ config: # all the results of all scanners will be stored under that location base_results_dir: "./results" + # In RapiDAST only: should RapiDAST verify certificates + # possible values: true [default], false, /path/to/a/PEM/file + tls_verify_for_rapidast_downloads: true + # Import a particular environment, and inject it for each scanner environ: envFile: "path/to/env/file" diff --git a/scanners/downloaders.py b/scanners/downloaders.py index cc26e31..67ff44c 100644 --- a/scanners/downloaders.py +++ b/scanners/downloaders.py @@ -5,7 +5,7 @@ import yaml -def anonymous_download(url, dest=None, proxy=None): +def anonymous_download(url, dest=None, proxy=None, verify=None): """Given a URL, load it using a GET request to dest""" logging.debug(f"Downloading {url}") @@ -14,7 +14,7 @@ def anonymous_download(url, dest=None, proxy=None): "https": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}", "http": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}", } - resp = requests.get(url, allow_redirects=True, proxies=proxy) + resp = requests.get(url, allow_redirects=True, proxies=proxy, verify=verify) if resp.status_code >= 400: logging.warning(f"Download {url} failed with {resp.status_code}.") return False @@ -29,14 +29,17 @@ def anonymous_download(url, dest=None, proxy=None): return resp.content -def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): +def oauth2_get_token_from_rtoken(auth, proxy=None, session=None, verify=None): """Given a rtoken, retrieve and return a Bearer token auth is in the form { url, client_id, rtoken } + NOTE: if a session is provided, `verify` will not overwrite the session's `verify` state """ if session is None: session = requests.Session() + if verify is not None: + session.verify = verify headers = { "Accept": "application/json", @@ -76,10 +79,12 @@ def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): return token -def authenticated_download_with_rtoken(url, dest, auth, proxy=None): +def authenticated_download_with_rtoken(url, dest, auth, proxy=None, verify=None): """Given a URL and Oauth2 authentication parameters, download the URL and store it at `dest`""" session = requests.Session() + if verify is not None: + session.verify = verify # get a token token = oauth2_get_token_from_rtoken(auth, proxy, session) diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py index 4f30ba7..5eaa0b5 100644 --- a/scanners/zap/zap.py +++ b/scanners/zap/zap.py @@ -838,7 +838,10 @@ def authentication_set_oauth2_rtoken(self): "rtoken": rtoken, "url": token_endpoint, } - token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy")) + verify = self.config.get("config.tls_verify_for_rapidast_downloads", True) + token = oauth2_get_token_from_rtoken( + auth, proxy=self.my_conf("proxy"), verify=verify + ) if token: # Delete previous config, and creating a new one logging.debug( @@ -949,8 +952,11 @@ def _manual_oauth2_download(self, auth, proxy): for change in changes: url = self.my_conf(change.config_url) + verify = self.config.get("config.tls_verify_for_rapidast_downloads", True) if url: - if authenticated_download_with_rtoken(url, change.path, auth, proxy): + if authenticated_download_with_rtoken( + url, change.path, auth, proxy, verify=verify + ): logging.info( f"Successful download of scanner's {change.config_url}" ) diff --git a/tests/scanners/test_downloaders.py b/tests/scanners/test_downloaders.py index 36ac95d..f48f02a 100644 --- a/tests/scanners/test_downloaders.py +++ b/tests/scanners/test_downloaders.py @@ -1,12 +1,12 @@ -from unittest.mock import Mock +from collections import namedtuple from unittest.mock import MagicMock +from unittest.mock import Mock from unittest.mock import patch import pytest from scanners import downloaders -from collections import namedtuple @pytest.fixture(scope="function") def my_auth(): @@ -16,6 +16,7 @@ def my_auth(): "rtoken": "aut_rtoken", } + @pytest.fixture(scope="function") def my_proxy(): proxy = { @@ -23,13 +24,13 @@ def my_proxy(): "proxyPort": "proxyPort", } + @patch("scanners.downloaders.requests.get") def test_anonymous_download(mock_get, my_proxy): - def request_get(url, allow_redirects=True, proxies=None): + def request_get(url, allow_redirects=True, proxies=None, verify=True): Response = namedtuple("Response", ["status_code", "content"]) return Response(status_code=200, content="content") - mock_get.side_effect = request_get ret = downloaders.anonymous_download("url", dest=None, proxy=my_proxy) @@ -37,43 +38,42 @@ def request_get(url, allow_redirects=True, proxies=None): assert ret == "content" - - @patch("scanners.downloaders.requests.Session") def test_oauth2_get_token_from_rtoken(mock_session, my_auth, my_proxy): - def fake_Session(): - def fake_post(url, **kwargs): + class fake_Session: + def post(self, url, **kwargs): Post = namedtuple("Post", ["raise_for_status", "text"]) return Post(raise_for_status=lambda: None, text=b"{'access_token':123}") - Session = namedtuple("Session", ["post"]) - return Session(post=fake_post) - mock_session.side_effect = fake_Session - rtoken = downloaders.oauth2_get_token_from_rtoken(auth=my_auth, proxy=my_proxy, session=None) + rtoken = downloaders.oauth2_get_token_from_rtoken( + auth=my_auth, proxy=my_proxy, session=None + ) assert rtoken == 123 + @patch("scanners.downloaders.requests.Session") @patch("scanners.downloaders.oauth2_get_token_from_rtoken") @patch("builtins.open") -def test_authenticated_download_with_rtoken(mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy): - def fake_Session(): - def fake_post(url, **kwargs): +def test_authenticated_download_with_rtoken( + mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy +): + class fake_Session: + def post(self, url, **kwargs): Post = namedtuple("Post", ["raise_for_status", "text"]) return Post(raise_for_status=lambda: None, text=b"{'access_token':123}") - def fake_get(url, **kwargs): + + def get(self, url, **kwargs): Get = namedtuple("Get", ["status_code", "text"]) return Get(status_code=200, text="text") - Session = namedtuple("Session", ["post", "get"]) - return Session(post=fake_post, get=fake_get) - mock_session.side_effect = fake_Session mock_get_rtoken.return_value = "123" mock_open.return_value = MagicMock() - res = downloaders.authenticated_download_with_rtoken("url", "Nowhere", auth=my_auth, proxy=my_proxy) + res = downloaders.authenticated_download_with_rtoken( + "url", "Nowhere", auth=my_auth, proxy=my_proxy + ) assert res == True -