Skip to content

Commit

Permalink
Enhanced coverage processing (#2) (#254)
Browse files Browse the repository at this point in the history
* Enhanced coverage processing (#2)

* While earlier PR[#230] managed to breakdown processing
  code into a class hierarechy, there wasnt any changes
  made to the code. This PR brings in enhancements to
  coverage processing where coverage data is stored by
  entity (Class or File).

* Coverage data is stored using a FQDN so that conflicts
  are taken care. This closes[#251]

* Earlier PR broke the behaviour of the agent that only
  target file coverage is considered if the global coverage
  flag is not set by the user, this PR fixes it to bring
  back the original behaviour.

* removed sample-reports

* bump version
  • Loading branch information
coderustic authored Jan 7, 2025
1 parent 4b0ac90 commit fa343e2
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 182 deletions.
1 change: 0 additions & 1 deletion cover_agent/UnitTestValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,6 @@ def post_process_coverage_report(self, time_of_test_command: int):
report_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
is_global_coverage_enabled=self.use_report_coverage_feature_flag,
file_pattern=None,
diff_coverage_report_path=self.diff_cover_report_path,
)
self.logger.info(
Expand Down
255 changes: 118 additions & 137 deletions cover_agent/coverage/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CoverageData:
missed (int) : The number of lines that are not covered by tests.
coverage (float) : The coverage percentage of the file or class.
"""
is_target_file: bool
covered_lines: List[int]
covered: int
missed_lines: List[int]
Expand All @@ -51,6 +52,19 @@ class CoverageReport:
total_coverage: float
file_coverage: Dict[str, CoverageData]

def filter_to_target_coverage(self) -> "CoverageReport":
"""
Returns a new CoverageReport object with only the target file's coverage data.
"""
target_coverage = {
file: coverage
for file, coverage in self.file_coverage.items()
if coverage.is_target_file
}
total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in target_coverage.values())
total_coverage = (sum(len(cov.covered_lines) for cov in target_coverage.values()) / total_lines) if total_lines > 0 else 0.0
return CoverageReport(total_coverage, target_coverage)

class CoverageProcessor(ABC):
"""
Abstract base class for processing coverage reports.
Expand Down Expand Up @@ -102,14 +116,12 @@ def process_coverage_report(self, time_of_test_command: int) -> CoverageReport:
Processes the coverage report and returns the coverage data.
"""
self._is_coverage_valid(time_of_test_command=time_of_test_command)
coverage = self.parse_coverage_report()
report = CoverageReport(0.0, coverage)
if coverage:
total_covered = sum(cov.covered for cov in coverage.values())
total_missed = sum(cov.missed for cov in coverage.values())
total_lines = total_covered + total_missed
report.total_coverage = (float(total_covered) / float(total_lines)) if total_lines > 0 else 0.0
return report
coverage_data = self.parse_coverage_report()
total_covered = sum(cov.covered for cov in coverage_data.values())
total_missed = sum(cov.missed for cov in coverage_data.values())
total_lines = total_covered + total_missed
total_coverage = (total_covered / total_lines) if total_lines > 0 else 0.0
return CoverageReport(total_coverage, coverage_data)

def _is_coverage_valid(
self, time_of_test_command: int
Expand Down Expand Up @@ -149,7 +161,7 @@ def _is_report_obsolete(self, time_of_test_command: int) -> bool:
bool: True if the report is obsolete, False otherwise.
"""
return int(round(os.path.getmtime(self.file_path) * 1000)) < time_of_test_command

class CoberturaProcessor(CoverageProcessor):
"""
A class to process Cobertura code coverage reports.
Expand All @@ -160,14 +172,28 @@ def parse_coverage_report(self) -> Dict[str, CoverageData]:
tree = ET.parse(self.file_path)
root = tree.getroot()
coverage = {}
for cls in root.findall(".//class"):
cls_filename = cls.get("filename")
if cls_filename:
coverage[cls_filename] = self._parse_coverage_data_for_class(cls)
for package in root.findall(".//package"):
# Package name could be '.' if the class is in the default package
# Eg: <package name="." line-rate="0.8143" branch-rate="0" complexity="0">
# In such cases, lets use default as the package name.
package_name = package.get("name", ".")
if package_name == ".":
package_name = "default"
for cls in package.findall(".//class"):
# In languages where Class is not a first class citizen,
# the class name is set to the file name as you can see
# in the below example from the Cobertura XML report.
# Usually this could be your util files. So we are good
# to consier name as the key for the CoverageData.
# Eg: <class name="utils.py" filename="utils.py" complexity="0" line-rate="0.8794" branch-rate="0">
class_name = cls.get("name", "")
fully_qualified_name = f"{package_name}.{class_name}".strip('.')
coverage[fully_qualified_name] = self._parse_class_coverage(cls)
return coverage

def _parse_coverage_data_for_class(self, cls) -> CoverageData:
lines_covered, lines_missed = [], []
def _parse_class_coverage(self, cls) -> CoverageData:
lines_covered = []
lines_missed = []
for line in cls.findall(".//line"):
line_number = int(line.get("number"))
hits = int(line.get("hits"))
Expand All @@ -176,9 +202,12 @@ def _parse_coverage_data_for_class(self, cls) -> CoverageData:
else:
lines_missed.append(line_number)
total_lines = len(lines_covered) + len(lines_missed)
coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0
return CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage)

coverage = (len(lines_covered) / total_lines) if total_lines > 0 else 0.0
is_target = False
if self.src_file_path.endswith(cls.get("filename")):
is_target = True
return CoverageData(is_target, lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage)

class LcovProcessor(CoverageProcessor):
"""
A class to process LCOV code coverage reports.
Expand Down Expand Up @@ -206,7 +235,10 @@ def parse_coverage_report(self) -> Dict[str, CoverageData]:
break
total_lines = len(lines_covered) + len(lines_missed)
coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0
coverage[filename] = CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage)
is_target = False
if filename == self.src_file_path:
is_target = True
coverage[filename] = CoverageData(is_target, lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage)
except (FileNotFoundError, IOError) as e:
self.logger.error(f"Error reading file {self.file_path}: {e}")
raise
Expand All @@ -222,85 +254,61 @@ class JacocoProcessor(CoverageProcessor):
reports in both XML and CSV formats.
"""
def parse_coverage_report(self) -> Dict[str, CoverageData]:
coverage = {}
package_name, class_name = self._extract_package_and_class_java()
file_extension = self._get_file_extension(self.file_path)
if file_extension == 'xml':
missed, covered = self._parse_jacoco_xml(class_name=class_name)
elif file_extension == 'csv':
missed, covered = self._parse_jacoco_csv(package_name=package_name, class_name=class_name)
extension = os.path.splitext(self.file_path)[1].lower()
if extension == ".xml":
return self._parse_xml()
elif extension == ".csv":
return self._parse_csv()
else:
raise ValueError(f"Unsupported JaCoCo code coverage report format: {file_extension}")
total_lines = missed + covered
coverage_percentage = (float(covered) / total_lines) if total_lines > 0 else 0.0
coverage[class_name] = CoverageData(covered_lines=[], covered=covered, missed_lines=[], missed=missed, coverage=coverage_percentage)
return coverage

def _get_file_extension(self, filename: str) -> str | None:
"""Get the file extension from a given filename."""
return os.path.splitext(filename)[1].lstrip(".")
raise ValueError(f"Unsupported JaCoCo report format: {extension}")

def _extract_package_and_class_java(self):
package_pattern = re.compile(r"^\s*package\s+([\w\.]+)\s*;.*$")
class_pattern = re.compile(r"^\s*public\s+class\s+(\w+).*")

package_name = ""
class_name = ""
try:
with open(self.src_file_path, "r") as file:
for line in file:
if not package_name: # Only match package if not already found
package_match = package_pattern.match(line)
if package_match:
package_name = package_match.group(1)

if not class_name: # Only match class if not already found
class_match = class_pattern.match(line)
if class_match:
class_name = class_match.group(1)

if package_name and class_name: # Exit loop if both are found
break
except (FileNotFoundError, IOError) as e:
self.logger.error(f"Error reading file {self.src_file_path}: {e}")
raise

return package_name, class_name

def _parse_jacoco_xml(
self, class_name: str
) -> tuple[int, int]:
def _parse_xml(self) -> Dict[str, CoverageData]:
"""Parses a JaCoCo XML code coverage report to extract covered and missed line numbers for a specific file."""
tree = ET.parse(self.file_path)
root = tree.getroot()
sourcefile = root.find(f".//sourcefile[@name='{class_name}.java']")

if sourcefile is None:
return 0, 0

missed, covered = 0, 0
for counter in sourcefile.findall('counter'):
if counter.attrib.get('type') == 'LINE':
missed += int(counter.attrib.get('missed', 0))
covered += int(counter.attrib.get('covered', 0))
break

return missed, covered
def _parse_jacoco_csv(self, package_name, class_name) -> Dict[str, CoverageData]:
with open(self.file_path, "r") as file:
reader = csv.DictReader(file)
missed, covered = 0, 0
for row in reader:
if row["PACKAGE"] == package_name and row["CLASS"] == class_name:
try:
missed = int(row["LINE_MISSED"])
covered = int(row["LINE_COVERED"])
coverage = {}
for package in root.findall(".//package"):
package_name = package.get("name", "")
for cls in package.findall(".//class"):
class_name = cls.get("sourcefilename", "")
fully_qualified_name = f"{package_name}.{class_name}".replace("/", ".")
missed = 0
covered = 0
for counter in cls.findall('counter'):
if counter.attrib.get('type') == 'LINE':
missed += int(counter.attrib.get('missed', 0))
covered += int(counter.attrib.get('covered', 0))
break
except KeyError as e:
self.logger.error(f"Missing expected column in CSV: {e}")
raise
total_lines = covered + missed
coverage_percentage = (covered / total_lines) if total_lines > 0 else 0.0
is_target = False
src_path = cls.get("name", "")
if f"{src_path}/{class_name}" == self.src_file_path:
is_target = True
# TODO: Add support for identifying which lines are covered and missed
coverage[fully_qualified_name] = CoverageData(is_target, [], covered, [], missed, coverage_percentage)
return coverage

return missed, covered
def _parse_csv(self) -> Dict[str, CoverageData]:
coverage = {}
with open(self.file_path, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
group = row.get("GROUP", "").strip()
package_name = row.get("PACKAGE", "").strip()
class_name = row.get("CLASS", "").strip()
fully_qualified_name = f"{group}.{package_name}.{class_name}".strip('.')

covered = int(row.get("LINE_COVERED", 0))
missed = int(row.get("LINE_MISSED", 0))
total = covered + missed
coverage_percentage = (covered / total) if total > 0 else 0.0
is_target = False
src_path = package_name.replace(".", "/")
if f"{src_path}/{class_name}" == self.src_file_path:
is_target = True
coverage[fully_qualified_name] = CoverageData(is_target, [], covered, [], missed, coverage_percentage)
return coverage

class DiffCoverageProcessor(CoverageProcessor):
"""
Expand Down Expand Up @@ -362,45 +370,18 @@ def parse_coverage_report(self) -> Dict[str, CoverageData]:
violation_lines = []
coverage_percentage = 0.0

coverage[self.file_path] = CoverageData(covered_lines=covered_lines, covered=len(covered_lines), missed_lines=violation_lines,missed=len(violation_lines), coverage=coverage_percentage)
# Consider every file as target file during diff coverage
coverage[self.file_path] = CoverageData(is_target_file=True, covered_lines=covered_lines, covered=len(covered_lines), missed_lines=violation_lines,missed=len(violation_lines), coverage=coverage_percentage)
return coverage

class CoverageReportFilter:
"""
A class to filter coverage reports based on
file patterns. This class abstracts the logic
for filtering coverage reports based on file
patterns.
"""
def filter_report(self, report: CoverageReport, file_pattern: str) -> CoverageReport:
"""
Filters the coverage report based on the specified file pattern.
Args:
report (CoverageReport): The coverage report to filter.
file_pattern (str): The file pattern to filter by.
Returns:
CoverageReport: The filtered coverage report.
"""
filtered_coverage = {
file: coverage
for file, coverage in report.file_coverage.items()
if file_pattern in file
}
total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values())
total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0
return CoverageReport(total_coverage = total_coverage, file_coverage=filtered_coverage)

class CoverageProcessorFactory:
"""Factory for creating coverage processors based on tool type."""

@staticmethod
def create_processor(
tool_type: str,
report_path: str,
src_file_path: str,
diff_coverage_report_path: Optional[str] = None
diff_report_path: Optional[str] = None
) -> CoverageProcessor:
"""
Creates appropriate coverage processor instance.
Expand All @@ -416,25 +397,28 @@ def create_processor(
Raises:
ValueError: If invalid tool type specified
"""
processors = {
'cobertura': CoberturaProcessor,
'jacoco': JacocoProcessor,
'lcov': LcovProcessor,
'diff_cover_json': DiffCoverageProcessor
processor_map = {
"cobertura": CoberturaProcessor,
"lcov": LcovProcessor,
"jacoco": JacocoProcessor,
"diff_cover_json": DiffCoverageProcessor,
}
if tool_type.lower() not in processors:
raise ValueError(f"Invalid coverage type specified: {tool_type}")
if tool_type.lower() == 'diff_cover_json':
return DiffCoverageProcessor(diff_coverage_report_path, report_path, src_file_path)
return processors[tool_type.lower()](report_path, src_file_path)
if tool_type.lower() not in processor_map:
raise ValueError(f"Unsupported tool type: {tool_type}")

if tool_type.lower() == "diff_cover_json":
if not diff_report_path:
raise ValueError("Diff report path must be provided for diff processor.")
return DiffCoverageProcessor(report_path, src_file_path, diff_report_path)

return processor_map[tool_type.lower()](report_path, src_file_path)

def process_coverage(
tool_type: str,
time_of_test_command: int,
report_path: str,
src_file_path: str,
is_global_coverage_enabled: bool = True,
file_pattern: Optional[str] = None,
diff_coverage_report_path: Optional[str] = None
) -> CoverageReport:
# Create appropriate processor
Expand All @@ -446,8 +430,5 @@ def process_coverage(
if is_global_coverage_enabled:
return report

# Apply filtering if needed
if file_pattern:
filter = CoverageReportFilter()
report = filter.filter_report(report, file_pattern)
return report
# If global coverage is disabled, filter to target coverage
return report.filter_to_target_coverage()
2 changes: 1 addition & 1 deletion cover_agent/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.13
0.2.14
Loading

0 comments on commit fa343e2

Please sign in to comment.