Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/botirk38/SoloAgent into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
yas789 committed Jul 4, 2024
2 parents 81d9156 + cfffa9f commit 2a0b2e8
Show file tree
Hide file tree
Showing 17 changed files with 321 additions and 276 deletions.
4 changes: 4 additions & 0 deletions autogpts/SoloAgent/forge/actions/code_gen/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

from .models import Code, TestCase

__all__ = ["Code", "TestCase"]
243 changes: 109 additions & 134 deletions autogpts/SoloAgent/forge/actions/code_gen/code_gen.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@

from __future__ import annotations
from typing import Dict
from ..registry import action
from forge.sdk import ForgeLogger, PromptEngine
from forge.sdk import ForgeLogger, PromptEngine, Agent, LocalWorkspace
from forge.llm import chat_completion_request
import os
from forge.sdk import Agent, LocalWorkspace
import re
import subprocess
import json
import test_code

LOG = ForgeLogger(__name__)
CodeType = Dict[str, str]
TestCaseType = Dict[str, str]


ARGO_TOML_CONTENT = """
[package]
name = "my_anchor_program"
version = "0.1.0"
edition = "2018"

[dependencies]
anchor-lang = "0.30.1"
"""

ANCHOR_TOML_CONTENT = """
[programs.localnet]
my_anchor_program = "4d3d5ab7f6b5e4b2b7d1f5d6e4b7d1f5d6e4b7d1"
"""
ERROR_INFO = ""
CodeType = Dict[str, str]
TestCaseType = Dict[str, str]


@action(
Expand Down Expand Up @@ -77,31 +57,29 @@ async def test_code(agent: Agent, task_id: str, project_path: str) -> str:
output_type="str",
)
async def generate_solana_code(agent: Agent, task_id: str, specification: str) -> str:
global ERROR_INFO
test_code.cargo_test_agbenchmark_config()


prompt_engine = PromptEngine("gpt-4o")
lib_prompt = prompt_engine.load_prompt("anchor-lib", specification=specification, error_info=ERROR_INFO)
instructions_prompt = prompt_engine.load_prompt("anchor-instructions", specification=specification, error_info=ERROR_INFO)
errors_prompt = prompt_engine.load_prompt("anchor-errors", specification=specification, error_info=ERROR_INFO)
cargo_toml_prompt = prompt_engine.load_prompt("anchor-cargo-toml", specification=specification, error_info=ERROR_INFO)
anchor_toml_prompt = prompt_engine.load_prompt("anchor-anchor-toml", specification=specification, error_info=ERROR_INFO)

lib_prompt = prompt_engine.load_prompt(
"anchor-lib", specification=specification)
instructions_prompt = prompt_engine.load_prompt(
"anchor-instructions", specification=specification)
errors_prompt = prompt_engine.load_prompt(
"anchor-errors", specification=specification)
cargo_toml_prompt = prompt_engine.load_prompt(
"anchor-cargo-toml", specification=specification)
anchor_toml_prompt = prompt_engine.load_prompt(
"anchor-anchor-toml", specification=specification)

messages = [
{"role": "system", "content": "You are a code generation assistant specialized in Anchor for Solana."},
{"role": "user", "content": lib_prompt},
{"role": "user", "content": instructions_prompt},
{"role": "user", "content": errors_prompt},
{"role": "user", "content": cargo_toml_prompt},
{"role": "user", "content": anchor_toml_prompt},
{"role": "user", "content": "Return the whole code as a string with the file markers intact that you received in each of the input without changing their wording at all and use // becore comments."},

{"role": "user", "content": "Return the whole code as a string with the file markers intact that you received in each of the input without changing their wording at all."}
]




