Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: find repo from latest artifact when provided artifact has none #931

Open
wants to merge 2 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 67 additions & 24 deletions src/macaron/repo_finder/repo_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_finder_java import JavaRepoFinder
from macaron.repo_finder.repo_utils import generate_report, prepare_repo
from macaron.repo_finder.repo_utils import check_repo_urls_are_equal, generate_report, prepare_repo
from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, list_remote_references

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,7 +133,7 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None:
)


def find_source(purl_string: str, input_repo: str | None) -> bool:
def find_source(purl_string: str, input_repo: str | None, latest_version_fallback: bool = True) -> bool:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the repo finder used in the analyzer module inconsistent with this implementation? It doesn't seem to use the same fallback to the latest version.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does use the same mechanism, just through a different path: the repository is potentially re-found during the `prepare_repo` function call in `repo_utils.py`.

"""Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo.

Parameters
Expand All @@ -142,6 +142,8 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
The PURL string of the target.
input_repo: str | None
The repository path optionally provided by the user.
latest_version_fallback: bool
A flag that determines whether the latest version of the same artifact can be checked as a fallback option.

Returns
-------
Expand All @@ -151,12 +153,14 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
try:
purl = PackageURL.from_string(purl_string)
except ValueError as error:
logger.error("Could not parse PURL: %s", error)
logger.error("Could not parse PURL: '%s'. Error: %s", purl_string, error)
return False

if not purl.version:
logger.debug("PURL is missing version.")
return False
purl = DepsDevRepoFinder().get_latest_version(purl)
if not purl.version:
logger.debug("PURL is missing version.")
return False

found_repo = input_repo
if not input_repo:
Expand All @@ -165,11 +169,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:

if not found_repo:
logger.error("Could not find repo for PURL: %s", purl)
return False
if not latest_version_fallback:
return False

# Try to find the latest version repo.
latest_version_purl = get_latest_version(purl)
if latest_version_purl == purl:
logger.error("Latest version PURL is the same as original: %s", purl)
return False

found_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
if not found_repo:
logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl)
return False

# Disable other loggers for cleaner output.
logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True

digest = None
if defaults.getboolean("repofinder", "find_source_should_clone"):
logger.debug("Preparing repo: %s", found_repo)
repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR)
Expand All @@ -180,33 +197,41 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
purl=purl,
)

if not git_obj:
# TODO expand this message to cover cases where the obj was not created due to lack of correct tag.
logger.error("Could not resolve repository: %s", found_repo)
return False

try:
digest = git_obj.get_head().hash
except ValueError:
logger.debug("Could not retrieve commit hash from repository.")
return False
if git_obj:
try:
digest = git_obj.get_head().hash
except ValueError:
logger.debug("Could not retrieve commit hash from repository.")
else:
# Retrieve the tags.
tags = get_tags_via_git_remote(found_repo)
if not tags:
if tags:
matches = match_tags(list(tags.keys()), purl.name, purl.version)
if matches:
matched_tag = matches[0]
digest = tags[matched_tag]

if not digest:
logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo)
if not latest_version_fallback:
return False

matches = match_tags(list(tags.keys()), purl.name, purl.version)
# Try to use the latest version of the artifact.
latest_version_purl = get_latest_version(purl)
if latest_version_purl == purl:
logger.error("Latest version PURL is the same as original: %s", purl)
return False

if not matches:
latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
if not latest_repo:
logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl)
return False

matched_tag = matches[0]
digest = tags[matched_tag]
if check_repo_urls_are_equal(found_repo, latest_repo):
logger.error("Latest version repo is the same as original: %s", latest_repo)
return False

if not digest:
logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo)
return False
return find_source(str(purl), latest_repo, False)

if not input_repo:
logger.info("Found repository for PURL: %s", found_repo)
Expand All @@ -219,6 +244,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool:
return True


def get_latest_version(purl: PackageURL) -> PackageURL:
    """Get the latest version of the passed artifact.

    The version, qualifiers and subpath of the passed PURL are dropped, and the resulting
    version-less PURL is handed to ``DepsDevRepoFinder.get_latest_version``.

    Parameters
    ----------
    purl: PackageURL
        The artifact as a PURL.

    Returns
    -------
    PackageURL
        The PURL returned by ``DepsDevRepoFinder.get_latest_version`` for the version-less
        form of the passed artifact.
    """
    # Build the PURL from its components instead of formatting and re-parsing a string, so that
    # characters that must be percent-encoded (e.g. the "@" of an npm scope) are handled by PackageURL.
    no_version_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)
    return DepsDevRepoFinder.get_latest_version(no_version_purl)


def get_tags_via_git_remote(repo: str) -> dict[str, str] | None:
"""Retrieve all tags from a given repository using ls-remote.

Expand Down
66 changes: 38 additions & 28 deletions src/macaron/repo_finder/repo_finder_deps_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class DepsDevType(StrEnum):
class DepsDevRepoFinder(BaseRepoFinder):
"""This class is used to find repositories using Google's Open Source Insights A.K.A. deps.dev."""

# See https://docs.deps.dev/api/v3alpha/
BASE_URL = "https://api.deps.dev/v3alpha/purl/"

def find_repo(self, purl: PackageURL) -> str:
"""
Attempt to retrieve a repository URL that matches the passed artifact.
Expand Down Expand Up @@ -108,6 +111,37 @@ def get_project_info(project_url: str) -> dict[str, Any] | None:

return response_json

@staticmethod
def get_latest_version(purl: PackageURL) -> PackageURL:
    """Return a PURL representing the latest version of the passed artifact.

    Parameters
    ----------
    purl : PackageURL
        The PURL of the artifact. If it carries a version, the lookup is made with a
        version-less PURL of the same type, namespace and name.

    Returns
    -------
    PackageURL
        A PURL with the same type, namespace and name as the looked-up artifact, and the version
        taken from the last entry of the version list in the deps.dev response. The passed PURL is
        returned unchanged if the request fails, the response cannot be parsed, or no version can
        be extracted from it.
    """
    original_purl = purl
    if purl.version:
        # Build from components rather than by string formatting so that characters that must be
        # percent-encoded (e.g. the "@" of an npm scope) are handled by PackageURL.
        purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)

    url = f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"
    response = send_get_http_raw(url)

    if not response:
        return original_purl

    try:
        metadata = json.loads(response.text)
    except ValueError as error:
        logger.debug("Failed to parse response from deps.dev: %s", error)
        return original_purl

    # Valid JSON is not necessarily an object (e.g. "null"); the membership test below needs a dict.
    if not isinstance(metadata, dict):
        logger.debug("Unexpected response format from deps.dev.")
        return original_purl

    versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
    versions = json_extract(metadata, versions_keys, list)
    if not versions:
        return original_purl
    # NOTE(review): this treats the last list entry as the latest version - confirm against the
    # ordering guaranteed by the deps.dev API.
    latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
    if not latest_version:
        return original_purl

    return PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name, version=latest_version)

def _create_urls(self, purl: PackageURL) -> list[str]:
"""
Create the urls to search for the metadata relating to the passed artifact.
Expand All @@ -124,37 +158,13 @@ def _create_urls(self, purl: PackageURL) -> list[str]:
list[str]
The list of created URLs.
"""
# See https://docs.deps.dev/api/v3alpha/
base_url = f"https://api.deps.dev/v3alpha/purl/{encode(str(purl), safe='')}"
if not purl.version:
purl = DepsDevRepoFinder.get_latest_version(purl)

if not base_url:
return []

if purl.version:
return [base_url]

# Find the latest version.
response = send_get_http_raw(base_url, {})

if not response:
return []

try:
metadata: dict = json.loads(response.text)
except ValueError as error:
logger.debug("Failed to parse response from deps.dev: %s", error)
return []

versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
versions = json_extract(metadata, versions_keys, list)
if not versions:
return []
latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
if not latest_version:
if not purl.version:
return []

logger.debug("Found latest version: %s", latest_version)
return [f"{base_url}%40{latest_version}"]
return [f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"]

def _retrieve_json(self, url: str) -> str:
"""
Expand Down
10 changes: 8 additions & 2 deletions src/macaron/repo_finder/repo_finder_java.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from macaron.config.defaults import defaults
from macaron.parsers.pomparser import parse_pom_string
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_validator import find_valid_repository_url
from macaron.util import send_get_http_raw

Expand Down Expand Up @@ -51,8 +52,13 @@ def find_repo(self, purl: PackageURL) -> str:

if not version:
logger.info("Version missing for maven artifact: %s:%s", group, artifact)
# TODO add support for Java artifacts without a version
return ""
purl = DepsDevRepoFinder().get_latest_version(purl)
if not purl.version:
logger.debug("Could not find version for artifact: %s:%s", purl.namespace, purl.name)
return ""
group = purl.namespace or ""
artifact = purl.name
version = purl.version

while group and artifact and version and limit > 0:
# Create the URLs for retrieving the artifact's POM
Expand Down
78 changes: 77 additions & 1 deletion src/macaron/repo_finder/repo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from macaron.config.global_config import global_config
from macaron.errors import CloneError, RepoCheckOutError
from macaron.repo_finder.commit_finder import find_commit
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.git_url import (
Expand Down Expand Up @@ -131,6 +132,7 @@ def prepare_repo(
branch_name: str = "",
digest: str = "",
purl: PackageURL | None = None,
latest_version_fallback: bool = True,
) -> Git | None:
"""Prepare the target repository for analysis.

Expand All @@ -154,6 +156,8 @@ def prepare_repo(
The hash of the commit that we want to checkout in the branch.
purl : PackageURL | None
The PURL of the analysis target.
latest_version_fallback: bool
A flag that determines whether the latest version of the same artifact can be checked as a fallback option.

Returns
-------
Expand Down Expand Up @@ -210,7 +214,12 @@ def prepare_repo(
found_digest = find_commit(git_obj, purl)
if not found_digest:
logger.error("Could not map the input purl string to a specific commit in the corresponding repository.")
return None
if not latest_version_fallback:
return None
# If the commit could not be found, check if the latest version of the artifact has a different repository.
git_obj, repo_path, found_digest = check_latest_version(purl, repo_path, target_dir)
if not git_obj:
return None
digest = found_digest

# Checking out the specific branch or commit. This operation varies depends on the git service that the
Expand Down Expand Up @@ -278,3 +287,70 @@ def get_git_service(remote_path: str | None) -> BaseGitService:
return git_service

return NoneGitService()


def check_latest_version(purl: PackageURL, repo_path: str, target_dir: str) -> tuple[Git | None, str, str]:
    """Check the latest version of an artifact to see if it has a different repository URL.

    Parameters
    ----------
    purl : PackageURL
        The PURL of the analysis target.
    repo_path : str
        The path to the repository, can be either local or remote.
    target_dir : str
        The directory where all remote repository will be cloned.

    Returns
    -------
    tuple[Git | None, str, str]
        A tuple of: the pydriller.Git object of the repository (or None if error), the repository path, the commit.
        On any failure the tuple is ``(None, "", "")``.
    """
    # Build the version-less PURL from its components instead of formatting and re-parsing a string,
    # so that characters that must be percent-encoded (e.g. the "@" of an npm scope) are handled
    # by PackageURL.
    no_version_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)

    latest_version_purl = DepsDevRepoFinder.get_latest_version(no_version_purl)
    if latest_version_purl == purl:
        # The target already is the latest version, so there is nothing else to try.
        return None, "", ""

    latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl)
    if not latest_repo:
        return None, "", ""

    if check_repo_urls_are_equal(repo_path, latest_repo):
        # Same repository as the one that was already tried.
        return None, "", ""

    # Try to prepare the new repo. The fallback is disabled so that this function is not re-entered.
    git_obj = prepare_repo(
        target_dir,
        latest_repo,
        branch_name="",
        digest="",
        purl=purl,
        latest_version_fallback=False,
    )
    if not git_obj:
        return None, "", ""

    # Try to find the commit of the originally requested version in the new repo.
    digest = find_commit(git_obj, purl)
    if not digest:
        return None, "", ""

    return git_obj, latest_repo, digest


def check_repo_urls_are_equal(repo_1: str, repo_2: str) -> bool:
    """Check if the two passed repo URLs are equal.

    The URL scheme is ignored. Paths are compared after removing trailing slashes and a trailing
    ``.git`` suffix, so that different spellings of the same repository compare as equal.

    Parameters
    ----------
    repo_1: str
        The first repository URL as a string.
    repo_2: str
        The second repository URL as a string.

    Returns
    -------
    bool
        True if the repository URLs have equal hostnames and equal normalised paths, otherwise False.
    """

    def _normalised_path(path: str) -> str:
        # "/owner/repo", "/owner/repo/" and "/owner/repo.git" all name the same repository.
        return path.rstrip("/").removesuffix(".git")

    repo_url_1 = urlparse(repo_1)
    repo_url_2 = urlparse(repo_2)
    # urlparse already lower-cases the hostname, so host comparison is case-insensitive.
    return repo_url_1.hostname == repo_url_2.hostname and _normalised_path(repo_url_1.path) == _normalised_path(
        repo_url_2.path
    )
Loading
Loading