Skip to content

Commit

Permalink
Refactor UnitTestValidator to remove duplication of handling report f…
Browse files Browse the repository at this point in the history
…rom CoverageProcessing (#209)

* Refactor UnitTestValidator to remove duplication of handling report from CoverageProcessing

  Code to run tests and process coverage report  is duplicated
  between `run_coverage` and `validate_test`. While the code in
  `run_coverage` is initializing `last_coverage_percentages`
  during initialization, the code in `validate_test` is updating
  the coverage percentages after adding generated tests.

  This PR attempts to just combine the handling of Union(Tuple, dict)
  from CoverageProcessor as it return Union(Tuple, dict).

* fix incorrect variable reference
  • Loading branch information
coderustic authored Nov 12, 2024
1 parent 5860b4d commit f329e00
Showing 1 changed file with 68 additions and 99 deletions.
167 changes: 68 additions & 99 deletions cover_agent/UnitTestValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ def __init__(
with open(self.source_file_path, "r") as f:
self.source_code = f.read()

# initialize the coverage processor
self.coverage_processor = CoverageProcessor(
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag
)

def get_coverage(self):
"""
Run code coverage and build the prompt to be used for generating tests.
Expand Down Expand Up @@ -269,60 +277,13 @@ def run_coverage(self):
exit_code == 0
), f'Fatal: Error running test command. Are you sure the command is correct? "{self.test_command}"\nExit code {exit_code}. \nStdout: \n{stdout} \nStderr: \n{stderr}'

# Instantiate CoverageProcessor and process the coverage report
coverage_processor = CoverageProcessor(
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag
)

# Use the process_coverage_report method of CoverageProcessor, passing in the time the test command was executed
try:
if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key == self.source_file_path:
self.last_source_file_coverage = percentage_covered
if key not in self.last_coverage_percentages:
self.last_coverage_percentages[key] = 0
self.last_coverage_percentages[key] = percentage_covered
try:
percentage_covered = total_lines_covered / total_lines
except ZeroDivisionError:
self.logger.error(f"ZeroDivisionError: Attempting to perform total_lines_covered / total_lines: {total_lines_covered} / {total_lines}.")
percentage_covered = 0

self.logger.info(
f"Total lines covered: {total_lines_covered}, Total lines missed: {total_lines_missed}, Total lines: {total_lines}"
)
self.logger.info(
f"coverage: Percentage {round(percentage_covered * 100, 2)}%"
)
else:
lines_covered, lines_missed, percentage_covered = (
coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)

# Process the extracted coverage metrics
self.current_coverage = percentage_covered
self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%"
coverage, coverage_percentages = self.post_process_coverage_report(
time_of_test_command
)
self.current_coverage = coverage
self.last_coverage_percentages = coverage_percentages.copy()
except AssertionError as error:
# Handle the case where the coverage report does not exist or was not updated after the test command
self.logger.error(f"Error in coverage processing: {error}")
Expand Down Expand Up @@ -523,43 +484,9 @@ def validate_test(self, generated_test: dict, num_attempts=1):

# If test passed, check for coverage increase
try:
# Step 4: Check that the coverage has increased using the CoverageProcessor class
new_coverage_processor = CoverageProcessor(
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag,
new_percentage_covered, new_coverage_percentages = self.post_process_coverage_report(
time_of_test_command
)
coverage_percentages = {}

if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = new_coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key not in coverage_percentages:
coverage_percentages[key] = 0
coverage_percentages[key] = percentage_covered

new_percentage_covered = total_lines_covered / total_lines
else:
_, _, new_percentage_covered = (
new_coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)

if new_percentage_covered <= self.current_coverage:
# Coverage has not increased, rollback the test by removing it from the test file
Expand Down Expand Up @@ -633,20 +560,17 @@ def validate_test(self, generated_test: dict, num_attempts=1):
additional_imports_lines
) # this is important, otherwise the next test will be inserted at the wrong line

self.current_coverage = new_percentage_covered

for key in coverage_percentages:
if key not in self.last_coverage_percentages:
self.last_coverage_percentages[key] = 0
if coverage_percentages[key] > self.last_coverage_percentages[key] and key == self.source_file_path.split("/")[-1]:
for key in new_coverage_percentages:
if new_coverage_percentages[key] > self.last_coverage_percentages[key] and key == self.source_file_path.split("/")[-1]:
self.logger.info(
f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(coverage_percentages[key] * 100, 2)}"
f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}"
)
elif coverage_percentages[key] > self.last_coverage_percentages[key]:
elif new_coverage_percentages[key] > self.last_coverage_percentages[key]:
self.logger.info(
f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(coverage_percentages[key] * 100, 2)}"
f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}"
)
self.last_coverage_percentages[key] = coverage_percentages[key]
self.current_coverage = new_percentage_covered
self.last_coverage_percentages = new_coverage_percentages.copy()

self.logger.info(
f"Test passed and coverage increased. Current coverage: {round(new_percentage_covered * 100, 2)}%"
Expand Down Expand Up @@ -738,4 +662,49 @@ def extract_error_message(self, stderr, stdout):
except Exception as e:
logging.error(f"ERROR: Unable to extract error message from inputs using LLM.\nSTDERR: {stderr}\nSTDOUT: {stdout}")
logging.error(f"Error extracting error message: {e}")
return ""
return ""

def post_process_coverage_report(self, time_of_test_command):
coverage_percentages = {}
if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = self.coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key == self.source_file_path:
self.last_source_file_coverage = percentage_covered
if key not in coverage_percentages:
coverage_percentages[key] = 0
coverage_percentages[key] = percentage_covered
try:
percentage_covered = total_lines_covered / total_lines
except ZeroDivisionError:
self.logger.error(f"ZeroDivisionError: Attempting to perform total_lines_covered / total_lines: {total_lines_covered} / {total_lines}.")
percentage_covered = 0

self.logger.info(
f"Total lines covered: {total_lines_covered}, Total lines missed: {total_lines_missed}, Total lines: {total_lines}"
)
self.logger.info(
f"coverage: Percentage {round(percentage_covered * 100, 2)}%"
)
else:
lines_covered, lines_missed, percentage_covered = (
self.coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)
self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%"
return percentage_covered, coverage_percentages

0 comments on commit f329e00

Please sign in to comment.