chat_completion_kwargs = {
"messages": messages,
"model": "gpt-3.5-turbo",
Expand All @@ -123,63 +101,42 @@ async def generate_solana_code(agent: Agent, task_id: str, specification: str) -
project_path = os.path.join(base_path, task_id)
LOG.info(f"Base path: {base_path}")
LOG.info(f"Project path: {project_path}")
cargo_toml_content = """
[package]
name = "my_anchor_program"
version = "0.1.0"
edition = "2018"
[dependencies]
anchor-lang = "0.30.1"
"""

LOG.info(f"id: {task_id}")
LOG.info(f"Parts: {response_content}")

file_actions = [
('Cargo.toml', ARGO_TOML_CONTENT),
('Anchor.toml', parts['Anchor.toml']),
('src/errors.rs', parts['errors.rs']),
('src/instructions.rs', parts['anchor-instructions.rs']),
('src/lib.rs', parts['anchor-lib.rs']),
]

for file_path, file_content in file_actions:
full_file_path = os.path.join(project_path, file_path)

if os.path.exists(full_file_path):
print(f"{file_path} already exists. Skipping regeneration.")
else:
print(f"Generating {file_path}. Press 'y' to continue...")
if input().strip().lower() != 'y':
return f"Generation halted by user at {file_path}."

await agent.abilities.run_action(task_id, "write_file", file_path=full_file_path, data=file_content.encode())
print(f"{file_path} generated successfully.")

# Compile the generated file
compile_result = await compile_file(agent, task_id, project_path, file_path)
if "error" in compile_result.lower():
LOG.error(f"Compilation failed for {file_path}: {compile_result}")
print(f"Compilation failed for {file_path}, regenerating...")

# Update ERROR_INFO with the compilation error
ERROR_INFO = compile_result

# Regenerate only the faulty file
return await generate_solana_code(agent, task_id, specification)

await agent.abilities.run_action(
task_id, "write_file", file_path=os.path.join(project_path, 'src', 'lib.rs'), data=parts['anchor-lib.rs'].encode()
)
await agent.abilities.run_action(
task_id, "write_file", file_path=os.path.join(project_path, 'src', 'instructions.rs'), data=parts['anchor-instructions.rs'].encode()
)
await agent.abilities.run_action(
task_id, "write_file", file_path=os.path.join(project_path, 'src', 'errors.rs'), data=parts['errors.rs'].encode()
)
await agent.abilities.run_action(
task_id, "write_file", file_path=os.path.join(project_path, 'Cargo.toml'), data=cargo_toml_content.encode()
)
await agent.abilities.run_action(
task_id, "write_file", file_path=os.path.join(project_path, 'Anchor.toml'), data=parts['Anchor.toml'].encode()
)
test_result = await agent.abilities.run_action(task_id, "test_code", project_path=project_path)
if "All tests passed" not in test_result:
# Regenerate the code based on errors
LOG.info(f"Regenerating code due to errors: {test_result}")
ERROR_INFO = test_result # Update ERROR_INFO with the test error
return await generate_solana_code(agent, task_id, specification)

return "Solana on-chain code generated, tested, and verified successfully."


async def compile_file(agent: Agent, task_id: str, project_path: str, file_path: str) -> str:
    """Run ``cargo check --release`` in *project_path* and report the outcome.

    Args:
        agent: Unused here; kept for signature parity with the other actions.
        task_id: Unused here; kept for signature parity with the other actions.
        project_path: Directory containing the Cargo project to check.
        file_path: File that triggered the check (unused — cargo checks the
            whole crate, not a single file).

    Returns:
        ``"Compilation successful."`` on success, otherwise cargo's stderr or
        an error description. Callers look for "error" in the result string.
    """
    import asyncio  # local import: keeps the stdlib dependency next to its only user

    try:
        # subprocess.run is blocking; run it in a worker thread so this
        # coroutine does not stall the event loop for the whole cargo check.
        result = await asyncio.to_thread(
            subprocess.run,
            ['cargo', 'check', '--release'],
            cwd=project_path,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return result.stderr
        return "Compilation successful."
    except Exception as e:  # e.g. cargo not installed or bad cwd — report, don't crash
        return f"Compilation failed: {e}"



@action(
name="generate_frontend_code",
description="Generate frontend code based on the provided specification",
Expand Down Expand Up @@ -260,83 +217,91 @@ async def generate_frontend_code(agent, task_id: str, specification: str) -> str
"required": True
}
],
output_type="str",
output_type="TestCase object",
)
async def generate_test_cases(agent: Agent, task_id: str, code_dict: CodeType) -> str:
prompt_engine = PromptEngine("gpt-3.5-turbo")
test_struct_prompt = prompt_engine.load_prompt("test-case-struct-return")
async def generate_test_cases(agent: Agent, task_id: str, code_dict: Dict[str, str]) -> TestCase:
try:
prompt_engine = PromptEngine("gpt-3.5-turbo")
messages = [
{"role": "system", "content": "You are a code generation assistant specialized in generating test cases."}]

messages = [
{"role": "system", "content": "You are a code generation assistant specialized in generating test cases."},
]
test_prompt_template, test_struct_template, folder_name = determine_templates(
next(iter(code_dict)))
if not test_prompt_template:
return "Unsupported file type."

