Skip to content

Commit

Permalink
feat: find repo from latest artifact when provided artifact has none
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Selwyn-Smith <[email protected]>
  • Loading branch information
benmss committed Nov 28, 2024
1 parent b65f0db commit 70daa62
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 55 deletions.
91 changes: 67 additions & 24 deletions src/macaron/repo_finder/repo_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_finder_java import JavaRepoFinder
from macaron.repo_finder.repo_utils import generate_report, prepare_repo
from macaron.repo_finder.repo_utils import check_repo_urls_are_equal, generate_report, prepare_repo
from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, list_remote_references

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,7 +133,7 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None:
)


def find_source(purl_string: str, input_repo: str | None) -> bool:
def find_source(purl_string: str, input_repo: str | None, latest_version_fallback: bool = True) -> bool:
"""Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo.
Parameters
Expand All @@ -142,6 +142,8 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
The PURL string of the target.
input_repo: str | None
The repository path optionally provided by the user.
latest_version_fallback: bool
A flag that determines whether the latest version of the same artifact can be checked as a fallback option.
Returns
-------
Expand All @@ -151,12 +153,14 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
try:
purl = PackageURL.from_string(purl_string)
except ValueError as error:
logger.error("Could not parse PURL: %s", error)
logger.error("Could not parse PURL: '%s'. Error: %s", purl_string, error)
return False

if not purl.version:
logger.debug("PURL is missing version.")
return False
purl = DepsDevRepoFinder().get_latest_version(purl)
if not purl.version:
logger.debug("PURL is missing version.")
return False

found_repo = input_repo
if not input_repo:
Expand All @@ -165,11 +169,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:

if not found_repo:
logger.error("Could not find repo for PURL: %s", purl)
return False
if not latest_version_fallback:
return False

# Try to find the latest version repo.
latest_version_purl = get_latest_version(purl)
if latest_version_purl == purl:
logger.error("Latest version PURL is the same as original: %s", purl)
return False

found_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
if not found_repo:
logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl)
return False

# Disable other loggers for cleaner output.
logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True

digest = None
if defaults.getboolean("repofinder", "find_source_should_clone"):
logger.debug("Preparing repo: %s", found_repo)
repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR)
Expand All @@ -180,33 +197,41 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
purl=purl,
)

if not git_obj:
# TODO expand this message to cover cases where the obj was not created due to lack of correct tag.
logger.error("Could not resolve repository: %s", found_repo)
return False

try:
digest = git_obj.get_head().hash
except ValueError:
logger.debug("Could not retrieve commit hash from repository.")
return False
if git_obj:
try:
digest = git_obj.get_head().hash
except ValueError:
logger.debug("Could not retrieve commit hash from repository.")
else:
# Retrieve the tags.
tags = get_tags_via_git_remote(found_repo)
if not tags:
if tags:
matches = match_tags(list(tags.keys()), purl.name, purl.version)
if matches:
matched_tag = matches[0]
digest = tags[matched_tag]

if not digest:
logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo)
if not latest_version_fallback:
return False

matches = match_tags(list(tags.keys()), purl.name, purl.version)
# Try to use the latest version of the artifact.
latest_version_purl = get_latest_version(purl)
if latest_version_purl == purl:
logger.error("Latest version PURL is the same as original: %s", purl)
return False

if not matches:
latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
if not latest_repo:
logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl)
return False

matched_tag = matches[0]
digest = tags[matched_tag]
if check_repo_urls_are_equal(found_repo, latest_repo):
logger.error("Latest version repo is the same as original: %s", latest_repo)
return False

if not digest:
logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo)
return False
return find_source(str(purl), latest_repo, False)

if not input_repo:
logger.info("Found repository for PURL: %s", found_repo)
Expand All @@ -219,6 +244,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
return True


def get_latest_version(purl: PackageURL) -> PackageURL:
    """Get the latest version of the passed artifact.

    Parameters
    ----------
    purl: PackageURL
        The artifact as a PURL.

    Returns
    -------
    PackageURL
        The result of ``DepsDevRepoFinder.get_latest_version`` for the version-less form of the passed PURL.
    """
    # Build the version-less PURL directly from its components instead of formatting a string and re-parsing it.
    # The components held by a PackageURL are already decoded, so a string round trip can misparse values that
    # contain PURL-special characters (e.g. "@" or "?"). Only type, namespace and name are carried over.
    no_version_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)
    return DepsDevRepoFinder.get_latest_version(no_version_purl)


def get_tags_via_git_remote(repo: str) -> dict[str, str] | None:
"""Retrieve all tags from a given repository using ls-remote.
Expand Down
66 changes: 38 additions & 28 deletions src/macaron/repo_finder/repo_finder_deps_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class DepsDevType(StrEnum):
class DepsDevRepoFinder(BaseRepoFinder):
"""This class is used to find repositories using Google's Open Source Insights A.K.A. deps.dev."""

