From 235e8008329f4b0470d36074ea173062f49f1295 Mon Sep 17 00:00:00 2001 From: Dan D'Avella Date: Tue, 30 Apr 2024 14:11:28 -0400 Subject: [PATCH] Add parser/detector for CodeQL SARIF files (#531) --- pyproject.toml | 1 + src/codemodder/codemods/codeql.py | 29 ++++++++++ src/codemodder/codeql.py | 68 +++++++++++++++++++++++ tests/test_codeql.py | 89 +++++++++++++++++++++++++++++++ 4 files changed, 187 insertions(+) create mode 100644 src/codemodder/codemods/codeql.py create mode 100644 src/codemodder/codeql.py create mode 100644 tests/test_codeql.py diff --git a/pyproject.toml b/pyproject.toml index 4fecd7c6..0460a394 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ defectdojo = "core_codemods:defectdojo_registry" [project.entry-points.sarif_detectors] "semgrep" = "codemodder.semgrep:SemgrepSarifToolDetector" +"codeql" = "codemodder.codeql:CodeQLSarifToolDetector" [tool.setuptools] diff --git a/src/codemodder/codemods/codeql.py b/src/codemodder/codemods/codeql.py new file mode 100644 index 00000000..b71b77c7 --- /dev/null +++ b/src/codemodder/codemods/codeql.py @@ -0,0 +1,29 @@ +from functools import cache +from pathlib import Path + +from codemodder.codemods.base_detector import BaseDetector +from codemodder.codeql import CodeQLResultSet +from codemodder.context import CodemodExecutionContext +from codemodder.result import ResultSet + + +class CodeQLSarifFileDetector(BaseDetector): + def apply( + self, + codemod_id: str, + context: CodemodExecutionContext, + files_to_analyze: list[Path], + ) -> ResultSet: + del codemod_id + del files_to_analyze + return process_codeql_findings( + tuple(context.tool_result_files_map.get("codeql", ())) + ) # Convert list to tuple for cache hashability + + +@cache +def process_codeql_findings(semgrep_sarif_files: tuple[str]) -> ResultSet: + results = CodeQLResultSet() + for file in semgrep_sarif_files or (): + results |= CodeQLResultSet.from_sarif(file) + return results diff --git a/src/codemodder/codeql.py b/src/codemodder/codeql.py new file mode 100644 index 00000000..d44437e9 --- /dev/null +++ b/src/codemodder/codeql.py @@ -0,0 +1,68 @@ +import json +from pathlib import Path + +from typing_extensions import Self + +from codemodder.result import LineInfo, Location, Result, ResultSet +from codemodder.sarifs import AbstractSarifToolDetector + + +class CodeQLSarifToolDetector(AbstractSarifToolDetector): + @classmethod + def detect(cls, run_data: dict) -> bool: + return "tool" in run_data and "CodeQL" in run_data["tool"]["driver"]["name"] + + +class CodeQLLocation(Location): + @classmethod + def from_sarif(cls, sarif_location) -> Self: + artifact_location = sarif_location["physicalLocation"]["artifactLocation"] + file = Path(artifact_location["uri"]) + start = LineInfo( + line=sarif_location["physicalLocation"]["region"]["startLine"], + column=sarif_location["physicalLocation"]["region"].get("startColumn"), + ) + end = LineInfo( + line=sarif_location["physicalLocation"]["region"].get( + "endLine", start.line + ), + column=sarif_location["physicalLocation"]["region"].get( + "endColumn", start.column + ), + ) + return cls(file=file, start=start, end=end) + + +class CodeQLResult(Result): + @classmethod + def from_sarif( + cls, sarif_result, sarif_run, rule_extensions, truncate_rule_id: bool = False + ) -> Self: + extension_index = sarif_result["rule"]["toolComponent"]["index"] + tool_index = sarif_result["rule"]["index"] + + rule_data = rule_extensions[extension_index]["rules"][tool_index] + + locations: list[Location] = [] + for location in sarif_result["locations"]: + codeql_location = CodeQLLocation.from_sarif(location) + locations.append(codeql_location) + return cls(rule_id=rule_data["id"], locations=locations) + + +class CodeQLResultSet(ResultSet): + @classmethod + def from_sarif(cls, sarif_file: str | Path, truncate_rule_id: bool = False) -> Self: + with open(sarif_file, "r", encoding="utf-8") as f: + data = json.load(f) + + result_set = cls() + for sarif_run in data["runs"]: + rule_extensions = sarif_run["tool"]["extensions"] + if CodeQLSarifToolDetector.detect(sarif_run): + for sarif_result in sarif_run["results"]: + codeql_result = CodeQLResult.from_sarif( + sarif_result, sarif_run, rule_extensions, truncate_rule_id + ) + result_set.add_result(codeql_result) + return result_set diff --git a/tests/test_codeql.py b/tests/test_codeql.py new file mode 100644 index 00000000..18575e48 --- /dev/null +++ b/tests/test_codeql.py @@ -0,0 +1,89 @@ +import json +from pathlib import Path +from unittest import TestCase, mock + +from codemodder.codeql import CodeQLResultSet + + +class TestCodeQLResultSet(TestCase): + + def test_from_sarif(self): + # Given a SARIF file with known content + sarif_content = { + "runs": [ + { + "tool": { + "driver": {"name": "CodeQL"}, + "extensions": [{"rules": [{"id": "python/sql-injection"}]}], + }, + "results": [ + { + "ruleId": "python/sql-injection", + "message": {"text": "Possible SQL injection"}, + "locations": [ + { + "physicalLocation": { + "artifactLocation": {"uri": "example.py"}, + "region": { + "startLine": 10, + "startColumn": 5, + "endLine": 10, + "endColumn": 20, + }, + } + } + ], + "rule": { + "toolComponent": {"index": 0}, + "index": 0, + }, + } + ], + } + ] + } + sarif_file = Path("/path/to/sarif/file.sarif") + with mock.patch( + "builtins.open", mock.mock_open(read_data=json.dumps(sarif_content)) + ): + # When parsing the SARIF file + result_set = CodeQLResultSet.from_sarif(sarif_file) + + # Then the result set should contain the expected results + self.assertEqual(len(result_set), 1) + self.assertIn("python/sql-injection", result_set) + self.assertEqual(len(result_set["python/sql-injection"]), 1) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0].rule_id, + "python/sql-injection", + ) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0] + .locations[0] + .file, + Path("example.py"), + ) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0] + .locations[0] + .start.line, + 10, + ) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0] + .locations[0] + .start.column, + 5, + ) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0] + .locations[0] + .end.line, + 10, + ) + self.assertEqual( + result_set["python/sql-injection"][Path("example.py")][0] + .locations[0] + .end.column, + 20, + )