Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include inspector package urls as part of the malicious metadata facts for pypi packages #935

Merged
merged 10 commits into from
Dec 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,10 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
"""

WHEEL: str = "bdist_wheel"
# as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125
INSPECTOR_TEMPLATE = (
"https://inspector.pypi.io/project/{name}/{version}/packages/{first}/{second}/{rest}/{filename}"
)

def __init__(self) -> None:
super().__init__(
Expand All @@ -47,7 +52,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
Raises
------
HeuristicAnalyzerValueError
If there is no release information, or has no most recent version (if queried).
If there is no release information, or has other missing package information.
"""
releases = pypi_package_json.get_releases()
if releases is None: # no release information
Expand All @@ -64,21 +69,64 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

release_files: list[JsonType] = []
inspector_links: list[JsonType] = []
wheel_present: bool = False

try:
for release_metadata in releases[version]:
if release_metadata["packagetype"] == self.WHEEL:
wheel_present = True

release_files.append(release_metadata["filename"])
except KeyError as error:
release_distributions = json_extract(releases, [version], list)
if release_distributions is None:
error_msg = f"The version {version} is not available as a release."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg) from error
raise HeuristicAnalyzerValueError(error_msg)

for distribution in release_distributions:
# validate data
package_type = json_extract(distribution, ["packagetype"], str)
if package_type is None:
error_msg = f"The version {version} has no 'package type' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = f"The version {version} has no 'name' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str)
if blake2b_256 is None:
error_msg = f"The version {version} has no 'blake2b_256' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

filename = json_extract(distribution, ["filename"], str)
if filename is None:
error_msg = f"The version {version} has no 'filename' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if package_type == self.WHEEL:
wheel_present = True

inspector_link = self.INSPECTOR_TEMPLATE.format(
name=name,
version=version,
first=blake2b_256[0:2],
second=blake2b_256[2:4],
rest=blake2b_256[4:],
filename=filename,
)

# use a head request because we don't care about the response contents
if send_head_http_raw(inspector_link) is None:
inspector_links.append(None)
else:
inspector_links.append(inspector_link)

detail_info: dict[str, JsonType] = {
"inspector_links": inspector_links,
}

if wheel_present:
return HeuristicResult.PASS, {version: release_files}
return HeuristicResult.PASS, detail_info

return HeuristicResult.FAIL, {version: release_files}
return HeuristicResult.FAIL, detail_info
66 changes: 66 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict:
return dict(response.json())


def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send a HEAD HTTP request to the given url with the given headers.

    This function also logs the status code whenever the server returns an error status code.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout: int | None
        The request timeout (optional). When omitted (or falsy), the ``timeout`` option of the
        ``requests`` section in the defaults is used, falling back to 10.
    allow_redirects: bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned. This includes a request exception raised on any attempt,
        a non-retryable status code, and running out of retries.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries

    while True:
        # Every attempt, including retries, is guarded so that a connection error or
        # timeout results in ``None`` instead of an exception escaping to the caller.
        try:
            response = requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            logger.debug(error)
            return None

        if response.status_code == 200:
            return response
        if not allow_redirects and response.status_code == 302:
            # Found, most likely because a redirect is about to happen.
            # Checked on every attempt so that a retried request is treated like the first one.
            return response

        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )
        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None
        if response.status_code != 403:
            # Only 403 responses are retried; any other error status is a failure.
            return None

        # NOTE(review): check_rate_limit is defined elsewhere; presumably it waits for the
        # rate limit to reset before the request is retried - confirm against its definition.
        check_rate_limit(response)
        retry_counter = retry_counter - 1


def send_get_http_raw(
url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
Expand Down
88 changes: 70 additions & 18 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for heuristic detecting wheel (.whl) file absence from PyPI packages"""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -21,11 +21,23 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:
analyzer.analyze(pypi_package_json)


def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
# Note: a function must be patched where it is looked up, so the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir,
# where <module> is the module that performs the import and calls listdir().
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.tar.gz"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -46,8 +58,7 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -57,18 +68,34 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
pypi_package_json.get_releases.return_value = release
pypi_package_json.get_latest_version.return_value = version
pypi_package_json.component.version = None
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .whl is present, so pass"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.whl"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -89,8 +116,7 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -99,18 +125,42 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when both .tar.gz and .whl are present, so passed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
file_prefix = "ttttttttest_nester.py-0.1.0"
wheel_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)
wheel_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)

release = {
version: [
Expand All @@ -131,8 +181,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl",
"url": wheel_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -153,8 +202,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz",
"url": tar_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -163,10 +211,14 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (
HeuristicResult.PASS,
{version: [f"{file_prefix}.whl", f"{file_prefix}.tar.gz"]},
)
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [wheel_link_expected, tar_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

Expand Down
Loading