From f0e6d87a17dec8a43153e5aa1474cc6f714eef9c Mon Sep 17 00:00:00 2001 From: Embedded DevOps Date: Tue, 10 Sep 2024 19:48:05 -0700 Subject: [PATCH] Initial commit of mutation test. Updated log and LLM call. --- cover_agent/CoverAgent.py | 5 + cover_agent/PromptBuilder.py | 22 +++- cover_agent/UnitTestGenerator.py | 117 +++++++++++++++++- cover_agent/main.py | 10 ++ cover_agent/settings/config_loader.py | 1 + .../settings/mutation_test_prompt.toml | 85 +++++++++++++ templated_tests/python_fastapi/test_app.py | 25 ++++ tests/test_CoverAgent.py | 8 +- 8 files changed, 264 insertions(+), 9 deletions(-) create mode 100644 cover_agent/settings/mutation_test_prompt.toml diff --git a/cover_agent/CoverAgent.py b/cover_agent/CoverAgent.py index 6f3de76fb..8aed51a8d 100644 --- a/cover_agent/CoverAgent.py +++ b/cover_agent/CoverAgent.py @@ -39,6 +39,8 @@ def __init__(self, args): llm_model=args.model, api_base=args.api_base, use_report_coverage_feature_flag=args.use_report_coverage_feature_flag, + mutation_testing=args.mutation_testing, + more_mutation_logging=args.more_mutation_logging, ) def _validate_paths(self): @@ -151,6 +153,9 @@ def run(self): # Run the coverage tool again if the desired coverage hasn't been reached self.test_gen.run_coverage() + if self.args.mutation_testing: + self.test_gen.run_mutations() + # Log the final coverage if self.test_gen.current_coverage >= (self.test_gen.desired_coverage / 100): self.logger.info( diff --git a/cover_agent/PromptBuilder.py b/cover_agent/PromptBuilder.py index 6b81bf58d..b7205fd66 100644 --- a/cover_agent/PromptBuilder.py +++ b/cover_agent/PromptBuilder.py @@ -42,6 +42,7 @@ def __init__( additional_instructions: str = "", failed_test_runs: str = "", language: str = "python", + mutation_testing: bool = False, ): """ The `PromptBuilder` class is responsible for building a formatted prompt string by replacing placeholders with the actual content of files read during initialization. 
It takes in various paths and settings as parameters and provides a method to generate the prompt. @@ -72,6 +73,7 @@ def __init__( self.test_file = self._read_file(test_file_path) self.code_coverage_report = code_coverage_report self.language = language + self.mutation_testing = mutation_testing # add line numbers to each line in 'source_file'. start from 1 self.source_file_numbered = "\n".join( [f"{i + 1} {line}" for i, line in enumerate(self.source_file.split("\n"))] @@ -141,12 +143,20 @@ def build_prompt(self) -> dict: } environment = Environment(undefined=StrictUndefined) try: - system_prompt = environment.from_string( - get_settings().test_generation_prompt.system - ).render(variables) - user_prompt = environment.from_string( - get_settings().test_generation_prompt.user - ).render(variables) + if self.mutation_testing: + system_prompt = environment.from_string( + get_settings().mutation_test_prompt.system + ).render(variables) + user_prompt = environment.from_string( + get_settings().mutation_test_prompt.user + ).render(variables) + else: + system_prompt = environment.from_string( + get_settings().test_generation_prompt.system + ).render(variables) + user_prompt = environment.from_string( + get_settings().test_generation_prompt.user + ).render(variables) except Exception as e: logging.error(f"Error rendering prompt: {e}") return {"system": "", "user": ""} diff --git a/cover_agent/UnitTestGenerator.py b/cover_agent/UnitTestGenerator.py index 0c73ecfc4..b89b7187d 100644 --- a/cover_agent/UnitTestGenerator.py +++ b/cover_agent/UnitTestGenerator.py @@ -4,6 +4,9 @@ import logging import os import re +import json + +from wandb.sdk.data_types.trace_tree import Trace from cover_agent.AICaller import AICaller from cover_agent.CoverageProcessor import CoverageProcessor @@ -14,6 +17,10 @@ from cover_agent.settings.config_loader import get_settings from cover_agent.utils import load_yaml +import subprocess + +from shlex import split + class UnitTestGenerator: def 
__init__( @@ -30,6 +37,8 @@ def __init__( desired_coverage: int = 90, # Default to 90% coverage if not specified additional_instructions: str = "", use_report_coverage_feature_flag: bool = False, + mutation_testing: bool = False, + more_mutation_logging: bool = False, ): """ Initialize the UnitTestGenerator class with the provided parameters. @@ -65,6 +74,8 @@ def __init__( self.additional_instructions = additional_instructions self.language = self.get_code_language(source_file_path) self.use_report_coverage_feature_flag = use_report_coverage_feature_flag + self.mutation_testing = mutation_testing + self.more_mutation_logging = more_mutation_logging self.last_coverage_percentages = {} self.llm_model = llm_model @@ -91,6 +102,7 @@ def get_coverage_and_build_prompt(self): Returns: None """ + # Run coverage and build the prompt self.run_coverage() self.prompt = self.build_prompt() @@ -213,7 +225,7 @@ def run_coverage(self): "Will default to using the full coverage report. You will need to check coverage manually for each passing test." 
def run_mutations(self):
    """
    Ask the LLM for mutations of the source file and run the tests against each.

    A mutation prompt is built with `PromptBuilder(mutation_testing=True)`, sent
    through `self.ai_caller`, and the YAML reply is parsed with `load_yaml`.
    Every entry under the reply's `mutations` key is applied with
    `run_mutation`, and one log record is emitted per evaluated mutation:
    "survived" when the test command exits with 0, "caught" otherwise.
    STDOUT/STDERR of the test run are appended only when
    `self.more_mutation_logging` is set.

    Returns:
        None
    """
    self.logger.info("Running mutation tests")

    mutation_prompt_builder = PromptBuilder(
        source_file_path=self.source_file_path,
        test_file_path=self.test_file_path,
        code_coverage_report=self.code_coverage_report,
        included_files=self.included_files,
        additional_instructions=self.additional_instructions,
        failed_test_runs=self.failed_test_runs,
        language=self.language,
        mutation_testing=True,
    )
    mutation_prompt = mutation_prompt_builder.build_prompt()

    # The token counts returned alongside the response are not used here.
    response, _, _ = self.ai_caller.call_model(prompt=mutation_prompt)

    # The reply is free-form LLM output: tolerate anything that is not a
    # mapping with a non-empty `mutations` list instead of crashing.
    mutation_dict = load_yaml(response)
    mutations = (
        mutation_dict.get("mutations") if isinstance(mutation_dict, dict) else None
    )
    if not mutations:
        self.logger.warning(
            "No mutations could be parsed from the model response; "
            "skipping mutation tests."
        )
        return

    for mutation in mutations:
        result = self.run_mutation(mutation)

        # run_mutation returns None when the mutation was unusable or the
        # test command could not be run; there is no return code to report.
        if result is None:
            self.logger.warning(f"Mutation could not be evaluated: {mutation!r}")
            continue

        # Prepare the log message with banners
        log_parts = [f"Mutation result (return code: {result.returncode}):\n"]
        if result.returncode == 0:
            log_parts.append("Mutation survived.\n")
        else:
            log_parts.append("Mutation caught.\n")

        if self.more_mutation_logging:
            for label, stream in (("STDOUT", result.stdout), ("STDERR", result.stderr)):
                # Only add a banner for streams that actually carry output.
                if stream and stream.strip():
                    log_parts.append("\n" + "=" * 10 + f" {label} " + "=" * 10 + "\n")
                    log_parts.append(stream)

        self.logger.info("".join(log_parts))


def run_mutation(self, mutation, timeout=30):
    """
    Apply one mutation to the source file, run the test command, then restore the file.

    Parameters:
        mutation (dict): One entry of the model's `mutations` list. The keys
            read here are `location` (1-based line number to replace) and
            `mutated_version` (replacement code, possibly multi-line).
        timeout (int | float): Seconds to wait for the test command.
            Defaults to 30.

    Returns:
        subprocess.CompletedProcess | None: The finished test run (with
        `returncode`, `stdout`, `stderr`), or None when the mutation is
        unusable (missing/invalid fields, line out of range) or the test
        command could not be run to completion (e.g. it timed out).
    """
    # Local import: only this method needs it.
    import textwrap

    if not isinstance(mutation, dict):
        self.logger.warning(f"Skipping malformed mutation entry: {mutation!r}")
        return None

    mutated_code = mutation.get("mutated_version")
    line_number = mutation.get("location")

    if not isinstance(mutated_code, str) or not mutated_code.strip():
        self.logger.warning("Skipping mutation without a 'mutated_version'.")
        return None
    try:
        line_number = int(line_number)
    except (TypeError, ValueError):
        self.logger.warning(f"Skipping mutation with invalid location: {line_number!r}")
        return None

    # Read the original content
    with open(self.source_file_path, "r") as source_file:
        original_content = source_file.readlines()

    if not 1 <= line_number <= len(original_content):
        self.logger.warning(
            f"Skipping mutation: line {line_number} is outside the source file "
            f"({len(original_content)} lines)."
        )
        return None

    # `location` is 1-based, so the line being replaced lives at index
    # line_number - 1. Reuse its exact leading whitespace (tabs included).
    target_line = original_content[line_number - 1].rstrip("\r\n")
    indentation = target_line[: len(target_line) - len(target_line.lstrip())]

    # The model may or may not have kept the original indentation; strip the
    # common margin first so it is not stacked on top of `indentation`.
    # Block scalars end with a newline, hence the strip before splitting.
    # Every emitted line is newline-terminated because writelines() adds none.
    mutated_lines = [
        (indentation + line if line.strip() else "") + "\n"
        for line in textwrap.dedent(mutated_code).strip("\n").splitlines()
    ]

    # Replace the target line with the mutated code
    modified_content = (
        original_content[: line_number - 1]
        + mutated_lines
        + original_content[line_number:]
    )

    # Python validates cached bytecode by source mtime (whole seconds) and
    # size, so a same-size mutation written within the same second could be
    # masked by a stale .pyc. Keep the test run from writing bytecode.
    env = {**os.environ, "PYTHONDONTWRITEBYTECODE": "1"}

    try:
        # The write sits inside the try so that the `finally` below restores
        # the file even if writing or running the tests fails.
        with open(self.source_file_path, "w") as source_file:
            source_file.writelines(modified_content)

        self.logger.info(
            f'Running test with the following command: "{self.test_command}"'
        )
        result = subprocess.run(
            split(self.test_command),
            text=True,
            capture_output=True,
            cwd=self.test_command_dir,
            timeout=timeout,
            env=env,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # OSError: command/cwd/file problems; ValueError: unparsable command
        # string; SubprocessError: includes TimeoutExpired.
        self.logger.error(f"Error running test command: {e}")
        result = None
    finally:
        # Always put the original source back
        with open(self.source_file_path, "w") as source_file:
            source_file.writelines(original_content)
    return result
Default: False.", + ) return parser.parse_args() diff --git a/cover_agent/settings/config_loader.py b/cover_agent/settings/config_loader.py index 1d2622d55..138aef751 100644 --- a/cover_agent/settings/config_loader.py +++ b/cover_agent/settings/config_loader.py @@ -7,6 +7,7 @@ "language_extensions.toml", "analyze_suite_test_headers_indentation.toml", "analyze_suite_test_insert_line.toml", + "mutation_test_prompt.toml", ] diff --git a/cover_agent/settings/mutation_test_prompt.toml b/cover_agent/settings/mutation_test_prompt.toml new file mode 100644 index 000000000..8e861607d --- /dev/null +++ b/cover_agent/settings/mutation_test_prompt.toml @@ -0,0 +1,85 @@ +[mutation_test_prompt] +system="""\ +""" + +user="""\ + +You are an AI mutation testing agent tasked with mutating {{ language }} code to evaluate its robustness. + +Mutation Strategy: + +1. Logic Tweaks: + Modify conditions (e.g., 'if (a < b)' to 'if (a <= b)') + Adjust loop boundaries + Introduce minor calculation errors + Avoid drastic changes or infinite loops. + +2. Output Modifications: + Change return types or formats + Alter response structures + Return corrupted or incorrect data + +3. Method Interference: + Alter function parameters + Replace or omit key method calls + +4. Failure Injection: + Introduce exceptions or error states + Simulate system or resource failures + +5.Data Handling Faults: + Inject parsing errors + Bypass data validation + Corrupt object states + +6. Boundary Condition Testing: + Use out-of-bounds indices + Test extreme or edge-case parameters + +7. Concurrency Issues: + Simulate race conditions or deadlocks + Introduce timeouts or delays + +8. Security Vulnerabilities: + Replicate common vulnerabilities (e.g., buffer overflow, SQL injection, XSS) + Introduce authentication or authorization bypasses + + +Focus on subtle, realistic mutations that challenge the code's resilience while keeping core functionality intact. 
Prioritize scenarios likely to arise from programming errors or edge cases. + + +## Source Code to add Mutations to: {{ source_file_name }} +```{{language}} +{{ source_file_numbered }} +``` + +## Task +1. Conduct a line-by-line analysis of the source code. +2. Generate mutations for each test case. +3. Prioritize mutating function blocks and critical code sections. +4. Ensure the mutations offer meaningful insights into code quality and test coverage. +5. Present the output in order of ascending line numbers. +6. Avoid including manually inserted line numbers in the response. +7. Limit mutations to single-line changes only. + +Example output: +```yaml +file: {{source_file}} +mutations: + - method: + category: + summary: + location: + original: | + + mutated_version: | + +``` + +Use block scalar('|') to format each YAML output. + +Response (should be a valid YAML, and nothing else): +```yaml + +Generate mutants that test the code’s resilience while preserving core functionality. Output only in YAML format, with no additional explanations or comments. 
+""" \ No newline at end of file diff --git a/templated_tests/python_fastapi/test_app.py b/templated_tests/python_fastapi/test_app.py index 8c391f3e9..60be8f415 100644 --- a/templated_tests/python_fastapi/test_app.py +++ b/templated_tests/python_fastapi/test_app.py @@ -3,6 +3,7 @@ from app import app from datetime import date +import math client = TestClient(app) def test_root(): @@ -13,3 +14,27 @@ def test_root(): assert response.status_code == 200 assert response.json() == {"message": "Welcome to the FastAPI application!"} + +def test_sqrt_negative_number(): + response = client.get("/sqrt/-4") + assert response.status_code == 400 + assert response.json() == {"detail": "Cannot take square root of a negative number"} + + +def test_divide_by_zero(): + response = client.get("/divide/10/0") + assert response.status_code == 400 + assert response.json() == {"detail": "Cannot divide by zero"} + + +def test_add(): + response = client.get("/add/3/5") + assert response.status_code == 200 + assert response.json() == {"result": 8} + + +def test_current_date(): + response = client.get("/current-date") + assert response.status_code == 200 + assert response.json() == {"date": date.today().isoformat()} + diff --git a/tests/test_CoverAgent.py b/tests/test_CoverAgent.py index ebc084e0f..245228df0 100644 --- a/tests/test_CoverAgent.py +++ b/tests/test_CoverAgent.py @@ -119,7 +119,9 @@ def test_duplicate_test_file_with_output_path(self, mock_isfile, mock_copy): model="openai/test-model", api_base="openai/test-api", use_report_coverage_feature_flag=False, - log_db_path="" + log_db_path="", + mutation_testing=False, + more_mutation_logging=False, ) with pytest.raises(AssertionError) as exc_info: @@ -154,7 +156,9 @@ def test_duplicate_test_file_without_output_path(self, mock_isfile): model="openai/test-model", api_base="openai/test-api", use_report_coverage_feature_flag=False, - log_db_path="" + log_db_path="", + mutation_testing=False, + more_mutation_logging=False, ) with 
pytest.raises(AssertionError) as exc_info: