diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index 1451eaa996..99a29e4cca 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -8,6 +8,7 @@ import asyncio import datetime +import contextlib import json import logging import shutil @@ -1193,3 +1194,11 @@ def fetch_from_mirror(self, mirror, pubkey, ignore_signature, log_signature_erro else: self.clear_cached_data() return -1 + + @contextlib.contextmanager + def with_cursor(self): + cursor = self.db_open_and_get_cursor() + try: + yield cursor + finally: + self.db_close() diff --git a/cve_bin_tool/parsers/env.py b/cve_bin_tool/parsers/env.py index 48f58df38c..7d984a9c9e 100644 --- a/cve_bin_tool/parsers/env.py +++ b/cve_bin_tool/parsers/env.py @@ -6,6 +6,7 @@ import pathlib import subprocess import contextlib +import dataclasses from re import MULTILINE, compile, search from packaging.version import parse as parse_version @@ -17,6 +18,19 @@ import snoop +@dataclasses.dataclass +class EnvNamespaceConfig: + ad_hoc_cve_id: str + vendor: str + product: str + version: str + + +@dataclasses.dataclass +class EnvConfig: + namespaces: dict[str, EnvNamespaceConfig] + + class EnvParser(Parser): """ Parser for Python requirements files. @@ -59,17 +73,19 @@ def parse_file_contents(contents): if line.strip() and line.startswith("CVE_BIN_TOOL_") ] ) + namespaces = {} for i, line in enumerate(lines): key, value = line.split("=", maxsplit=1) - namespace, key = key[len("CVE_BIN_TOOL_"):].split("_", maxsplit=1) - if value.startswith("\""): + namespace, key = key[len("CVE_BIN_TOOL_") :].split("_", maxsplit=1) + if value.startswith('"'): value = value[1:] - if value.endswith("\""): + if value.endswith('"'): value = value[:-1] - # lines[i] = ((namespace, key), value) - lines[i] = (key, value) - result = lines - return dict(result) + namespaces.setdefault(namespace, {}) + namespaces[namespace][key.lower()] = value + for namespace, config in namespaces.items(): + namespaces[namespace] = EnvNamespaceConfig(**config) + return EnvConfig(namespaces=namespaces) @snoop def run_checker(self, filename): @@ -83,13 +99,42 @@ def run_checker(self, filename): self.filename = filename contents = pathlib.Path(self.filename).read_text() + env_config = self.parse_file_contents(contents) + snoop.pp(self.cve_db) - parsed_file_contents = self.parse_file_contents(contents) + # TODO Create SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE + # by making a request to the poligy engine and getting it's workflow + # manifest as output and deriving from that or extend it to return that. + data_source = "SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE" + affected_data = [ + { + "cve_id": cve.ad_hoc_cve_id, + "vendor": cve.vendor, + "product": cve.product, + "version": cve.version, + "versionStartIncluding": "", + # "versionStartIncluding": cve.version, + "versionStartExcluding": "", + "versionEndIncluding": "", + # "versionEndIncluding": cve.version, + "versionEndExcluding": "", + } + for _namespace, cve in env_config.namespaces.items() + ] + with self.cve_db.with_cursor() as cursor: + self.cve_db.populate_affected(affected_data, cursor, data_source) + + for _namespace, cve in env_config.namespaces.items(): + yield from self.find_vendor(cve.product, cve.version) - snoop.pp(parsed_file_contents) + # TODO VEX attached via linked data to ad-hoc CVE-ID return + + # TODO Remove the rest if not helpful anymore + + vendor_package_pair = self.cve_db.get_vendor_product_pairs(product) try: output = subprocess.check_output( [ diff --git a/test/test_parsers.py b/test/test_parsers.py index 75adc0659c..0d47d09921 100644 --- a/test/test_parsers.py +++ b/test/test_parsers.py @@ -59,7 +59,7 @@ CVE_BIN_TOOL_0_PRODUCT="myproduct" CVE_BIN_TOOL_0_VENDOR="myvendor" CVE_BIN_TOOL_0_VERSION="v0.0.0.dev-15abff2d529396937e18c657ecee1ed224842000" - CVE_BIN_TOOL_0_AD_HOC_CVE="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8" + CVE_BIN_TOOL_0_AD_HOC_CVE_ID="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8" """ )