Skip to content

Commit

Permalink
Added logic to query the PyPi json API for known vulnerabilities for …
Browse files Browse the repository at this point in the history
…each PyPi dependency
  • Loading branch information
owenlamont committed Dec 17, 2024
1 parent a87dd99 commit 690d512
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/uv_secure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from uv_secure.run import app
from uv_secure.run import app, check_dependencies


__all__ = ["app"]
__all__ = ["app", "check_dependencies"]
102 changes: 78 additions & 24 deletions src/uv_secure/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
from pathlib import Path
import re
import sys
from typing import Optional

import httpx
from pydantic import BaseModel
import typer

Expand All @@ -19,40 +23,90 @@ class Dependency(BaseModel):
version: str


class Vulnerability(BaseModel):
id: str
details: str
fixed_in: Optional[list[str]] = None
aliases: Optional[list[str]] = None
link: Optional[str] = None
source: Optional[str] = None
summary: Optional[str] = None
withdrawn: Optional[str] = None


def parse_uv_lock_file(file_path: Path) -> list[Dependency]:
"""Parses a uv.lock TOML file and extract PyPi package dependencies"""
"""Parses a uv.lock TOML file and extracts package PyPi dependencies"""
with file_path.open("rb") as f:
data = toml.load(f)

package_data = data.get("package", [])
dependencies = []

for package in package_data:
sdist = package.get("sdist", {}).get("url")
wheels = package.get("wheels", [])

if (sdist and "https://files.pythonhosted.org/" in sdist) or (
wheels
and "url" in wheels[0]
and "https://files.pythonhosted.org/" in wheels[0]["url"]
):
dependencies.append(
Dependency(name=package["name"], version=package["version"])
)

return dependencies
return [
Dependency(name=package["name"], version=package["version"])
for package in package_data
if package.get("source", {}).get("registry") == "https://pypi.org/simple"
]


def canonicalize_name(name: str) -> str:
"""Converts a package name to its canonical form for PyPI URLs"""
return re.sub(r"[_.]+", "-", name).lower()


async def fetch_vulnerabilities(
client: httpx.AsyncClient, dependency: Dependency
) -> tuple[Dependency, list[Vulnerability]]:
"""Queries the PyPi JSON API for vulnerabilities of a given dependency."""
canonical_name = canonicalize_name(dependency.name)
url = f"https://pypi.org/pypi/{canonical_name}/{dependency.version}/json"
try:
response = await client.get(url)
if response.status_code == 200:
data = response.json()
vulnerabilities = [
Vulnerability(**v) for v in data.get("vulnerabilities", [])
]
return dependency, vulnerabilities
typer.echo(
f"Warning: Could not fetch data for {dependency.name}=={dependency.version}"
)
except httpx.RequestError as e:
typer.echo(f"Error fetching {dependency.name}=={dependency.version}: {e}")
return dependency, []


async def check_all_vulnerabilities(
dependencies: list[Dependency],
) -> list[tuple[Dependency, list[Vulnerability]]]:
"""Fetch vulnerabilities for all dependencies concurrently."""
async with httpx.AsyncClient(timeout=10) as client:
tasks = [fetch_vulnerabilities(client, dep) for dep in dependencies]
return await asyncio.gather(*tasks)


def check_dependencies(uv_lock_path: Path) -> None:
if not uv_lock_path.exists():
typer.echo(f"Error: File {uv_lock_path} does not exist.")
raise typer.Exit(1)
dependencies = parse_uv_lock_file(uv_lock_path)
typer.echo("Checking dependencies for vulnerabilities...")
results = asyncio.run(check_all_vulnerabilities(dependencies))
for dep, vulnerabilities in results:
if vulnerabilities:
typer.echo(f"Vulnerabilities found for {dep.name}=={dep.version}:")
for vuln in vulnerabilities:
fixed_in = (
", ".join(vuln.fixed_in) if vuln.fixed_in else "Not specified"
)
typer.echo(f"- {vuln.id}: {vuln.details} (Fixed in: {fixed_in})")
typer.echo(f" Source: {vuln.source}, Link: {vuln.link}")
else:
typer.echo(f"No known vulnerabilities for {dep.name}=={dep.version}")


@app.command()
def main(uv_lock_path: Path) -> None:
"""Parse a uv.lock file and list its dependencies"""
if not uv_lock_path.exists():
typer.echo(f"Error: File {uv_lock_path} does not exist.")
raise typer.Exit(1)

dependencies = parse_uv_lock_file(uv_lock_path)
for dep in dependencies:
typer.echo(f"{dep.name}=={dep.version}")
check_dependencies(uv_lock_path)


if __name__ == "__main__":
Expand Down
9 changes: 7 additions & 2 deletions tests/uv_secure/test_run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from pathlib import Path

from pytest_mock import MockFixture
from typer.testing import CliRunner

from uv_secure import app
Expand All @@ -6,6 +9,8 @@
runner = CliRunner()


def test_app() -> None:
result = runner.invoke(app)
def test_app(mocker: MockFixture) -> None:
mock_check_dependencies = mocker.patch("uv_secure.run.check_dependencies")
result = runner.invoke(app, "uv.lock")
mock_check_dependencies.assert_called_once_with(Path("uv.lock"))
assert result.exit_code == 0

0 comments on commit 690d512

Please sign in to comment.