diff --git a/src/uv_secure/dependency_checker/dependency_checker.py b/src/uv_secure/dependency_checker/dependency_checker.py index bf67db2..4f258f1 100644 --- a/src/uv_secure/dependency_checker/dependency_checker.py +++ b/src/uv_secure/dependency_checker/dependency_checker.py @@ -1,89 +1,14 @@ import asyncio from pathlib import Path -import re -import sys -from typing import Optional -import httpx import inflect -from pydantic import BaseModel from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.text import Text import typer - -# Conditional import for toml -if sys.version_info >= (3, 11): - import tomllib as toml -else: - import tomli as toml - - -class Dependency(BaseModel): - name: str - version: str - - -class Vulnerability(BaseModel): - id: str - details: str - fixed_in: Optional[list[str]] = None - aliases: Optional[list[str]] = None - link: Optional[str] = None - source: Optional[str] = None - summary: Optional[str] = None - withdrawn: Optional[str] = None - - -def parse_uv_lock_file(file_path: Path) -> list[Dependency]: - """Parses a uv.lock TOML file and extracts package PyPi dependencies""" - with file_path.open("rb") as f: - data = toml.load(f) - - package_data = data.get("package", []) - return [ - Dependency(name=package["name"], version=package["version"]) - for package in package_data - if package.get("source", {}).get("registry") == "https://pypi.org/simple" - ] - - -def canonicalize_name(name: str) -> str: - """Converts a package name to its canonical form for PyPI URLs""" - return re.sub(r"[_.]+", "-", name).lower() - - -async def fetch_vulnerabilities( - client: httpx.AsyncClient, dependency: Dependency -) -> tuple[Dependency, list[Vulnerability]]: - """Queries the PyPi JSON API for vulnerabilities of a given dependency.""" - canonical_name = canonicalize_name(dependency.name) - url = f"https://pypi.org/pypi/{canonical_name}/{dependency.version}/json" - try: - response = await client.get(url) - if response.status_code == 200: - data = response.json() - vulnerabilities = [ - Vulnerability(**v) for v in data.get("vulnerabilities", []) - ] - return dependency, vulnerabilities - typer.echo( - f"Warning: Could not fetch data for {dependency.name}=={dependency.version}" - ) - except httpx.RequestError as e: - typer.echo(f"Error fetching {dependency.name}=={dependency.version}: {e}") - return dependency, [] - - -async def check_all_vulnerabilities( - dependencies: list[Dependency], -) -> list[tuple[Dependency, list[Vulnerability]]]: - """Fetch vulnerabilities for all dependencies concurrently.""" - async with httpx.AsyncClient(timeout=10) as client: - tasks = [fetch_vulnerabilities(client, dep) for dep in dependencies] - return await asyncio.gather(*tasks) +from uv_secure.package_info import download_vulnerabilities, parse_uv_lock_file def check_dependencies(uv_lock_path: Path, ignore_ids: list[str]) -> int: @@ -99,7 +24,7 @@ def check_dependencies(uv_lock_path: Path, ignore_ids: list[str]) -> int: f"[bold cyan]Checking {uv_lock_path} dependencies for vulnerabilities...[/]" ) - results = asyncio.run(check_all_vulnerabilities(dependencies)) + results = asyncio.run(download_vulnerabilities(dependencies)) total_dependencies = len(results) vulnerable_count = 0 diff --git a/src/uv_secure/package_info/__init__.py b/src/uv_secure/package_info/__init__.py new file mode 100644 index 0000000..d350d93 --- /dev/null +++ b/src/uv_secure/package_info/__init__.py @@ -0,0 +1,5 @@ +from uv_secure.package_info.lock_file_parser import parse_uv_lock_file +from uv_secure.package_info.vulnerability_downloader import download_vulnerabilities + + +__all__ = ["download_vulnerabilities", "parse_uv_lock_file"] diff --git a/src/uv_secure/package_info/lock_file_parser.py b/src/uv_secure/package_info/lock_file_parser.py new file mode 100644 index 0000000..2af5c11 --- /dev/null +++ b/src/uv_secure/package_info/lock_file_parser.py @@ -0,0 +1,29 @@ +from pathlib import Path +import sys + +from pydantic import BaseModel + + +# Conditional import for toml +if sys.version_info >= (3, 11): + import tomllib as toml +else: + import tomli as toml + + +class Dependency(BaseModel): + name: str + version: str + + +def parse_uv_lock_file(file_path: Path) -> list[Dependency]: + """Parses a uv.lock TOML file and extracts package PyPi dependencies""" + with file_path.open("rb") as f: + data = toml.load(f) + + package_data = data.get("package", []) + return [ + Dependency(name=package["name"], version=package["version"]) + for package in package_data + if package.get("source", {}).get("registry") == "https://pypi.org/simple" + ] diff --git a/src/uv_secure/package_info/vulnerability_downloader.py b/src/uv_secure/package_info/vulnerability_downloader.py new file mode 100644 index 0000000..697b993 --- /dev/null +++ b/src/uv_secure/package_info/vulnerability_downloader.py @@ -0,0 +1,56 @@ +import asyncio +import re +from typing import Optional + +import httpx +from pydantic import BaseModel +import typer + +from uv_secure.package_info.lock_file_parser import Dependency + + +class Vulnerability(BaseModel): + id: str + details: str + fixed_in: Optional[list[str]] = None + aliases: Optional[list[str]] = None + link: Optional[str] = None + source: Optional[str] = None + summary: Optional[str] = None + withdrawn: Optional[str] = None + + +def _canonicalize_name(name: str) -> str: + """Converts a package name to its canonical form for PyPI URLs""" + return re.sub(r"[_.]+", "-", name).lower() + + +async def _download_package_vulnerabilities( + client: httpx.AsyncClient, dependency: Dependency +) -> tuple[Dependency, list[Vulnerability]]: + """Queries the PyPi JSON API for vulnerabilities of a given dependency.""" + canonical_name = _canonicalize_name(dependency.name) + url = f"https://pypi.org/pypi/{canonical_name}/{dependency.version}/json" + try: + response = await client.get(url) + if response.status_code == 200: + data = response.json() + vulnerabilities = [ + Vulnerability(**v) for v in data.get("vulnerabilities", []) + ] + return dependency, vulnerabilities + typer.echo( + f"Warning: Could not fetch data for {dependency.name}=={dependency.version}" + ) + except httpx.RequestError as e: + typer.echo(f"Error fetching {dependency.name}=={dependency.version}: {e}") + return dependency, [] + + +async def download_vulnerabilities( + dependencies: list[Dependency], +) -> list[tuple[Dependency, list[Vulnerability]]]: + """Fetch vulnerabilities for all dependencies concurrently.""" + async with httpx.AsyncClient(timeout=10) as client: + tasks = [_download_package_vulnerabilities(client, dep) for dep in dependencies] + return await asyncio.gather(*tasks)