diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index 1451eaa996..99a29e4cca 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -8,6 +8,7 @@ import asyncio import datetime +import contextlib import json import logging import shutil @@ -1193,3 +1194,11 @@ def fetch_from_mirror(self, mirror, pubkey, ignore_signature, log_signature_erro else: self.clear_cached_data() return -1 + + @contextlib.contextmanager + def with_cursor(self): + cursor = self.db_open_and_get_cursor() + try: + yield cursor + finally: + self.db_close() diff --git a/cve_bin_tool/parsers/env.py b/cve_bin_tool/parsers/env.py index 48f58df38c..d8c5675825 100644 --- a/cve_bin_tool/parsers/env.py +++ b/cve_bin_tool/parsers/env.py @@ -6,6 +6,7 @@ import pathlib import subprocess import contextlib +import dataclasses from re import MULTILINE, compile, search from packaging.version import parse as parse_version @@ -17,6 +18,19 @@ import snoop +@dataclasses.dataclass +class EnvNamespaceConfig: + ad_hoc_cve_id: str + vendor: str + product: str + version: str + + +@dataclasses.dataclass +class EnvConfig: + namespaces: dict[str, EnvNamespaceConfig] + + class EnvParser(Parser): """ Parser for Python requirements files. @@ -31,7 +45,7 @@ class EnvParser(Parser): def __init__(self, cve_db, logger): """Initialize the python requirements file parser.""" super().__init__(cve_db, logger) - self.purl_pkg_type = "pypi" + self.purl_pkg_type = "ad-hoc" def generate_purl(self, product, vendor, qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" @@ -59,158 +73,59 @@ def parse_file_contents(contents): if line.strip() and line.startswith("CVE_BIN_TOOL_") ] ) + namespaces = {} for i, line in enumerate(lines): key, value = line.split("=", maxsplit=1) - namespace, key = key[len("CVE_BIN_TOOL_"):].split("_", maxsplit=1) - if value.startswith("\""): + namespace, key = key[len("CVE_BIN_TOOL_") :].split("_", maxsplit=1) + if value.startswith('"'): value = value[1:] - if value.endswith("\""): + if value.endswith('"'): value = value[:-1] - # lines[i] = ((namespace, key), value) - lines[i] = (key, value) - result = lines - return dict(result) + namespaces.setdefault(namespace, {}) + namespaces[namespace][key.lower()] = value + for namespace, config in namespaces.items(): + namespaces[namespace] = EnvNamespaceConfig(**config) + return EnvConfig(namespaces=namespaces) @snoop def run_checker(self, filename): """ - Parse the requirements file and yield PURLs for the listed packages. + Parse the .env file and yield ScanInfo objects for the listed packages. Args: - filename (str): The path to the requirements file. + filename (str): The path to the .env file. Yields: - str: PURLs for the packages listed in the file. + str: ScanInfo objects for the packages listed in the file. """ self.filename = filename contents = pathlib.Path(self.filename).read_text() - snoop.pp(self.cve_db) - - parsed_file_contents = self.parse_file_contents(contents) - - snoop.pp(parsed_file_contents) - - return - try: - output = subprocess.check_output( - [ - "pip3", - "install", - "-r", - self.filename, - "--dry-run", - "--ignore-installed", - "--report", - "-", - "--quiet", - ], - stderr=subprocess.STDOUT, - ) - except subprocess.CalledProcessError as e: - self.logger.error(e.output) - pip_version = str(subprocess.check_output(["pip3", "--version"])) - # Output will look like: - # pip 20.0.2 from /usr/lib/python3/dist-packages/pip (python 3.8) - pip_version = pip_version.split(" ")[1] - if parse_version(pip_version) < parse_version("22.2"): - self.logger.error( - f"{filename} not scanned: pip --dry-run was unable to get package versions." - ) - self.logger.error( - "pip version >= 22.2 is required to scan Python requirements files." - ) - else: - output = subprocess.check_output( - [ - "pip3", - "install", - "-r", - self.filename, - "--dry-run", - "--ignore-installed", - "--report", - "-", - "--quiet", - ], - ) - lines = json.loads(output) - for line in lines["install"]: - product = line["metadata"]["name"] - version = line["metadata"]["version"] - purl = self.generate_purl(product, "") - vendor, result = self.find_vendor_from_purl(purl, version) - - if not result: - vendor = self.find_vendor(product, version) - - if vendor is not None: - yield from vendor - self.logger.debug(f"Done scanning file: {self.filename}") - - """ - Parser for Python package metadata files. - This parser is designed to parse Python package metadata files (usually named - PKG-INFO or METADATA) and generate PURLs (Package URLs) for the package. - """ - - a_PARSER_MATCH_FILENAMES = [ - "PKG-INFO: ", - "METADATA: ", - ] - - def a___init__(self, cve_db, logger): - """Initialize the python package metadata parser.""" - super().__init__(cve_db, logger) - self.purl_pkg_type = "pypi" - - def a_generate_purl(self, product, vendor, qualifier={}, subpath=None): - """Generates PURL after normalizing all components.""" - product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() + env_config = self.parse_file_contents(contents) - if not product: - return None - - purl = super().generate_purl( - product, - vendor, - qualifier, - subpath, - ) - - return purl + snoop.pp(self.cve_db) - def a_run_checker(self, filename): - """ - This generator runs only for python packages. - There are no actual checkers. - The ProductInfo is computed without the help of any checkers from PKG-INFO or METADATA. - """ - self.filename = filename - lines = parse_strings(self.filename) - lines = "\n".join(lines.splitlines()[:3]) - try: - product = search(compile(r"^Name: (.+)$", MULTILINE), lines).group(1) - version = search(compile(r"^Version: (.+)$", MULTILINE), lines).group(1) - purl = self.generate_purl(product, "") - vendor, result = self.find_vendor_from_purl(purl, version) - - if vendor is not None: - yield from vendor - - if not result: - vendor_package_pair = self.cve_db.get_vendor_product_pairs(product) - if vendor_package_pair != []: - for pair in vendor_package_pair: - vendor = pair["vendor"] - location = pair.get("location", "/usr/local/bin/product") - file_path = self.filename - self.logger.debug( - f"{file_path} is {vendor}.{product} {version}" - ) - yield ScanInfo( - ProductInfo(vendor, product, version, location), file_path - ) - - # There are packages with a METADATA file in them containing different data from what the tool expects - except AttributeError: - self.logger.debug(f"{filename} is an invalid METADATA/PKG-INFO") - self.logger.debug(f"Done scanning file: {filename}") + # TODO Create SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE + # by making a request to the poligy engine and getting it's workflow + # manifest as output and deriving from that or extend it to return that. + data_source = "SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE" + affected_data = [ + { + "cve_id": cve.ad_hoc_cve_id, + "vendor": cve.vendor, + "product": cve.product, + "version": cve.version, + "versionStartIncluding": "", + # "versionStartIncluding": cve.version, + "versionStartExcluding": "", + "versionEndIncluding": "", + # "versionEndIncluding": cve.version, + "versionEndExcluding": "", + } + for _namespace, cve in env_config.namespaces.items() + ] + with self.cve_db.with_cursor() as cursor: + self.cve_db.populate_affected(affected_data, cursor, data_source) + + for _namespace, cve in env_config.namespaces.items(): + yield from self.find_vendor(cve.product, cve.version) + + # TODO VEX attached via linked data to ad-hoc CVE-ID diff --git a/test/test_parsers.py b/test/test_parsers.py index 75adc0659c..44e668b4d2 100644 --- a/test/test_parsers.py +++ b/test/test_parsers.py @@ -11,6 +11,7 @@ import pytest from cve_bin_tool.cvedb import CVEDB +from cve_bin_tool.util import ProductInfo, ScanInfo from cve_bin_tool.log import LOGGER from cve_bin_tool.parsers.parse import valid_files as actual_valid_files from cve_bin_tool.parsers.dart import DartParser @@ -59,7 +60,7 @@ CVE_BIN_TOOL_0_PRODUCT="myproduct" CVE_BIN_TOOL_0_VENDOR="myvendor" CVE_BIN_TOOL_0_VERSION="v0.0.0.dev-15abff2d529396937e18c657ecee1ed224842000" - CVE_BIN_TOOL_0_AD_HOC_CVE="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8" + CVE_BIN_TOOL_0_AD_HOC_CVE_ID="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8" """ ) @@ -84,6 +85,18 @@ async def test_parser_env_test_0001(self): results = list(env_parser.run_checker(file_path)) unittest.TestCase().assertListEqual( results, - # TODO DEBUG Change empty list to expected value - [], + [ + ScanInfo( + product_info=ProductInfo( + vendor="myvendor", + product="myproduct", + version="v0.0.0.dev-15abff2d529396937e18c657ecee1ed224842000", + # TODO location? + location="/usr/local/bin/product", + # TODO purl + purl=None, + ), + file_path=file_path, + ) + ], )