Initial commit of mutation test.
Updated log and LLM call.
EmbeddedDevops1 committed Sep 15, 2024
1 parent d894997 commit f0e6d87
Showing 8 changed files with 264 additions and 9 deletions.
5 changes: 5 additions & 0 deletions cover_agent/CoverAgent.py
@@ -39,6 +39,8 @@ def __init__(self, args):
llm_model=args.model,
api_base=args.api_base,
use_report_coverage_feature_flag=args.use_report_coverage_feature_flag,
mutation_testing=args.mutation_testing,
more_mutation_logging=args.more_mutation_logging,
)

def _validate_paths(self):
@@ -151,6 +153,9 @@ def run(self):
# Run the coverage tool again if the desired coverage hasn't been reached
self.test_gen.run_coverage()

if self.args.mutation_testing:
self.test_gen.run_mutations()

# Log the final coverage
if self.test_gen.current_coverage >= (self.test_gen.desired_coverage / 100):
self.logger.info(
22 changes: 16 additions & 6 deletions cover_agent/PromptBuilder.py
@@ -42,6 +42,7 @@ def __init__(
additional_instructions: str = "",
failed_test_runs: str = "",
language: str = "python",
mutation_testing: bool = False,
):
"""
The `PromptBuilder` class is responsible for building a formatted prompt string by replacing placeholders with the actual content of files read during initialization. It takes in various paths and settings as parameters and provides a method to generate the prompt.
@@ -72,6 +73,7 @@ def __init__(
self.test_file = self._read_file(test_file_path)
self.code_coverage_report = code_coverage_report
self.language = language
self.mutation_testing = mutation_testing
# add line numbers to each line in 'source_file'. start from 1
self.source_file_numbered = "\n".join(
[f"{i + 1} {line}" for i, line in enumerate(self.source_file.split("\n"))]
@@ -141,12 +143,20 @@ def build_prompt(self) -> dict:
}
environment = Environment(undefined=StrictUndefined)
try:
system_prompt = environment.from_string(
get_settings().test_generation_prompt.system
).render(variables)
user_prompt = environment.from_string(
get_settings().test_generation_prompt.user
).render(variables)
if self.mutation_testing:
system_prompt = environment.from_string(
get_settings().mutation_test_prompt.system
).render(variables)
user_prompt = environment.from_string(
get_settings().mutation_test_prompt.user
).render(variables)
else:
system_prompt = environment.from_string(
get_settings().test_generation_prompt.system
).render(variables)
user_prompt = environment.from_string(
get_settings().test_generation_prompt.user
).render(variables)
except Exception as e:
logging.error(f"Error rendering prompt: {e}")
return {"system": "", "user": ""}
117 changes: 116 additions & 1 deletion cover_agent/UnitTestGenerator.py
@@ -4,6 +4,9 @@
import logging
import os
import re
import json

from wandb.sdk.data_types.trace_tree import Trace

from cover_agent.AICaller import AICaller
from cover_agent.CoverageProcessor import CoverageProcessor
@@ -14,6 +17,10 @@
from cover_agent.settings.config_loader import get_settings
from cover_agent.utils import load_yaml

import subprocess

from shlex import split


class UnitTestGenerator:
def __init__(
@@ -30,6 +37,8 @@ def __init__(
desired_coverage: int = 90, # Default to 90% coverage if not specified
additional_instructions: str = "",
use_report_coverage_feature_flag: bool = False,
mutation_testing: bool = False,
more_mutation_logging: bool = False,
):
"""
Initialize the UnitTestGenerator class with the provided parameters.
@@ -65,6 +74,8 @@ def __init__(
self.additional_instructions = additional_instructions
self.language = self.get_code_language(source_file_path)
self.use_report_coverage_feature_flag = use_report_coverage_feature_flag
self.mutation_testing = mutation_testing
self.more_mutation_logging = more_mutation_logging
self.last_coverage_percentages = {}
self.llm_model = llm_model

@@ -91,6 +102,7 @@ def get_coverage_and_build_prompt(self):
Returns:
None
"""

# Run coverage and build the prompt
self.run_coverage()
self.prompt = self.build_prompt()
@@ -213,7 +225,7 @@ def run_coverage(self):
"Will default to using the full coverage report. You will need to check coverage manually for each passing test."
)
with open(self.code_coverage_report_path, "r") as f:
self.code_coverage_report = f.read()
self.code_coverage_report = f.read()

@staticmethod
def get_included_files(included_files):
@@ -761,6 +773,109 @@ def to_dict(self):
def to_json(self):
return json.dumps(self.to_dict())

def run_mutations(self):
self.logger.info("Running mutation tests")

# Run mutation tests

mutation_prompt_builder = PromptBuilder(
source_file_path=self.source_file_path,
test_file_path=self.test_file_path,
code_coverage_report=self.code_coverage_report,
included_files=self.included_files,
additional_instructions=self.additional_instructions,
failed_test_runs=self.failed_test_runs,
language=self.language,
mutation_testing=True
)

mutation_prompt = mutation_prompt_builder.build_prompt()

response, prompt_token_count, response_token_count = (
self.ai_caller.call_model(prompt=mutation_prompt)
)

mutation_dict = load_yaml(response)

for mutation in mutation_dict["mutations"]:
result = self.run_mutation(mutation)

# Prepare the log message with banners
log_message = f"Mutation result (return code: {result.returncode}):\n"
if result.returncode == 0:
log_message += "Mutation survived.\n"
else:
log_message += "Mutation caught.\n"

# Add STDOUT to the log message if it's not empty
if result.stdout.strip() and self.more_mutation_logging:
log_message += "\n" + "="*10 + " STDOUT " + "="*10 + "\n"
log_message += result.stdout

# Add STDERR to the log message if it's not empty
if result.stderr.strip() and self.more_mutation_logging:
log_message += "\n" + "="*10 + " STDERR " + "="*10 + "\n"
log_message += result.stderr


self.logger.info(log_message)


def run_mutation(self, mutation):
mutated_code = mutation.get("mutated_version", None)
line_number = mutation.get("location", None)


# Read the original content
with open(self.source_file_path, "r") as source_file:
original_content = source_file.readlines()

# Determine the indentation level of the line at line_number
indentation = len(original_content[line_number]) - len(original_content[line_number].lstrip())

# Adjust the indentation of the mutated code
adjusted_mutated_code = [
' ' * indentation + line if line.strip() else line
for line in mutated_code.split("\n")
]

# Insert the mutated code at the specified spot
modified_content = (
original_content[:line_number - 1]
+ adjusted_mutated_code
+ original_content[line_number:]
)

# Write the modified content back to the file
with open(self.source_file_path, "w") as source_file:
source_file.writelines(modified_content)
source_file.flush()

# Step 2: Run the test using the Runner class
self.logger.info(
f'Running test with the following command: "{self.test_command}"'
)
stdout, stderr, exit_code, time_of_test_command = Runner.run_command(
command=self.test_command, cwd=self.test_command_dir
)

try:
result = subprocess.run(
split(self.test_command),
text=True,
capture_output=True,
cwd=self.test_command_dir,
timeout=30,
)
except Exception as e:
logging.error(f"Error running test command: {e}")
result = None
finally:
# Write the modified content back to the file
with open(self.source_file_path, "w") as source_file:
source_file.writelines(original_content)
source_file.flush()
return result

def extract_error_message_python(fail_message):
"""
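For context, `run_mutations` above parses the model's response with `load_yaml` and expects a top-level `mutations` list whose entries provide at least `location` and `mutated_version`. The sketch below is only illustrative of that expected shape: the response text, method name, and line number are hypothetical, and `yaml.safe_load` stands in for the project's `load_yaml` helper.

```python
import yaml  # PyYAML; used here only as a stand-in for cover_agent.utils.load_yaml

# Hypothetical model response; the file, method, line number, and code are illustrative.
example_response = """\
file: app.py
mutations:
  - method: add
    category: Logic Tweaks
    summary: Replace addition with subtraction
    location: 12
    original: |
      return {"result": a + b}
    mutated_version: |
      return {"result": a - b}  # mutated: '+' changed to '-'
"""

mutation_dict = yaml.safe_load(example_response)
for mutation in mutation_dict["mutations"]:
    # run_mutations iterates the same way, handing each entry to run_mutation
    print(mutation["location"], mutation["summary"])
    print(mutation["mutated_version"])
```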
10 changes: 10 additions & 0 deletions cover_agent/main.py
@@ -101,6 +101,16 @@ def parse_args():
default="",
help="Path to optional log database. Default: %(default)s.",
)
parser.add_argument(
"--mutation-testing",
action="store_true",
help="Setting this to True enables mutation testing. Default: False.",
)
parser.add_argument(
"--more-mutation-logging",
action="store_true",
help="Setting this to True enables more logging. Default: False.",
)
return parser.parse_args()


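Both new options are plain `store_true` switches, so enabling the feature from the command line should amount to appending `--mutation-testing` (and optionally `--more-mutation-logging` for verbose mutation output) to an existing cover-agent invocation; the rest of the arguments are unchanged.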
1 change: 1 addition & 0 deletions cover_agent/settings/config_loader.py
@@ -7,6 +7,7 @@
"language_extensions.toml",
"analyze_suite_test_headers_indentation.toml",
"analyze_suite_test_insert_line.toml",
"mutation_test_prompt.toml",
]


85 changes: 85 additions & 0 deletions cover_agent/settings/mutation_test_prompt.toml
@@ -0,0 +1,85 @@
[mutation_test_prompt]
system="""\
"""

user="""\
You are an AI mutation testing agent tasked with mutating {{ language }} code to evaluate its robustness.
Mutation Strategy:
1. Logic Tweaks:
Modify conditions (e.g., 'if (a < b)' to 'if (a <= b)')
Adjust loop boundaries
Introduce minor calculation errors
Avoid drastic changes or infinite loops.
2. Output Modifications:
Change return types or formats
Alter response structures
Return corrupted or incorrect data
3. Method Interference:
Alter function parameters
Replace or omit key method calls
4. Failure Injection:
Introduce exceptions or error states
Simulate system or resource failures
5. Data Handling Faults:
Inject parsing errors
Bypass data validation
Corrupt object states
6. Boundary Condition Testing:
Use out-of-bounds indices
Test extreme or edge-case parameters
7. Concurrency Issues:
Simulate race conditions or deadlocks
Introduce timeouts or delays
8. Security Vulnerabilities:
Replicate common vulnerabilities (e.g., buffer overflow, SQL injection, XSS)
Introduce authentication or authorization bypasses
Focus on subtle, realistic mutations that challenge the code's resilience while keeping core functionality intact. Prioritize scenarios likely to arise from programming errors or edge cases.
## Source Code to add Mutations to: {{ source_file_name }}
```{{language}}
{{ source_file_numbered }}
```
## Task
1. Conduct a line-by-line analysis of the source code.
2. Generate mutations for each test case.
3. Prioritize mutating function blocks and critical code sections.
4. Ensure the mutations offer meaningful insights into code quality and test coverage.
5. Present the output in order of ascending line numbers.
6. Avoid including manually inserted line numbers in the response.
7. Limit mutations to single-line changes only.
Example output:
```yaml
file: {{source_file}}
mutations:
- method: <function name>
category: <mutation type>
summary: <brief mutation description>
location: <line number>
original: |
<original code>
mutated_version: |
<mutated code with {{language}} comment explaining the change>
```
Use a block scalar ('|') to format each YAML output.
Generate mutants that test the code's resilience while preserving core functionality. Output only in YAML format, with no additional explanations or comments.
Response (should be a valid YAML, and nothing else):
```yaml
"""
25 changes: 25 additions & 0 deletions templated_tests/python_fastapi/test_app.py
@@ -3,6 +3,7 @@
from app import app
from datetime import date

import math
client = TestClient(app)

def test_root():
@@ -13,3 +14,27 @@ def test_root():
assert response.status_code == 200
assert response.json() == {"message": "Welcome to the FastAPI application!"}


def test_sqrt_negative_number():
response = client.get("/sqrt/-4")
assert response.status_code == 400
assert response.json() == {"detail": "Cannot take square root of a negative number"}


def test_divide_by_zero():
response = client.get("/divide/10/0")
assert response.status_code == 400
assert response.json() == {"detail": "Cannot divide by zero"}


def test_add():
response = client.get("/add/3/5")
assert response.status_code == 200
assert response.json() == {"result": 8}


def test_current_date():
response = client.get("/current-date")
assert response.status_code == 200
assert response.json() == {"date": date.today().isoformat()}

8 changes: 6 additions & 2 deletions tests/test_CoverAgent.py
@@ -119,7 +119,9 @@ def test_duplicate_test_file_with_output_path(self, mock_isfile, mock_copy):
model="openai/test-model",
api_base="openai/test-api",
use_report_coverage_feature_flag=False,
log_db_path=""
log_db_path="",
mutation_testing=False,
more_mutation_logging=False,
)

with pytest.raises(AssertionError) as exc_info:
@@ -154,7 +156,9 @@ def test_duplicate_test_file_without_output_path(self, mock_isfile):
model="openai/test-model",
api_base="openai/test-api",
use_report_coverage_feature_flag=False,
log_db_path=""
log_db_path="",
mutation_testing=False,
more_mutation_logging=False,
)

with pytest.raises(AssertionError) as exc_info:
