diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py index 618f26852..b5c8ef64f 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py @@ -6,10 +6,11 @@ import logging from macaron.errors import HeuristicAnalyzerValueError -from macaron.json_tools import JsonType +from macaron.json_tools import JsonType, json_extract from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset +from macaron.util import send_head_http_raw logger: logging.Logger = logging.getLogger(__name__) @@ -23,6 +24,10 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer): """ WHEEL: str = "bdist_wheel" + # as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125 + INSPECTOR_TEMPLATE = ( + "https://inspector.pypi.io/project/{name}/{version}/packages/{first}/{second}/{rest}/{filename}" + ) def __init__(self) -> None: super().__init__( @@ -47,7 +52,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes Raises ------ HeuristicAnalyzerValueError - If there is no release information, or has no most recent version (if queried). + If there is no release information, or has other missing package information. """ releases = pypi_package_json.get_releases() if releases is None: # no release information @@ -64,21 +69,64 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes logger.debug(error_msg) raise HeuristicAnalyzerValueError(error_msg) - release_files: list[JsonType] = [] + inspector_links: list[JsonType] = [] wheel_present: bool = False - try: - for release_metadata in releases[version]: - if release_metadata["packagetype"] == self.WHEEL: - wheel_present = True - - release_files.append(release_metadata["filename"]) - except KeyError as error: + release_distributions = json_extract(releases, [version], list) + if release_distributions is None: error_msg = f"The version {version} is not available as a release." logger.debug(error_msg) - raise HeuristicAnalyzerValueError(error_msg) from error + raise HeuristicAnalyzerValueError(error_msg) + + for distribution in release_distributions: + # validate data + package_type = json_extract(distribution, ["packagetype"], str) + if package_type is None: + error_msg = f"The version {version} has no 'package type' field in a distribution" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + name = json_extract(pypi_package_json.package_json, ["info", "name"], str) + if name is None: + error_msg = f"The version {version} has no 'name' field in a distribution" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str) + if blake2b_256 is None: + error_msg = f"The version {version} has no 'blake2b_256' field in a distribution" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + filename = json_extract(distribution, ["filename"], str) + if filename is None: + error_msg = f"The version {version} has no 'filename' field in a distribution" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + if package_type == self.WHEEL: + wheel_present = True + + inspector_link = self.INSPECTOR_TEMPLATE.format( + name=name, + version=version, + first=blake2b_256[0:2], + second=blake2b_256[2:4], + rest=blake2b_256[4:], + filename=filename, + ) + + # use a head request because we don't care about the response contents + if send_head_http_raw(inspector_link) is None: + inspector_links.append(None) + else: + inspector_links.append(inspector_link) + + detail_info: dict[str, JsonType] = { + "inspector_links": inspector_links, + } if wheel_present: - return HeuristicResult.PASS, {version: release_files} + return HeuristicResult.PASS, detail_info - return HeuristicResult.FAIL, {version: release_files} + return HeuristicResult.FAIL, detail_info diff --git a/src/macaron/util.py b/src/macaron/util.py index 8fdc41f3e..047d14125 100644 --- a/src/macaron/util.py +++ b/src/macaron/util.py @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict: return dict(response.json()) +def send_head_http_raw( + url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True +) -> Response | None: + """Send the HEAD HTTP request with the given url and headers. + + This method also handle logging when the API server return error status code. + + Parameters + ---------- + url : str + The url of the request. + headers : dict | None + The dict that describes the headers of the request. + timeout: int | None + The request timeout (optional). + allow_redirects: bool + Whether to allow redirects. Default: True. + + Returns + ------- + Response | None + If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of + 200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the + request has failed and ``None`` will be returned. + """ + logger.debug("HEAD - %s", url) + if not timeout: + timeout = defaults.getint("requests", "timeout", fallback=10) + error_retries = defaults.getint("requests", "error_retries", fallback=5) + retry_counter = error_retries + try: + response = requests.head( + url=url, + headers=headers, + timeout=timeout, + allow_redirects=allow_redirects, + ) + except requests.exceptions.RequestException as error: + logger.debug(error) + return None + if not allow_redirects and response.status_code == 302: + # Found, most likely because a redirect is about to happen. + return response + while response.status_code != 200: + logger.debug( + "Receiving error code %s from server.", + response.status_code, + ) + if retry_counter <= 0: + logger.debug("Maximum retries reached: %s", error_retries) + return None + if response.status_code == 403: + check_rate_limit(response) + else: + return None + retry_counter = retry_counter - 1 + response = requests.head( + url=url, + headers=headers, + timeout=timeout, + allow_redirects=allow_redirects, + ) + + return response + + def send_get_http_raw( url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True ) -> Response | None: diff --git a/tests/malware_analyzer/pypi/test_wheel_absence.py b/tests/malware_analyzer/pypi/test_wheel_absence.py index 718417927..76138c336 100644 --- a/tests/malware_analyzer/pypi/test_wheel_absence.py +++ b/tests/malware_analyzer/pypi/test_wheel_absence.py @@ -2,7 +2,7 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """Tests for heuristic detecting wheel (.whl) file absence from PyPI packages""" -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -21,11 +21,23 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None: analyzer.analyze(pypi_package_json) -def test_analyze_tar_present(pypi_package_json: MagicMock) -> None: +# Note: to patch a function, the way it is imported matters. +# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir +# if it is imported like this: from os import listdir; listdir() then you patch .listdir +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when only .tar.gz is present, so failed""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" filename = "ttttttttest_nester.py-0.1.0.tar.gz" + url = ( + "https://files.pythonhosted.org/packages/de/fa/" + + f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}" + ) + inspector_link_expected = ( + "https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/" + + f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}" + ) release = { version: [ @@ -46,8 +58,7 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None: "size": 546, "upload_time": "2016-10-13T05:42:27", "upload_time_iso_8601": "2016-10-13T05:42:27.073842Z", - "url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \ - ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}", + "url": url, "yanked": False, "yanked_reason": None, } @@ -57,18 +68,34 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None: pypi_package_json.get_releases.return_value = release pypi_package_json.get_latest_version.return_value = version pypi_package_json.component.version = None - expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [filename]}) + pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes + + expected_detail_info = { + "inspector_links": [inspector_link_expected], + } + + expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, expected_detail_info) actual_result = analyzer.analyze(pypi_package_json) assert actual_result == expected_result -def test_analyze_whl_present(pypi_package_json: MagicMock) -> None: +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when only .whl is present, so pass""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" filename = "ttttttttest_nester.py-0.1.0.whl" + url = ( + "https://files.pythonhosted.org/packages/de/fa/" + + f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}" + ) + inspector_link_expected = ( + "https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/" + + f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}" + ) release = { version: [ @@ -89,8 +116,7 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None: "size": 546, "upload_time": "2016-10-13T05:42:27", "upload_time_iso_8601": "2016-10-13T05:42:27.073842Z", - "url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \ - ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}", + "url": url, "yanked": False, "yanked_reason": None, } @@ -99,18 +125,42 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None: pypi_package_json.get_releases.return_value = release pypi_package_json.component.version = version - expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [filename]}) + pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes + + expected_detail_info = { + "inspector_links": [inspector_link_expected], + } + + expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info) actual_result = analyzer.analyze(pypi_package_json) assert actual_result == expected_result -def test_analyze_both_present(pypi_package_json: MagicMock) -> None: +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when both .tar.gz and .whl are present, so passed""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" file_prefix = "ttttttttest_nester.py-0.1.0" + wheel_url = ( + "https://files.pythonhosted.org/packages/de/fa/" + + f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl" + ) + tar_url = ( + "https://files.pythonhosted.org/packages/de/fa/" + + f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz" + ) + wheel_link_expected = ( + "https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/" + + f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl" + ) + tar_link_expected = ( + "https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/" + + f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz" + ) release = { version: [ @@ -131,8 +181,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None: "size": 546, "upload_time": "2016-10-13T05:42:27", "upload_time_iso_8601": "2016-10-13T05:42:27.073842Z", - "url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \ - ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl", + "url": wheel_url, "yanked": False, "yanked_reason": None, }, @@ -153,8 +202,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None: "size": 546, "upload_time": "2016-10-13T05:42:27", "upload_time_iso_8601": "2016-10-13T05:42:27.073842Z", - "url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \ - ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz", + "url": tar_url, "yanked": False, "yanked_reason": None, }, @@ -163,10 +211,14 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None: pypi_package_json.get_releases.return_value = release pypi_package_json.component.version = version - expected_result: tuple[HeuristicResult, dict] = ( - HeuristicResult.PASS, - {version: [f"{file_prefix}.whl", f"{file_prefix}.tar.gz"]}, - ) + pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes + + expected_detail_info = { + "inspector_links": [wheel_link_expected, tar_link_expected], + } + + expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info) actual_result = analyzer.analyze(pypi_package_json)