# See https://docs.deps.dev/api/v3alpha/
BASE_URL = "https://api.deps.dev/v3alpha/purl/"

def find_repo(self, purl: PackageURL) -> str:
"""
Attempt to retrieve a repository URL that matches the passed artifact.
Expand Down Expand Up @@ -108,6 +111,37 @@ def get_project_info(project_url: str) -> dict[str, Any] | None:

return response_json

@staticmethod
def get_latest_version(purl: PackageURL) -> PackageURL:
    """Return a PURL representing the latest version of the passed artifact.

    Parameters
    ----------
    purl : PackageURL
        The artifact as a PURL. If it carries a version, a version-less PURL is used for the deps.dev query.

    Returns
    -------
    PackageURL
        A PURL made of the type, namespace and name of the passed PURL plus the version extracted from the
        deps.dev response. The passed PURL is returned unchanged if the request yields no response, the response
        is not valid JSON, or no version can be extracted from it.
    """
    # Build PURLs from their components rather than formatting a string and re-parsing it: the components held
    # by a PackageURL are already decoded, so a string round trip can misparse values that contain PURL-special
    # characters (e.g. "@" or "?").
    query_purl = purl
    if purl.version:
        query_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)

    # The whole PURL is percent-encoded (including "/") so that it forms a single URL path segment.
    url = f"{DepsDevRepoFinder.BASE_URL}{encode(str(query_purl), safe='')}"
    response = send_get_http_raw(url)
    if not response:
        return purl

    try:
        metadata: dict = json.loads(response.text)
    except ValueError as error:
        logger.debug("Failed to parse response from deps.dev: %s", error)
        return purl

    # The version list is looked up under "package" -> "versions" when a "package" key exists, else "version".
    versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
    versions = json_extract(metadata, versions_keys, list)
    if not versions:
        return purl

    # NOTE(review): the last entry is taken as the latest version; this relies on deps.dev ordering - confirm.
    latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
    if not latest_version:
        return purl

    return PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name, version=latest_version)

def _create_urls(self, purl: PackageURL) -> list[str]:
"""
Create the urls to search for the metadata relating to the passed artifact.
Expand All @@ -124,37 +158,13 @@ def _create_urls(self, purl: PackageURL) -> list[str]:
list[str]
The list of created URLs.
"""
# See https://docs.deps.dev/api/v3alpha/
base_url = f"https://api.deps.dev/v3alpha/purl/{encode(str(purl)).replace('/', '%2F')}"
if not purl.version:
purl = DepsDevRepoFinder.get_latest_version(purl)

if not base_url:
return []

if purl.version:
return [base_url]

# Find the latest version.
response = send_get_http_raw(base_url, {})

if not response:
return []

try:
metadata: dict = json.loads(response.text)
except ValueError as error:
logger.debug("Failed to parse response from deps.dev: %s", error)
return []

versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
versions = json_extract(metadata, versions_keys, list)
if not versions:
return []
latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
if not latest_version:
if not purl.version:
return []

logger.debug("Found latest version: %s", latest_version)
return [f"{base_url}%40{latest_version}"]
return [f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"]

def _retrieve_json(self, url: str) -> str:
"""
Expand Down
10 changes: 8 additions & 2 deletions src/macaron/repo_finder/repo_finder_java.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from macaron.config.defaults import defaults
from macaron.parsers.pomparser import parse_pom_string
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_validator import find_valid_repository_url
from macaron.util import send_get_http_raw

Expand Down Expand Up @@ -51,8 +52,13 @@ def find_repo(self, purl: PackageURL) -> str:

if not version:
logger.info("Version missing for maven artifact: %s:%s", group, artifact)
# TODO add support for Java artifacts without a version
return ""
purl = DepsDevRepoFinder().get_latest_version(purl)
if not purl.version:
logger.debug("Could not find version for artifact: %s:%s", purl.namespace, purl.name)
return ""
group = purl.namespace or ""
artifact = purl.name
version = purl.version

while group and artifact and version and limit > 0:
# Create the URLs for retrieving the artifact's POM
Expand Down
78 changes: 77 additions & 1 deletion src/macaron/repo_finder/repo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from macaron.config.global_config import global_config
from macaron.errors import CloneError, RepoCheckOutError
from macaron.repo_finder.commit_finder import find_commit
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.git_url import (
Expand Down Expand Up @@ -131,6 +132,7 @@ def prepare_repo(
branch_name: str = "",
digest: str = "",
purl: PackageURL | None = None,
latest_version_fallback: bool = True,
) -> Git | None:
"""Prepare the target repository for analysis.
Expand All @@ -154,6 +156,8 @@ def prepare_repo(
The hash of the commit that we want to checkout in the branch.
purl : PackageURL | None
The PURL of the analysis target.
latest_version_fallback: bool
A flag that determines whether the latest version of the same artifact can be checked as a fallback option.
Returns
-------
Expand Down Expand Up @@ -210,7 +214,12 @@ def prepare_repo(
found_digest = find_commit(git_obj, purl)
if not found_digest:
logger.error("Could not map the input purl string to a specific commit in the corresponding repository.")
return None
if not latest_version_fallback:
return None
# If the commit could not be found, check if the latest version of the artifact has a different repository.
git_obj, repo_path, found_digest = check_latest_version(purl, repo_path, target_dir)
if not git_obj:
return None
digest = found_digest

# Checking out the specific branch or commit. This operation varies depends on the git service that the
Expand Down Expand Up @@ -278,3 +287,70 @@ def get_git_service(remote_path: str | None) -> BaseGitService:
return git_service

return NoneGitService()


def check_latest_version(purl: PackageURL, repo_path: str, target_dir: str) -> tuple[Git | None, str, str]:
    """Check the latest version of an artifact to see if it has a different repository URL.

    Parameters
    ----------
    purl : PackageURL
        The PURL of the analysis target.
    repo_path : str
        The path to the repository, can be either local or remote.
    target_dir : str
        The directory where all remote repository will be cloned.

    Returns
    -------
    tuple[Git | None, str, str]
        A tuple of: the pydriller.Git object of the repository (or None if error), the repository path, the commit.
        On any failure the tuple is ``(None, "", "")``.
    """
    # Build the version-less PURL directly from its components instead of formatting a string and re-parsing it.
    # The components held by a PackageURL are already decoded, so a string round trip can misparse values that
    # contain PURL-special characters (e.g. "@" or "?"). Only type, namespace and name are carried over.
    no_version_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)

    latest_version_purl = DepsDevRepoFinder.get_latest_version(no_version_purl)
    if latest_version_purl == purl:
        # The target already is the latest version, so there is nothing new to try.
        return None, "", ""

    latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
    if not latest_repo:
        return None, "", ""

    if check_repo_urls_are_equal(repo_path, latest_repo):
        # Same repository as the one that already failed; retrying it would not help.
        return None, "", ""

    # Try to prepare the new repo. The original PURL is passed, and the latest version fallback is disabled.
    git_obj = prepare_repo(target_dir, latest_repo, "", "", purl, False)
    if not git_obj:
        return None, "", ""

    # Try to find the commit in the new repo.
    digest = find_commit(git_obj, purl)
    if not digest:
        return None, "", ""

    return git_obj, latest_repo, digest


def check_repo_urls_are_equal(repo_1: str, repo_2: str) -> bool:
    """Check if the two passed repo URLs are equal.

    Only the hostname and path of each parsed URL are compared; other URL components are not considered.

    Parameters
    ----------
    repo_1: str
        The first repository URL as a string.
    repo_2: str
        The second repository URL as a string.

    Returns
    -------
    bool
        True if the repository URLs have equal hostnames and paths, otherwise False.
    """
    first, second = urlparse(repo_1), urlparse(repo_2)
    return first.hostname == second.hostname and first.path == second.path
Loading

0 comments on commit 70daa62

Please sign in to comment.