Skip to content

Commit

Permalink
Refactored the lock file parsing and vulnerability downloading into s…
Browse files Browse the repository at this point in the history
…eparate modules
  • Loading branch information
owenlamont committed Dec 22, 2024
1 parent 1b335b9 commit 49e0c50
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 77 deletions.
79 changes: 2 additions & 77 deletions src/uv_secure/dependency_checker/dependency_checker.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,14 @@
import asyncio
from pathlib import Path
import re
import sys
from typing import Optional

import httpx
import inflect
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
import typer


# Conditional import for toml
if sys.version_info >= (3, 11):
import tomllib as toml
else:
import tomli as toml


class Dependency(BaseModel):
name: str
version: str


class Vulnerability(BaseModel):
id: str
details: str
fixed_in: Optional[list[str]] = None
aliases: Optional[list[str]] = None
link: Optional[str] = None
source: Optional[str] = None
summary: Optional[str] = None
withdrawn: Optional[str] = None


def parse_uv_lock_file(file_path: Path) -> list[Dependency]:
"""Parses a uv.lock TOML file and extracts package PyPi dependencies"""
with file_path.open("rb") as f:
data = toml.load(f)

package_data = data.get("package", [])
return [
Dependency(name=package["name"], version=package["version"])
for package in package_data
if package.get("source", {}).get("registry") == "https://pypi.org/simple"
]


def canonicalize_name(name: str) -> str:
"""Converts a package name to its canonical form for PyPI URLs"""
return re.sub(r"[_.]+", "-", name).lower()


async def fetch_vulnerabilities(
client: httpx.AsyncClient, dependency: Dependency
) -> tuple[Dependency, list[Vulnerability]]:
"""Queries the PyPi JSON API for vulnerabilities of a given dependency."""
canonical_name = canonicalize_name(dependency.name)
url = f"https://pypi.org/pypi/{canonical_name}/{dependency.version}/json"
try:
response = await client.get(url)
if response.status_code == 200:
data = response.json()
vulnerabilities = [
Vulnerability(**v) for v in data.get("vulnerabilities", [])
]
return dependency, vulnerabilities
typer.echo(
f"Warning: Could not fetch data for {dependency.name}=={dependency.version}"
)
except httpx.RequestError as e:
typer.echo(f"Error fetching {dependency.name}=={dependency.version}: {e}")
return dependency, []


async def check_all_vulnerabilities(
dependencies: list[Dependency],
) -> list[tuple[Dependency, list[Vulnerability]]]:
"""Fetch vulnerabilities for all dependencies concurrently."""
async with httpx.AsyncClient(timeout=10) as client:
tasks = [fetch_vulnerabilities(client, dep) for dep in dependencies]
return await asyncio.gather(*tasks)
from uv_secure.package_info import download_vulnerabilities, parse_uv_lock_file


def check_dependencies(uv_lock_path: Path, ignore_ids: list[str]) -> int:
Expand All @@ -99,7 +24,7 @@ def check_dependencies(uv_lock_path: Path, ignore_ids: list[str]) -> int:
f"[bold cyan]Checking {uv_lock_path} dependencies for vulnerabilities...[/]"
)

results = asyncio.run(check_all_vulnerabilities(dependencies))
results = asyncio.run(download_vulnerabilities(dependencies))

total_dependencies = len(results)
vulnerable_count = 0
Expand Down
5 changes: 5 additions & 0 deletions src/uv_secure/package_info/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from uv_secure.package_info.lock_file_parser import parse_uv_lock_file
from uv_secure.package_info.vulnerability_downloader import download_vulnerabilities


__all__ = ["download_vulnerabilities", "parse_uv_lock_file"]
29 changes: 29 additions & 0 deletions src/uv_secure/package_info/lock_file_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pathlib import Path
import sys

from pydantic import BaseModel


# Conditional import for toml
if sys.version_info >= (3, 11):
import tomllib as toml
else:
import tomli as toml


class Dependency(BaseModel):
name: str
version: str


def parse_uv_lock_file(file_path: Path) -> list[Dependency]:
"""Parses a uv.lock TOML file and extracts package PyPi dependencies"""
with file_path.open("rb") as f:
data = toml.load(f)

package_data = data.get("package", [])
return [
Dependency(name=package["name"], version=package["version"])
for package in package_data
if package.get("source", {}).get("registry") == "https://pypi.org/simple"
]
56 changes: 56 additions & 0 deletions src/uv_secure/package_info/vulnerability_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import re
from typing import Optional

import httpx
from pydantic import BaseModel
import typer

from uv_secure.package_info.lock_file_parser import Dependency


class Vulnerability(BaseModel):
id: str
details: str
fixed_in: Optional[list[str]] = None
aliases: Optional[list[str]] = None
link: Optional[str] = None
source: Optional[str] = None
summary: Optional[str] = None
withdrawn: Optional[str] = None


def _canonicalize_name(name: str) -> str:
"""Converts a package name to its canonical form for PyPI URLs"""
return re.sub(r"[_.]+", "-", name).lower()


async def _download_package_vulnerabilities(
client: httpx.AsyncClient, dependency: Dependency
) -> tuple[Dependency, list[Vulnerability]]:
"""Queries the PyPi JSON API for vulnerabilities of a given dependency."""
canonical_name = _canonicalize_name(dependency.name)
url = f"https://pypi.org/pypi/{canonical_name}/{dependency.version}/json"
try:
response = await client.get(url)
if response.status_code == 200:
data = response.json()
vulnerabilities = [
Vulnerability(**v) for v in data.get("vulnerabilities", [])
]
return dependency, vulnerabilities
typer.echo(
f"Warning: Could not fetch data for {dependency.name}=={dependency.version}"
)
except httpx.RequestError as e:
typer.echo(f"Error fetching {dependency.name}=={dependency.version}: {e}")
return dependency, []


async def download_vulnerabilities(
dependencies: list[Dependency],
) -> list[tuple[Dependency, list[Vulnerability]]]:
"""Fetch vulnerabilities for all dependencies concurrently."""
async with httpx.AsyncClient(timeout=10) as client:
tasks = [_download_package_vulnerabilities(client, dep) for dep in dependencies]
return await asyncio.gather(*tasks)

0 comments on commit 49e0c50

Please sign in to comment.