Skip to content

Commit

Permalink
docstrings, not implimented, black and precommit
Browse files Browse the repository at this point in the history
  • Loading branch information
Merlinvt committed Dec 13, 2024
1 parent 477bffe commit 7b59e0e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 18 deletions.
101 changes: 95 additions & 6 deletions autogen/agentchat/contrib/magentic_one/orchestrator_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from operator import le
from typing import Any, Callable, Dict, List, Literal, Optional, Union

from autogen.agentchat import Agent, ChatResult, ConversableAgent, UserProxyAgent
from autogen.agentchat import Agent, ChatResult, ConversableAgent
from autogen.logger import FileLogger

from .orchestrator_prompts import (
Expand All @@ -29,6 +29,46 @@


class OrchestratorAgent(ConversableAgent):
"""OrchestratorAgent is a the lead agent of magentic onethat coordinates and directs a team of specialized agents to solve complex tasks.
The OrchestratorAgent serves as a central coordinator, managing a team of specialized agents
with distinct capabilities. It orchestrates task execution through a sophisticated process of:
- Initial task planning and fact-gathering
- Dynamic agent selection and instruction
- Continuous progress tracking
- Adaptive replanning to recover from errors or stalls
Key Capabilities:
- Directs agents specialized in web browsing, file navigation, Python code execution, and more
- Dynamically generates and updates task plans
- Monitors agent interactions and task progress
- Implements intelligent recovery mechanisms when agents encounter challenges
Core Responsibilities:
1. Break down complex tasks into manageable subtasks
2. Assign and direct specialized agents based on their capabilities
3. Track and validate progress towards task completion
4. Detect and recover from execution stalls or loops
5. Provide a final synthesized answer or summary
Attributes:
_agents (List[ConversableAgent]): Specialized agents available for task execution.
_max_rounds (int): Maximum number of interaction rounds.
_max_stalls_before_replan (int): Threshold for detecting task progression issues.
_max_replans (int): Maximum number of task replanning attempts.
_return_final_answer (bool): Whether to generate a comprehensive task summary.
Args:
name (str): Name of the orchestrator agent.
agents (Optional[List[ConversableAgent]]): Specialized agents to coordinate.
max_rounds (int, optional): Maximum execution rounds. Defaults to 20.
max_stalls_before_replan (int, optional): Stall threshold before replanning. Defaults to 3.
max_replans (int, optional): Maximum replanning attempts. Defaults to 3.
return_final_answer (bool, optional): Generate a final summary. Defaults to False.
Additional arguments inherited from ConversableAgent.
"""

DEFAULT_SYSTEM_MESSAGES = [{"role": "system", "content": ORCHESTRATOR_SYSTEM_MESSAGE}]

def __init__(
Expand Down Expand Up @@ -103,7 +143,16 @@ def __init__(
self._plan = ""

def broadcast_message(self, message: Dict[str, Any], sender: Optional[ConversableAgent] = None) -> None:
"""Broadcast a message to all agents except the sender."""
"""Broadcast a message to all registered agents, excluding the optional sender.
This method sends the provided message to all agents in the orchestrator's agent list,
with an option to exclude the sender from receiving the message.
Args:
message (Dict[str, Any]): The message to be broadcast to all agents.
sender (Optional[ConversableAgent], optional): The agent to exclude from receiving the message.
Defaults to None.
"""
for agent in self._agents:
if agent != sender:
self.send(message, agent)
Expand Down Expand Up @@ -197,8 +246,7 @@ def _update_facts_and_plan(self) -> None:

self._plan = response

def update_ledger(self) -> Dict[str, Any]:
# updates the ledger at each turn
def _update_ledger(self) -> Dict[str, Any]:
max_json_retries = 10

team_description = self._get_team_description()
Expand Down Expand Up @@ -335,7 +383,7 @@ def _select_next_agent(self, task: dict | str) -> Optional[ConversableAgent]:
return self._select_next_agent(synthesized_prompt)

# Orchestrate the next step
ledger_dict = self.update_ledger()
ledger_dict = self._update_ledger()
logger.log_event(
source=self.name,
name="thought",
Expand Down Expand Up @@ -448,7 +496,42 @@ async def a_generate_reply(
sender: Optional["Agent"] = None,
**kwargs: Any,
) -> Union[str, Dict, None]:
"""Start the orchestration process with an initial message/task."""
"""Asynchronously generate a reply by orchestrating multi-agent task execution.
This method manages the entire lifecycle of a multi-agent task, including:
- Initializing the task
- Selecting and coordinating agents
- Managing task progress and replanning
- Handling task completion or termination
The method follows a sophisticated orchestration process:
1. Reset the agent states and chat histories
2. Agent based on the differnt sub task
3. Iteratively generate responses from agents
4. Broadcast responses and instructions
5. Track and manage task progress
6. Handle replanning and stall detection
7. Terminate when max rounds are reached or task is complete
Args:
messages (Optional[List[Dict[str, Any]]], optional):
Existing message history. If None, prompts for human input.
sender (Optional[Agent], optional):
The sender of the initial message. Defaults to None.
**kwargs:
Additional keyword arguments for future extensibility.
Returns:
Union[str, Dict, None]:
The final content of the last message in the conversation,
which could be a task result, summary, or None if task failed.
Raises:
Various potential exceptions during agent communication and task execution.
Notes:
- Tracks task state through `_current_round`, `_replan_counter`, and `_stall_counter`
"""
# Reset state
self._current_round = 0
self._oai_messages.clear()
Expand Down Expand Up @@ -523,3 +606,9 @@ async def a_generate_reply(

# Return chat result with all relevant info
return self._oai_messages[self][-1]["content"]

async def a_initiate_chats(self, chat_queue: List[Dict[str, Any]]) -> Dict[int, ChatResult]:
raise NotImplementedError

def initiate_chats(self, chat_queue: List[Dict[str, Any]]) -> List[ChatResult]:
raise NotImplementedError
26 changes: 14 additions & 12 deletions autogen/agentchat/contrib/magentic_one/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Literal, Dict, Any
import json
import re
from typing import Any, Dict, Literal


def clean_and_parse_json(content: str) -> Dict[str, Any]:
Expand All @@ -17,10 +17,10 @@ def clean_and_parse_json(content: str) -> Dict[str, Any]:
parts = content.split("```")
if len(parts) > 1:
content = parts[1].strip() # Take first code block content

# Find JSON-like structure if not in code block
if not content.strip().startswith('{'):
json_match = re.search(r'\{[\s\S]*\}', content)
if not content.strip().startswith("{"):
json_match = re.search(r"\{[\s\S]*\}", content)
if not json_match:
raise ValueError(
f"Could not find valid JSON structure in content. "
Expand All @@ -31,21 +31,23 @@ def clean_and_parse_json(content: str) -> Dict[str, Any]:

# Preserve newlines for readability in error messages
formatted_content = content

# Now clean for parsing
try:
# First try parsing the cleaned but formatted content
return json.loads(formatted_content)
except json.JSONDecodeError:
# If that fails, try more aggressive cleaning
cleaned_content = re.sub(r'[\n\r\t]', ' ', content) # Replace newlines/tabs with spaces
cleaned_content = re.sub(r'\s+', ' ', cleaned_content) # Normalize whitespace
cleaned_content = re.sub(r'\\(?!["\\/bfnrt])', '', cleaned_content) # Remove invalid escapes
cleaned_content = re.sub(r',(\s*[}\]])', r'\1', cleaned_content) # Remove trailing commas
cleaned_content = re.sub(r'([{,]\s*)(\w+)(?=\s*:)', r'\1"\2"', cleaned_content) # Quote unquoted keys
cleaned_content = re.sub(r"[\n\r\t]", " ", content) # Replace newlines/tabs with spaces
cleaned_content = re.sub(r"\s+", " ", cleaned_content) # Normalize whitespace
cleaned_content = re.sub(r'\\(?!["\\/bfnrt])', "", cleaned_content) # Remove invalid escapes
cleaned_content = re.sub(r",(\s*[}\]])", r"\1", cleaned_content) # Remove trailing commas
cleaned_content = re.sub(r"([{,]\s*)(\w+)(?=\s*:)", r'\1"\2"', cleaned_content) # Quote unquoted keys
cleaned_content = cleaned_content.replace("'", '"') # Standardize quotes

try:
return json.loads(cleaned_content)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse JSON after cleaning. Error: {str(e)} Original content: {formatted_content}")
raise ValueError(
f"Failed to parse JSON after cleaning. Error: {str(e)} Original content: {formatted_content}"
)

0 comments on commit 7b59e0e

Please sign in to comment.