Skip to content

Commit

Permalink
feat: parser env add to cve_db
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
John Andersen committed Jun 17, 2024
1 parent 18510d0 commit 1f502b8
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 143 deletions.
9 changes: 9 additions & 0 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import asyncio
import datetime
import contextlib
import json
import logging
import shutil
Expand Down Expand Up @@ -1193,3 +1194,11 @@ def fetch_from_mirror(self, mirror, pubkey, ignore_signature, log_signature_erro
else:
self.clear_cached_data()
return -1

@contextlib.contextmanager
def with_cursor(self):
cursor = self.db_open_and_get_cursor()
try:
yield cursor
finally:
self.db_close()
195 changes: 55 additions & 140 deletions cve_bin_tool/parsers/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pathlib
import subprocess
import contextlib
import dataclasses
from re import MULTILINE, compile, search

from packaging.version import parse as parse_version
Expand All @@ -17,6 +18,19 @@
import snoop


@dataclasses.dataclass
class EnvNamespaceConfig:
ad_hoc_cve_id: str
vendor: str
product: str
version: str


@dataclasses.dataclass
class EnvConfig:
namespaces: dict[str, EnvNamespaceConfig]


class EnvParser(Parser):
"""
Parser for Python requirements files.
Expand All @@ -31,7 +45,7 @@ class EnvParser(Parser):
def __init__(self, cve_db, logger):
"""Initialize the python requirements file parser."""
super().__init__(cve_db, logger)
self.purl_pkg_type = "pypi"
self.purl_pkg_type = "ad-hoc"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
Expand Down Expand Up @@ -59,158 +73,59 @@ def parse_file_contents(contents):
if line.strip() and line.startswith("CVE_BIN_TOOL_")
]
)
namespaces = {}
for i, line in enumerate(lines):
key, value = line.split("=", maxsplit=1)
namespace, key = key[len("CVE_BIN_TOOL_"):].split("_", maxsplit=1)
if value.startswith("\""):
namespace, key = key[len("CVE_BIN_TOOL_") :].split("_", maxsplit=1)
if value.startswith('"'):
value = value[1:]
if value.endswith("\""):
if value.endswith('"'):
value = value[:-1]
# lines[i] = ((namespace, key), value)
lines[i] = (key, value)
result = lines
return dict(result)
namespaces.setdefault(namespace, {})
namespaces[namespace][key.lower()] = value
for namespace, config in namespaces.items():
namespaces[namespace] = EnvNamespaceConfig(**config)
return EnvConfig(namespaces=namespaces)

@snoop
def run_checker(self, filename):
"""
Parse the requirements file and yield PURLs for the listed packages.
Parse the .env file and yield ScanInfo objects for the listed packages.
Args:
filename (str): The path to the requirements file.
filename (str): The path to the .env file.
Yields:
str: PURLs for the packages listed in the file.
str: ScanInfo objects for the packages listed in the file.
"""
self.filename = filename
contents = pathlib.Path(self.filename).read_text()

snoop.pp(self.cve_db)

parsed_file_contents = self.parse_file_contents(contents)

snoop.pp(parsed_file_contents)

return
try:
output = subprocess.check_output(
[
"pip3",
"install",
"-r",
self.filename,
"--dry-run",
"--ignore-installed",
"--report",
"-",
"--quiet",
],
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
self.logger.error(e.output)
pip_version = str(subprocess.check_output(["pip3", "--version"]))
# Output will look like:
# pip 20.0.2 from /usr/lib/python3/dist-packages/pip (python 3.8)
pip_version = pip_version.split(" ")[1]
if parse_version(pip_version) < parse_version("22.2"):
self.logger.error(
f"{filename} not scanned: pip --dry-run was unable to get package versions."
)
self.logger.error(
"pip version >= 22.2 is required to scan Python requirements files."
)
else:
output = subprocess.check_output(
[
"pip3",
"install",
"-r",
self.filename,
"--dry-run",
"--ignore-installed",
"--report",
"-",
"--quiet",
],
)
lines = json.loads(output)
for line in lines["install"]:
product = line["metadata"]["name"]
version = line["metadata"]["version"]
purl = self.generate_purl(product, "")
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)

if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")

"""
Parser for Python package metadata files.
This parser is designed to parse Python package metadata files (usually named
PKG-INFO or METADATA) and generate PURLs (Package URLs) for the package.
"""

a_PARSER_MATCH_FILENAMES = [
"PKG-INFO: ",
"METADATA: ",
]

def a___init__(self, cve_db, logger):
"""Initialize the python package metadata parser."""
super().__init__(cve_db, logger)
self.purl_pkg_type = "pypi"

def a_generate_purl(self, product, vendor, qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()
env_config = self.parse_file_contents(contents)

if not product:
return None

purl = super().generate_purl(
product,
vendor,
qualifier,
subpath,
)

return purl
snoop.pp(self.cve_db)

def a_run_checker(self, filename):
"""
This generator runs only for python packages.
There are no actual checkers.
The ProductInfo is computed without the help of any checkers from PKG-INFO or METADATA.
"""
self.filename = filename
lines = parse_strings(self.filename)
lines = "\n".join(lines.splitlines()[:3])
try:
product = search(compile(r"^Name: (.+)$", MULTILINE), lines).group(1)
version = search(compile(r"^Version: (.+)$", MULTILINE), lines).group(1)
purl = self.generate_purl(product, "")
vendor, result = self.find_vendor_from_purl(purl, version)

if vendor is not None:
yield from vendor

if not result:
vendor_package_pair = self.cve_db.get_vendor_product_pairs(product)
if vendor_package_pair != []:
for pair in vendor_package_pair:
vendor = pair["vendor"]
location = pair.get("location", "/usr/local/bin/product")
file_path = self.filename
self.logger.debug(
f"{file_path} is {vendor}.{product} {version}"
)
yield ScanInfo(
ProductInfo(vendor, product, version, location), file_path
)

# There are packages with a METADATA file in them containing different data from what the tool expects
except AttributeError:
self.logger.debug(f"{filename} is an invalid METADATA/PKG-INFO")
self.logger.debug(f"Done scanning file: {filename}")
# TODO Create SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE
# by making a request to the poligy engine and getting it's workflow
# manifest as output and deriving from that or extend it to return that.
data_source = "SCITT_URN_FOR_MANIFEST_OF_EXECUTED_WORKFLOW_WITH_SARIF_OUTPUTS_DEREFERENCEABLE"
affected_data = [
{
"cve_id": cve.ad_hoc_cve_id,
"vendor": cve.vendor,
"product": cve.product,
"version": cve.version,
"versionStartIncluding": "",
# "versionStartIncluding": cve.version,
"versionStartExcluding": "",
"versionEndIncluding": "",
# "versionEndIncluding": cve.version,
"versionEndExcluding": "",
}
for _namespace, cve in env_config.namespaces.items()
]
with self.cve_db.with_cursor() as cursor:
self.cve_db.populate_affected(affected_data, cursor, data_source)

for _namespace, cve in env_config.namespaces.items():
yield from self.find_vendor(cve.product, cve.version)

# TODO VEX attached via linked data to ad-hoc CVE-ID
19 changes: 16 additions & 3 deletions test/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytest

from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.util import ProductInfo, ScanInfo
from cve_bin_tool.log import LOGGER
from cve_bin_tool.parsers.parse import valid_files as actual_valid_files
from cve_bin_tool.parsers.dart import DartParser
Expand Down Expand Up @@ -59,7 +60,7 @@
CVE_BIN_TOOL_0_PRODUCT="myproduct"
CVE_BIN_TOOL_0_VENDOR="myvendor"
CVE_BIN_TOOL_0_VERSION="v0.0.0.dev-15abff2d529396937e18c657ecee1ed224842000"
CVE_BIN_TOOL_0_AD_HOC_CVE="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8"
CVE_BIN_TOOL_0_AD_HOC_CVE_ID="CVE-0001-15004435-aa84-43ff-9c26-f703a26069f8"
"""
)

Expand All @@ -84,6 +85,18 @@ async def test_parser_env_test_0001(self):
results = list(env_parser.run_checker(file_path))
unittest.TestCase().assertListEqual(
results,
# TODO DEBUG Change empty list to expected value
[],
[
ScanInfo(
product_info=ProductInfo(
vendor="myvendor",
product="myproduct",
version="v0.0.0.dev-15abff2d529396937e18c657ecee1ed224842000",
# TODO location?
location="/usr/local/bin/product",
# TODO purl
purl=None,
),
file_path=file_path,
)
],
)

0 comments on commit 1f502b8

Please sign in to comment.