code = Code(code_dict)
for file_name, code_content in code.items():
LOG.info(f"File Name: {file_name}")
LOG.info(f"Code: {code_content}")
test_prompt = prompt_engine.load_prompt(
test_prompt_template, file_name=file_name, code=code_content)
messages.append({"role": "user", "content": test_prompt})

test_struct_prompt = prompt_engine.load_prompt(test_struct_template)
messages.append({"role": "user", "content": test_struct_prompt})

response_content = await get_chat_response(messages)
LOG.info(f"Response content: {response_content}")

project_path = get_project_path(agent, task_id, folder_name)
os.makedirs(project_path, exist_ok=True)

for file_name, code in code_dict.items():
LOG.info(f"File Name: {file_name}")
LOG.info(f"Code: {code}")
test_prompt = prompt_engine.load_prompt(
"test-case-generation", file_name=file_name, code=code)
messages.append({"role": "user", "content": test_prompt})
test_cases = parse_test_cases_response(response_content)
await write_test_cases(agent, task_id, project_path, test_cases)

return test_cases

except Exception as e:
LOG.error(f"Error generating test cases: {e}")
return "Failed to generate test cases due to an error."

messages.append({"role": "user", "content": test_struct_prompt})

def determine_templates(first_file_name: str):
    """Select prompt templates and output folder from a file's extension.

    Returns:
        A ``(generation_template, struct_template, tests_folder)`` triple, or
        ``(None, None, None)`` when the extension is not supported.
    """
    if first_file_name.endswith(('.js', '.ts')):
        return ("test-case-generation-frontend",
                "test-case-struct-return-frontend",
                'frontend/tests')
    if first_file_name.endswith('.rs'):
        return ("test-case-generation",
                "test-case-struct-return",
                'rust/tests')
    LOG.error(f"Unsupported file type for: {first_file_name}")
    return None, None, None


async def get_chat_response(messages: list) -> str:
    """Send *messages* to the chat model and return the assistant's reply text.

    Args:
        messages: Chat messages in OpenAI format
            (``{"role": ..., "content": ...}`` dicts).

    Returns:
        The ``content`` string of the first completion choice.
    """
    chat_completion_kwargs = {
        "messages": messages,
        "model": "gpt-3.5-turbo",
    }
    chat_response = await chat_completion_request(**chat_completion_kwargs)
    # Merge leftover removed: the response content was previously assigned to
    # an unused local before being returned.
    return chat_response["choices"][0]["message"]["content"]

LOG.info(f"Response content: {response_content}")

def get_project_path(agent: Agent, task_id: str, folder_name: str) -> str:
    """Resolve the on-disk directory for *task_id*'s *folder_name* under the workspace.

    Args:
        agent: Agent whose workspace provides the base path.
        task_id: Task whose subdirectory is being resolved.
        folder_name: Folder (e.g. ``'rust/tests'``) inside the task directory.

    Returns:
        ``<workspace base>/<task_id>/<folder_name>`` as a path string.
    """
    # LocalWorkspace.base_path is usable directly; other workspace types need
    # an explicit str() conversion.
    base_path = agent.workspace.base_path if isinstance(
        agent.workspace, LocalWorkspace) else str(agent.workspace.base_path)
    # Merge leftover removed: an intermediate join(base_path, task_id) was
    # previously computed and then discarded.
    return os.path.join(base_path, task_id, folder_name)

try:
test_cases = parse_test_cases_response(response_content)
except Exception as e:
LOG.error(f"Error parsing test cases response: {e}")
return "Failed to generate test cases due to response parsing error."

async def write_test_cases(agent: Agent, task_id: str, project_path: str, test_cases: TestCase):
    """Write each generated test case to its file under *project_path*.

    Args:
        agent: Agent whose ``write_file`` action performs the actual write.
        task_id: Task the writes are attributed to.
        project_path: Directory the test files are created in (the caller has
            already appended the tests folder, e.g. ``rust/tests``).
        test_cases: Mapping of file name -> test-file contents.
    """
    # Merge leftover removed: each test case was previously written twice —
    # once under an extra 'tests/' subdirectory and once at project_path.
    for file_name, test_case in test_cases.items():
        test_file_path = os.path.join(project_path, file_name)
        await agent.abilities.run_action(task_id, "write_file", file_path=test_file_path, data=test_case.encode())


def sanitize_json_string(json_string: str) -> str:
    """Escape raw newline/tab control characters so string values parse as JSON.

    JSON forbids literal control characters inside string values, and LLM
    output frequently contains them; escape them before ``json.loads``.

    Bug fixed: the previous version also replaced every plain space with the
    literal two-character sequence ``\\t``, corrupting any string containing a
    space — the stated intent (and this version) escapes only ``\\n``/``\\t``.
    """
    return json_string.replace('\n', '\\n').replace('\t', '\\t')


def parse_test_cases_response(response_content: str) -> TestCase:
    """Extract the JSON payload from an LLM reply and wrap it in a TestCase.

    NOTE(review): a second ``parse_test_cases_response`` defined later in this
    module shadows this one — merge-conflict leftover; consolidate the two.

    Args:
        response_content: Raw model output, possibly with prose around the
            JSON object.

    Returns:
        ``TestCase`` mapping the reported file name to its (unescaped) test
        file contents.

    Raises:
        ValueError: if no JSON object is found or it cannot be decoded
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    try:
        # The model may wrap the JSON in prose; keep only the outermost {...}.
        json_start = response_content.index('{')
        json_end = response_content.rindex('}') + 1
        json_content = response_content[json_start:json_end]
        LOG.info(f"JSON Content: {json_content}")

        response_dict = json.loads(json_content)
        file_name = response_dict["file_name"]
        # Unescape literal \n / \t sequences and strip stray quoting that
        # models sometimes emit around the code body.
        test_file = response_dict["test_file"].replace(
            '\\n', '\n').replace('\\t', '\t').strip().strip('"')

        return TestCase({file_name: test_file})
    except (json.JSONDecodeError, ValueError) as e:
        LOG.error(f"Error decoding JSON response: {e}")
        raise
Expand Down Expand Up @@ -373,4 +338,14 @@ def parse_response_content(response_content: str) -> dict:
return parts



def parse_test_cases_response(response_content: str) -> TestCaseType:
    """Parse an LLM reply into a ``{file_name: test_file}`` dict.

    NOTE(review): this re-definition shadows the earlier
    ``parse_test_cases_response`` in this module — merge-conflict leftover;
    one of the two should be removed.

    Args:
        response_content: Raw model output, possibly with prose around the
            JSON object.

    Returns:
        Dict mapping the reported file name to the test-file contents.

    Raises:
        ValueError: if no JSON object is present or it cannot be decoded
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    try:
        # Fix: the model often wraps the JSON in prose, so parsing the whole
        # response failed; extract the outermost {...} span first.
        json_start = response_content.index('{')
        json_end = response_content.rindex('}') + 1
        response_dict = json.loads(response_content[json_start:json_end])
        file_name = response_dict["file_name"]
        test_file = response_dict["test_file"]
        return {file_name: test_file}
    except (json.JSONDecodeError, ValueError) as e:
        LOG.error(f"Error decoding JSON response: {e}")
        raise
31 changes: 31 additions & 0 deletions autogpts/SoloAgent/forge/actions/code_gen/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

from typing import Dict
from dataclasses import dataclass, field


@dataclass
class Code:
    """Mutable mapping of source file names to their file contents."""

    # Backing store: file name -> file contents.
    code_files: Dict[str, str] = field(default_factory=dict)

    def __getitem__(self, file_name: str) -> str:
        # Delegate lookup to the backing dict; raises KeyError for unknown files.
        return self.code_files[file_name]

    def __setitem__(self, file_name: str, contents: str) -> None:
        # Store or overwrite the contents for a file.
        self.code_files[file_name] = contents

    def items(self):
        """Return a (file name, contents) item view of the backing dict."""
        return self.code_files.items()


@dataclass
class TestCase:
    """Mutable mapping of test file names to generated test contents."""

    # Backing store: test file name -> test file contents.
    test_cases: Dict[str, str] = field(default_factory=dict)

    def __getitem__(self, file_name: str) -> str:
        # Delegate lookup to the backing dict; raises KeyError for unknown files.
        return self.test_cases[file_name]

    def __setitem__(self, file_name: str, contents: str) -> None:
        # Store or overwrite the test contents for a file.
        self.test_cases[file_name] = contents

    def items(self):
        """Return a (file name, contents) item view of the backing dict."""
        return self.test_cases.items()
Loading

0 comments on commit 2a0b2e8

Please sign in to comment.