diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index d9b4df1e5..2a2f80652 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -45,7 +45,7 @@ from macaron.repo_finder.repo_finder_base import BaseRepoFinder from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.repo_finder.repo_finder_java import JavaRepoFinder -from macaron.repo_finder.repo_utils import generate_report, prepare_repo +from macaron.repo_finder.repo_utils import check_repo_urls_are_equivalent, generate_report, prepare_repo from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, list_remote_references logger: logging.Logger = logging.getLogger(__name__) @@ -133,7 +133,7 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: ) -def find_source(purl_string: str, input_repo: str | None) -> bool: +def find_source(purl_string: str, input_repo: str | None, latest_version_fallback: bool = True) -> bool: """Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo. Parameters @@ -142,6 +142,8 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: The PURL string of the target. input_repo: str | None The repository path optionally provided by the user. + latest_version_fallback: bool + A flag that determines whether the latest version of the same artifact can be checked as a fallback option. Returns ------- @@ -151,12 +153,14 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: try: purl = PackageURL.from_string(purl_string) except ValueError as error: - logger.error("Could not parse PURL: %s", error) + logger.error("Could not parse PURL: '%s'. Error: %s", purl_string, error) return False if not purl.version: - logger.debug("PURL is missing version.") - return False + purl = DepsDevRepoFinder().get_latest_version(purl) + if not purl.version: + logger.debug("PURL is missing version.") + return False found_repo = input_repo if not input_repo: @@ -165,11 +169,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: if not found_repo: logger.error("Could not find repo for PURL: %s", purl) - return False + if not latest_version_fallback: + return False + + # Try to find the latest version repo. + latest_version_purl = get_latest_version(purl) + if latest_version_purl == purl: + logger.error("Latest version PURL is the same as original: %s", purl) + return False + + found_repo = DepsDevRepoFinder().find_repo(latest_version_purl) + if not found_repo: + logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl) + return False # Disable other loggers for cleaner output. logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True + digest = None if defaults.getboolean("repofinder", "find_source_should_clone"): logger.debug("Preparing repo: %s", found_repo) repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR) @@ -180,33 +197,41 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: purl=purl, ) - if not git_obj: - # TODO expand this message to cover cases where the obj was not created due to lack of correct tag. - logger.error("Could not resolve repository: %s", found_repo) - return False - - try: - digest = git_obj.get_head().hash - except ValueError: - logger.debug("Could not retrieve commit hash from repository.") - return False + if git_obj: + try: + digest = git_obj.get_head().hash + except ValueError: + logger.debug("Could not retrieve commit hash from repository.") else: # Retrieve the tags. tags = get_tags_via_git_remote(found_repo) - if not tags: + if tags: + matches = match_tags(list(tags.keys()), purl.name, purl.version) + if matches: + matched_tag = matches[0] + digest = tags[matched_tag] + + if not digest: + logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo) + if not latest_version_fallback: return False - matches = match_tags(list(tags.keys()), purl.name, purl.version) + # Try to use the latest version of the artifact. + latest_version_purl = get_latest_version(purl) + if latest_version_purl == purl: + logger.error("Latest version PURL is the same as original: %s", purl) + return False - if not matches: + latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl) + if not latest_repo: + logger.error("Could not find repo from latest version of PURL: %s >> %s.", latest_version_purl, purl) return False - matched_tag = matches[0] - digest = tags[matched_tag] + if check_repo_urls_are_equivalent(found_repo, latest_repo): + logger.error("Latest version repo is the same as original: %s", latest_repo) + return False - if not digest: - logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo) - return False + return find_source(str(purl), latest_repo, False) if not input_repo: logger.info("Found repository for PURL: %s", found_repo) @@ -219,6 +244,24 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: return True +def get_latest_version(purl: PackageURL) -> PackageURL: + """Get the latest version of the passed artifact. + + Parameters + ---------- + purl: PackageURL + The artifact as a PURL. + + Returns + ------- + PackageURL + The latest version of the same artifact. + """ + namespace = purl.namespace + "/" if purl.namespace else "" + no_version_purl = PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}") + return DepsDevRepoFinder.get_latest_version(no_version_purl) + + def get_tags_via_git_remote(repo: str) -> dict[str, str] | None: """Retrieve all tags from a given repository using ls-remote. diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 4696caa27..5164137ab 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -36,6 +36,9 @@ class DepsDevType(StrEnum): class DepsDevRepoFinder(BaseRepoFinder): """This class is used to find repositories using Google's Open Source Insights A.K.A. deps.dev.""" + # See https://docs.deps.dev/api/v3alpha/ + BASE_URL = "https://api.deps.dev/v3alpha/purl/" + def find_repo(self, purl: PackageURL) -> str: """ Attempt to retrieve a repository URL that matches the passed artifact. @@ -108,6 +111,37 @@ def get_project_info(project_url: str) -> dict[str, Any] | None: return response_json + @staticmethod + def get_latest_version(purl: PackageURL) -> PackageURL: + """Return a PURL representing the latest version of the passed artifact.""" + original_purl = purl + if purl.version: + namespace = purl.namespace + "/" if purl.namespace else "" + purl = PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}") + + url = f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}" + response = send_get_http_raw(url) + + if not response: + return original_purl + + try: + metadata: dict = json.loads(response.text) + except ValueError as error: + logger.debug("Failed to parse response from deps.dev: %s", error) + return original_purl + + versions_keys = ["package", "versions"] if "package" in metadata else ["version"] + versions = json_extract(metadata, versions_keys, list) + if not versions: + return original_purl + latest_version = json_extract(versions[-1], ["versionKey", "version"], str) + if not latest_version: + return original_purl + + namespace = purl.namespace + "/" if purl.namespace else "" + return PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}@{latest_version}") + def _create_urls(self, purl: PackageURL) -> list[str]: """ Create the urls to search for the metadata relating to the passed artifact. @@ -124,37 +158,13 @@ def _create_urls(self, purl: PackageURL) -> list[str]: list[str] The list of created URLs. """ - # See https://docs.deps.dev/api/v3alpha/ - base_url = f"https://api.deps.dev/v3alpha/purl/{encode(str(purl), safe='')}" + if not purl.version: + purl = DepsDevRepoFinder.get_latest_version(purl) - if not base_url: - return [] - - if purl.version: - return [base_url] - - # Find the latest version. - response = send_get_http_raw(base_url, {}) - - if not response: - return [] - - try: - metadata: dict = json.loads(response.text) - except ValueError as error: - logger.debug("Failed to parse response from deps.dev: %s", error) - return [] - - versions_keys = ["package", "versions"] if "package" in metadata else ["version"] - versions = json_extract(metadata, versions_keys, list) - if not versions: - return [] - latest_version = json_extract(versions[-1], ["versionKey", "version"], str) - if not latest_version: + if not purl.version: return [] - logger.debug("Found latest version: %s", latest_version) - return [f"{base_url}%40{latest_version}"] + return [f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"] def _retrieve_json(self, url: str) -> str: """ diff --git a/src/macaron/repo_finder/repo_finder_java.py b/src/macaron/repo_finder/repo_finder_java.py index 77e1705f8..8d106d1ea 100644 --- a/src/macaron/repo_finder/repo_finder_java.py +++ b/src/macaron/repo_finder/repo_finder_java.py @@ -11,6 +11,7 @@ from macaron.config.defaults import defaults from macaron.parsers.pomparser import parse_pom_string from macaron.repo_finder.repo_finder_base import BaseRepoFinder +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.repo_finder.repo_validator import find_valid_repository_url from macaron.util import send_get_http_raw @@ -51,8 +52,13 @@ def find_repo(self, purl: PackageURL) -> str: if not version: logger.info("Version missing for maven artifact: %s:%s", group, artifact) - # TODO add support for Java artifacts without a version - return "" + purl = DepsDevRepoFinder().get_latest_version(purl) + if not purl.version: + logger.debug("Could not find version for artifact: %s:%s", purl.namespace, purl.name) + return "" + group = purl.namespace or "" + artifact = purl.name + version = purl.version while group and artifact and version and limit > 0: # Create the URLs for retrieving the artifact's POM diff --git a/src/macaron/repo_finder/repo_utils.py b/src/macaron/repo_finder/repo_utils.py index c3dffc8c5..59b15c599 100644 --- a/src/macaron/repo_finder/repo_utils.py +++ b/src/macaron/repo_finder/repo_utils.py @@ -15,6 +15,7 @@ from macaron.config.global_config import global_config from macaron.errors import CloneError, RepoCheckOutError from macaron.repo_finder.commit_finder import find_commit +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService from macaron.slsa_analyzer.git_url import ( @@ -131,6 +132,7 @@ def prepare_repo( branch_name: str = "", digest: str = "", purl: PackageURL | None = None, + latest_version_fallback: bool = True, ) -> Git | None: """Prepare the target repository for analysis. @@ -154,6 +156,8 @@ def prepare_repo( The hash of the commit that we want to checkout in the branch. purl : PackageURL | None The PURL of the analysis target. + latest_version_fallback: bool + A flag that determines whether the latest version of the same artifact can be checked as a fallback option. Returns ------- @@ -210,7 +214,12 @@ def prepare_repo( found_digest = find_commit(git_obj, purl) if not found_digest: logger.error("Could not map the input purl string to a specific commit in the corresponding repository.") - return None + if not latest_version_fallback: + return None + # If the commit could not be found, check if the latest version of the artifact has a different repository. + git_obj, repo_path, found_digest = check_latest_version(purl, repo_path, target_dir) + if not git_obj: + return None digest = found_digest # Checking out the specific branch or commit. This operation varies depends on the git service that the @@ -278,3 +287,70 @@ def get_git_service(remote_path: str | None) -> BaseGitService: return git_service return NoneGitService() + + +def check_latest_version(purl: PackageURL, repo_path: str, target_dir: str) -> tuple[Git | None, str, str]: + """Check the latest version of an artifact to see if it has a different repository URL. + + Parameters + ---------- + purl : PackageURL | None + The PURL of the analysis target. + repo_path : str + The path to the repository, can be either local or remote. + target_dir : str + The directory where all remote repository will be cloned. + + Returns + ------- + tuple[Git | None, str, str] + A tuple of: the pydriller.Git object of the repository (or None if error), the repository path, the commit. + """ + namespace = purl.namespace + "/" if purl.namespace else "" + no_version_purl = PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}") + + latest_version_purl = DepsDevRepoFinder.get_latest_version(no_version_purl) + if latest_version_purl == purl: + return None, "", "" + + latest_repo = DepsDevRepoFinder().find_repo(latest_version_purl) + if not latest_repo: + return None, "", "" + + if check_repo_urls_are_equivalent(repo_path, latest_repo): + return None, "", "" + + # Try to prepare the new repo. + git_obj = prepare_repo(target_dir, latest_repo, "", "", purl, False) + if not git_obj: + return None, "", "" + + # Try to find the commit in the new repo. + digest = find_commit(git_obj, purl) + if not digest: + return None, "", "" + + return git_obj, latest_repo, digest + + +def check_repo_urls_are_equivalent(repo_1: str, repo_2: str) -> bool: + """Check if the two passed repo URLs are equivalent. + + Parameters + ---------- + repo_1: str + The first repository URL as a string. + repo_2: str + The second repository URL as a string. + + Returns + ------- + bool + True if the repository URLs have equal hostnames and paths, otherwise False. + """ + repo_url_1 = urlparse(repo_1) + repo_url_2 = urlparse(repo_2) + if repo_url_1.hostname != repo_url_2.hostname or repo_url_1.path != repo_url_2.path: + return False + + return True diff --git a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py index 12f10cac1..16de2c7d5 100644 --- a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py +++ b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py @@ -12,6 +12,7 @@ from macaron.config.defaults import defaults from macaron.repo_finder import repo_validator from macaron.repo_finder.repo_finder import find_repo +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.slsa_analyzer.git_url import clean_url logger: logging.Logger = logging.getLogger(__name__) @@ -70,6 +71,20 @@ def test_repo_finder() -> int: if not parsed_url or not repo_validator.resolve_redirects(parsed_url): return os.EX_UNAVAILABLE + # Test Java package whose SCM metadata only points to the repo in the later versions than is provided here. + purl = PackageURL.from_string("pkg:maven/io.vertx/vertx-auth-common@3.8.0") + repo = find_repo(purl) + if repo == "https://github.com/eclipse-vertx/vertx-auth": + return os.EX_UNAVAILABLE + latest_purl = DepsDevRepoFinder().get_latest_version(purl) + repo = find_repo(latest_purl) + if repo != "https://github.com/eclipse-vertx/vertx-auth": + return os.EX_UNAVAILABLE + + # Test Java package that has no version. + if not find_repo(PackageURL.from_string("pkg:maven/io.vertx/vertx-auth-common")): + return os.EX_UNAVAILABLE + return os.EX_OK