Skip to content

Commit

Permalink
add config.tls_verify to control TLS certs check (#217)
Browse files Browse the repository at this point in the history
* add config.tls_verify to control TLS certs check

RapiDAST sometimes may need to contact servers on its own. This may
happen during:
- OAuth pre-authentication
- manual download of OpenAPI, or GraphQL schemas

The application being tested may only have self signed certs, and
importing in the default CA location just for a temporary scan may
sometimes not be ideal.

This commit adds the config.tls_verify, which maps to python's requests
"verify", so that TLS certificate verification can be either disabled or
mapped to a non-default CA location.

NOTE: In some cases, RapiDAST manually downloads ZAP extensions from
upstream (in zap_none). I purposefully did NOT optionally disable this
verification, for security reasons (if github.com does not have a valid
certificate, this would be reason to worry about)

* renamed config tls_verify to tls_verify_for_rapidast_downloads
  • Loading branch information
cedricbu authored Oct 1, 2024
1 parent 630a06d commit 36037c0
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 27 deletions.
4 changes: 4 additions & 0 deletions config/config-template-zap-long.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ config:
# all the results of all scanners will be stored under that location
base_results_dir: "./results"

# In RapiDAST only: should RapiDAST verify certificates
# possible values: true [default], false, /path/to/a/PEM/file
tls_verify_for_rapidast_downloads: true

# Import a particular environment, and inject it for each scanner
environ:
envFile: "path/to/env/file"
Expand Down
13 changes: 9 additions & 4 deletions scanners/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import yaml


def anonymous_download(url, dest=None, proxy=None):
def anonymous_download(url, dest=None, proxy=None, verify=None):
"""Given a URL, load it using a GET request to dest"""

logging.debug(f"Downloading {url}")
Expand All @@ -14,7 +14,7 @@ def anonymous_download(url, dest=None, proxy=None):
"https": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}",
"http": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}",
}
resp = requests.get(url, allow_redirects=True, proxies=proxy)
resp = requests.get(url, allow_redirects=True, proxies=proxy, verify=verify)
if resp.status_code >= 400:
logging.warning(f"Download {url} failed with {resp.status_code}.")
return False
Expand All @@ -29,14 +29,17 @@ def anonymous_download(url, dest=None, proxy=None):
return resp.content


def oauth2_get_token_from_rtoken(auth, proxy=None, session=None):
def oauth2_get_token_from_rtoken(auth, proxy=None, session=None, verify=None):
"""Given a rtoken, retrieve and return a Bearer token
auth is in the form { url, client_id, rtoken }
NOTE: if a session is provided, `verify` will not overwrite the session's `verify` state
"""

if session is None:
session = requests.Session()
if verify is not None:
session.verify = verify

headers = {
"Accept": "application/json",
Expand Down Expand Up @@ -76,10 +79,12 @@ def oauth2_get_token_from_rtoken(auth, proxy=None, session=None):
return token


def authenticated_download_with_rtoken(url, dest, auth, proxy=None):
def authenticated_download_with_rtoken(url, dest, auth, proxy=None, verify=None):
"""Given a URL and Oauth2 authentication parameters, download the URL and store it at `dest`"""

session = requests.Session()
if verify is not None:
session.verify = verify

# get a token
token = oauth2_get_token_from_rtoken(auth, proxy, session)
Expand Down
10 changes: 8 additions & 2 deletions scanners/zap/zap.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,10 @@ def authentication_set_oauth2_rtoken(self):
"rtoken": rtoken,
"url": token_endpoint,
}
token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy"))
verify = self.config.get("config.tls_verify_for_rapidast_downloads", True)
token = oauth2_get_token_from_rtoken(
auth, proxy=self.my_conf("proxy"), verify=verify
)
if token:
# Delete previous config, and creating a new one
logging.debug(
Expand Down Expand Up @@ -949,8 +952,11 @@ def _manual_oauth2_download(self, auth, proxy):

for change in changes:
url = self.my_conf(change.config_url)
verify = self.config.get("config.tls_verify_for_rapidast_downloads", True)
if url:
if authenticated_download_with_rtoken(url, change.path, auth, proxy):
if authenticated_download_with_rtoken(
url, change.path, auth, proxy, verify=verify
):
logging.info(
f"Successful download of scanner's {change.config_url}"
)
Expand Down
42 changes: 21 additions & 21 deletions tests/scanners/test_downloaders.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from unittest.mock import Mock
from collections import namedtuple
from unittest.mock import MagicMock
from unittest.mock import Mock
from unittest.mock import patch

import pytest

from scanners import downloaders

from collections import namedtuple

@pytest.fixture(scope="function")
def my_auth():
Expand All @@ -16,64 +16,64 @@ def my_auth():
"rtoken": "aut_rtoken",
}


@pytest.fixture(scope="function")
def my_proxy():
proxy = {
"proxyHost": "proxyHost",
"proxyPort": "proxyPort",
}


@patch("scanners.downloaders.requests.get")
def test_anonymous_download(mock_get, my_proxy):
def request_get(url, allow_redirects=True, proxies=None):
def request_get(url, allow_redirects=True, proxies=None, verify=True):
Response = namedtuple("Response", ["status_code", "content"])
return Response(status_code=200, content="content")


mock_get.side_effect = request_get

ret = downloaders.anonymous_download("url", dest=None, proxy=my_proxy)

assert ret == "content"




@patch("scanners.downloaders.requests.Session")
def test_oauth2_get_token_from_rtoken(mock_session, my_auth, my_proxy):
def fake_Session():
def fake_post(url, **kwargs):
class fake_Session:
def post(self, url, **kwargs):
Post = namedtuple("Post", ["raise_for_status", "text"])
return Post(raise_for_status=lambda: None, text=b"{'access_token':123}")

Session = namedtuple("Session", ["post"])
return Session(post=fake_post)

mock_session.side_effect = fake_Session

rtoken = downloaders.oauth2_get_token_from_rtoken(auth=my_auth, proxy=my_proxy, session=None)
rtoken = downloaders.oauth2_get_token_from_rtoken(
auth=my_auth, proxy=my_proxy, session=None
)

assert rtoken == 123


@patch("scanners.downloaders.requests.Session")
@patch("scanners.downloaders.oauth2_get_token_from_rtoken")
@patch("builtins.open")
def test_authenticated_download_with_rtoken(mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy):
def fake_Session():
def fake_post(url, **kwargs):
def test_authenticated_download_with_rtoken(
mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy
):
class fake_Session:
def post(self, url, **kwargs):
Post = namedtuple("Post", ["raise_for_status", "text"])
return Post(raise_for_status=lambda: None, text=b"{'access_token':123}")
def fake_get(url, **kwargs):

def get(self, url, **kwargs):
Get = namedtuple("Get", ["status_code", "text"])
return Get(status_code=200, text="text")

Session = namedtuple("Session", ["post", "get"])
return Session(post=fake_post, get=fake_get)

mock_session.side_effect = fake_Session
mock_get_rtoken.return_value = "123"
mock_open.return_value = MagicMock()

res = downloaders.authenticated_download_with_rtoken("url", "Nowhere", auth=my_auth, proxy=my_proxy)
res = downloaders.authenticated_download_with_rtoken(
"url", "Nowhere", auth=my_auth, proxy=my_proxy
)
assert res == True

0 comments on commit 36037c0

Please sign in to comment.