From a6415364d6e761bdc77b5b14d60071ed951bdc6e Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 23 Dec 2024 10:41:29 -0800 Subject: [PATCH] [GROUPCHAT] --- docs/mkdocs.yml | 1 + docs/swarms/structs/group_chat.md | 468 +++++++++------ docs/swarms/structs/swarm_rearrange.md | 334 +++++++++++ example.py | 8 +- graph_swarm_example.py | 6 +- new_features_examples/swarm_arange_demo.py | 21 - pyproject.toml | 2 +- swarm_arange_demo.py | 216 +++++++ swarms/structs/__init__.py | 12 +- swarms/structs/groupchat.py | 646 ++++++++------------- swarms/structs/groupchat_new.py | 243 -------- swarms/structs/multi_agent_exec.py | 12 +- swarms/structs/sequential_workflow.py | 3 +- swarms/structs/swarm_builder.py | 297 ++++++++-- swarms/structs/swarm_matcher.py | 11 +- swarms/structs/swarm_router.py | 47 +- tests/structs/test_groupchat.py | 319 ++++------ 17 files changed, 1479 insertions(+), 1167 deletions(-) create mode 100644 docs/swarms/structs/swarm_rearrange.md delete mode 100644 new_features_examples/swarm_arange_demo.py create mode 100644 swarm_arange_demo.py delete mode 100644 swarms/structs/groupchat_new.py diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index c57ef3066..59b762219 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -184,6 +184,7 @@ nav: - ForestSwarm: "swarms/structs/forest_swarm.md" - SwarmRouter: "swarms/structs/swarm_router.md" - TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md" + - SwarmRearrange: "swarms/structs/swarm_rearrange.md" - Various Execution Methods: "swarms/structs/various_execution_methods.md" - Workflows: - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" diff --git a/docs/swarms/structs/group_chat.md b/docs/swarms/structs/group_chat.md index 712549531..84919e310 100644 --- a/docs/swarms/structs/group_chat.md +++ b/docs/swarms/structs/group_chat.md @@ -1,231 +1,341 @@ -# GroupChat Class Documentation - - -The GroupChat class manages multi-agent conversations with state persistence, comprehensive logging, and flexible agent configurations. It supports both Agent class instances and callable functions, making it versatile for different use cases. +# GroupChat Swarm Documentation + +A production-grade multi-agent system enabling sophisticated group conversations between AI agents with customizable speaking patterns, parallel processing capabilities, and comprehensive conversation tracking. + +## Advanced Configuration + +### Agent Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| agent_name | str | Required | Unique identifier for the agent | +| system_prompt | str | Required | Role and behavior instructions | +| llm | Any | Required | Language model instance | +| max_loops | int | 1 | Maximum conversation turns | +| autosave | bool | False | Enable conversation saving | +| dashboard | bool | False | Enable monitoring dashboard | +| verbose | bool | True | Enable detailed logging | +| dynamic_temperature | bool | True | Enable dynamic temperature | +| retry_attempts | int | 1 | Failed request retry count | +| context_length | int | 200000 | Maximum context window | +| output_type | str | "string" | Response format type | +| streaming_on | bool | False | Enable streaming responses | + +### GroupChat Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| name | str | "GroupChat" | Chat group identifier | +| description | str | "" | Purpose description | +| agents | List[Agent] | [] | Participating agents | +| speaker_fn | Callable | round_robin | Speaker selection function | +| max_turns | int | 10 | Maximum conversation turns | + + +## Table of Contents + +- [Installation](#installation) +- [Core Concepts](#core-concepts) +- [Basic Usage](#basic-usage) +- [Advanced Configuration](#advanced-configuration) +- [Speaker Functions](#speaker-functions) +- [Response Models](#response-models) +- [Advanced Examples](#advanced-examples) +- [API Reference](#api-reference) +- [Best Practices](#best-practices) ## Installation -```bash -pip install swarms python-dotenv pydantic -``` - - -## Attributes - -| Attribute | Type | Description | -|-----------|------|-------------| -| state_path | str | Path for saving/loading chat state | -| wrapped_agents | List[AgentWrapper] | List of wrapped agent instances | -| selector_agent | AgentWrapper | Agent responsible for speaker selection | -| state | GroupChatState | Current state of the group chat | - -## Methods - -### Core Methods - -```python -def run(self, task: str) -> str: - """Execute the group chat conversation""" - -def save_state(self) -> None: - """Save current state to disk""" - -@classmethod -def load_state(cls, state_path: str) -> 'GroupChat': - """Load GroupChat from saved state""" -def get_conversation_summary(self) -> Dict[str, Any]: - """Return a summary of the conversation""" - -def export_conversation(self, format: str = "json") -> Union[str, Dict]: - """Export the conversation in specified format""" +```bash +pip3 install swarms swarm-models loguru ``` -### Internal Methods - -```python -def _log_interaction(self, agent_name: str, position: int, input_text: str, output_text: str) -> None: - """Log a single interaction""" +## Core Concepts -def _add_message(self, role: str, content: str) -> None: - """Add a message to the conversation history""" +The GroupChat system consists of several key components: -def select_next_speaker(self, last_speaker: AgentWrapper) -> AgentWrapper: - """Select the next speaker using the selector agent""" -``` +1. **Agents**: Individual AI agents with specialized knowledge and roles +2. **Speaker Functions**: Control mechanisms for conversation flow +3. **Chat History**: Structured conversation tracking +4. **Response Models**: Pydantic models for data validation -## Usage Examples +## Basic Usage -### 1. Basic Setup with Two Agents ```python import os -from swarms import Agent +from dotenv import load_dotenv from swarm_models import OpenAIChat +from swarms import Agent, GroupChat +from loguru import logger -# Initialize OpenAI +# Load environment variables +load_dotenv() api_key = os.getenv("OPENAI_API_KEY") -model = OpenAIChat(openai_api_key=api_key, model_name="gpt-4-mini") -# Create agents -analyst = Agent( - agent_name="Financial-Analyst", - system_prompt="You are a financial analyst...", - llm=model +# Initialize LLM +model = OpenAIChat( + openai_api_key=api_key, + model_name="gpt-4o-mini", + temperature=0.1 ) -advisor = Agent( - agent_name="Investment-Advisor", - system_prompt="You are an investment advisor...", - llm=model +# Create financial analyst agent +financial_analyst = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt="You are a financial analyst specializing in investment strategies.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + retry_attempts=1, + context_length=200000, + output_type="string" +) + +# Create tax advisor agent +tax_advisor = Agent( + agent_name="Tax-Adviser-Agent", + system_prompt="You are a tax adviser providing clear tax guidance.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + retry_attempts=1, + context_length=200000, + output_type="string" ) -# Create group chat +# Initialize group chat chat = GroupChat( - name="Investment Team", - agents=[analyst, advisor], - max_rounds=5, - group_objective="Provide investment advice" + name="Investment Advisory", + description="Financial and tax analysis group", + agents=[financial_analyst, tax_advisor], + speaker_fn=expertise_based ) -response = chat.run("What's the best investment strategy for retirement?") +# Run conversation +history = chat.run("How to optimize tax strategy for investments?") ``` -### 2. Advanced Setup with State Management -```python -# Create group chat with state persistence -chat = GroupChat( - name="Investment Advisory Team", - description="Expert team for financial planning", - agents=[analyst, advisor, tax_specialist], - max_rounds=10, - admin_name="Senior Advisor", - group_objective="Provide comprehensive financial planning", - state_path="investment_chat_state.json", - rules="1. Always provide sources\n2. Be concise\n3. Focus on practical advice" -) +## Speaker Functions -# Run chat and save state -response = chat.run("Create a retirement plan for a 35-year old") -chat.save_state() -# Load existing chat state -loaded_chat = GroupChat.load_state("investment_chat_state.json") -``` +### Built-in Functions -### 3. Using Custom Callable Agents ```python -def custom_agent(input_text: str) -> str: - # Custom logic here - return f"Processed: {input_text}" +def round_robin(history: List[str], agent: Agent) -> bool: + """ + Enables agents to speak in turns. + Returns True for each agent in sequence. + """ + return True + +def expertise_based(history: List[str], agent: Agent) -> bool: + """ + Enables agents to speak based on their expertise. + Returns True if agent's role matches conversation context. + """ + return agent.system_prompt.lower() in history[-1].lower() if history else True + +def random_selection(history: List[str], agent: Agent) -> bool: + """ + Randomly selects speaking agents. + Returns True/False with 50% probability. + """ + import random + return random.choice([True, False]) + +def most_recent(history: List[str], agent: Agent) -> bool: + """ + Enables agents to respond to their mentions. + Returns True if agent was last speaker. + """ + return agent.agent_name == history[-1].split(":")[0].strip() if history else True +``` -# Mix of regular agents and callable functions +### Custom Speaker Function Example + +```python +def custom_speaker(history: List[str], agent: Agent) -> bool: + """ + Custom speaker function with complex logic. + + Args: + history: Previous conversation messages + agent: Current agent being evaluated + + Returns: + bool: Whether agent should speak + """ + # No history - let everyone speak + if not history: + return True + + last_message = history[-1].lower() + + # Check for agent expertise keywords + expertise_relevant = any( + keyword in last_message + for keyword in agent.expertise_keywords + ) + + # Check for direct mentions + mentioned = agent.agent_name.lower() in last_message + + # Check if agent hasn't spoken recently + not_recent_speaker = not any( + agent.agent_name in msg + for msg in history[-3:] + ) + + return expertise_relevant or mentioned or not_recent_speaker + +# Usage chat = GroupChat( - name="Hybrid Team", - agents=[analyst, custom_agent], - max_rounds=3 + agents=[agent1, agent2], + speaker_fn=custom_speaker ) ``` -### 4. Export and Analysis -```python -# Run chat -chat.run("Analyze market conditions") +## Response Models -# Get summary -summary = chat.get_conversation_summary() -print(summary) +### Complete Schema -# Export in different formats -json_conv = chat.export_conversation(format="json") -text_conv = chat.export_conversation(format="text") +```python +class AgentResponse(BaseModel): + """Individual agent response in a conversation turn""" + agent_name: str + role: str + message: str + timestamp: datetime = Field(default_factory=datetime.now) + turn_number: int + preceding_context: List[str] = Field(default_factory=list) + +class ChatTurn(BaseModel): + """Single turn in the conversation""" + turn_number: int + responses: List[AgentResponse] + task: str + timestamp: datetime = Field(default_factory=datetime.now) + +class ChatHistory(BaseModel): + """Complete conversation history""" + turns: List[ChatTurn] + total_messages: int + name: str + description: str + start_time: datetime = Field(default_factory=datetime.now) ``` -### 5. Advanced Configuration with Custom Selector -```python -class CustomSelector(Agent): - def run(self, input_text: str) -> str: - # Custom selection logic - return "Financial-Analyst" +## Advanced Examples -chat = GroupChat( - name="Custom Selection Team", - agents=[analyst, advisor], - selector_agent=CustomSelector( - agent_name="Custom-Selector", - system_prompt="Select the next speaker based on expertise", - llm=model - ), - max_rounds=5 -) -``` +### Multi-Agent Analysis Team -### 6. Debugging Setup ```python -import logging +# Create specialized agents +data_analyst = Agent( + agent_name="Data-Analyst", + system_prompt="You analyze numerical data and patterns", + llm=model +) -# Configure logging -logging.basicConfig(level=logging.DEBUG) +market_expert = Agent( + agent_name="Market-Expert", + system_prompt="You provide market insights and trends", + llm=model +) -chat = GroupChat( - name="Debug Team", - agents=[analyst, advisor], - max_rounds=3, - state_path="debug_chat.json" +strategy_advisor = Agent( + agent_name="Strategy-Advisor", + system_prompt="You formulate strategic recommendations", + llm=model ) -# Run with detailed logging -try: - response = chat.run("Complex query") -except Exception as e: - logger.error(f"Chat failed: {str(e)}") - # Access last successful state - state = chat.state -``` +# Create analysis team +analysis_team = GroupChat( + name="Market Analysis Team", + description="Comprehensive market analysis group", + agents=[data_analyst, market_expert, strategy_advisor], + speaker_fn=expertise_based, + max_turns=15 +) -## Error Handling +# Run complex analysis +history = analysis_team.run(""" + Analyze the current market conditions: + 1. Identify key trends + 2. Evaluate risks + 3. Recommend investment strategy +""") +``` -The GroupChat class includes comprehensive error handling: +### Parallel Processing ```python -try: - chat = GroupChat(agents=[analyst]) # Will raise ValueError -except ValueError as e: - print("Configuration error:", str(e)) - -try: - response = chat.run("Query") -except Exception as e: - # Access error state - error_summary = chat.get_conversation_summary() - print("Execution error:", str(e)) - print("State at error:", error_summary) +# Define multiple analysis tasks +tasks = [ + "Analyze tech sector trends", + "Evaluate real estate market", + "Review commodity prices", + "Assess global economic indicators" +] + +# Run tasks concurrently +histories = chat.concurrent_run(tasks) + +# Process results +for task, history in zip(tasks, histories): + print(f"\nAnalysis for: {task}") + for turn in history.turns: + for response in turn.responses: + print(f"{response.agent_name}: {response.message}") ``` ## Best Practices -1. **State Management**: - - Always specify a `state_path` for important conversations - - Use `save_state()` after critical operations - - Implement regular state backups for long conversations - -2. **Agent Configuration**: - - Provide clear system prompts for each agent - - Use descriptive agent names - - Consider agent expertise when setting the group objective - -3. **Performance**: - - Keep `max_rounds` reasonable (5-10 for most cases) - - Use early stopping conditions when possible - - Monitor conversation length and complexity - -4. **Error Handling**: - - Always wrap chat execution in try-except blocks - - Implement proper logging - - Save states before potentially risky operations - -## Limitations - -- Agents must either have a `run` method or be callable -- State files can grow large with many interactions -- Selector agent may need optimization for large agent groups -- Real-time streaming not supported in basic configuration - +1. **Agent Design** + - Give agents clear, specific roles + - Use detailed system prompts + - Set appropriate context lengths + - Enable retries for reliability + +2. **Speaker Functions** + - Match function to use case + - Consider conversation flow + - Handle edge cases + - Add appropriate logging + +3. **Error Handling** + - Use try-except blocks + - Log errors appropriately + - Implement retry logic + - Provide fallback responses + +4. **Performance** + - Use concurrent processing for multiple tasks + - Monitor context lengths + - Implement proper cleanup + - Cache responses when appropriate + +## API Reference + +### GroupChat Methods + +| Method | Description | Arguments | Returns | +|--------|-------------|-----------|---------| +| run | Run single conversation | task: str | ChatHistory | +| batched_run | Run multiple sequential tasks | tasks: List[str] | List[ChatHistory] | +| concurrent_run | Run multiple parallel tasks | tasks: List[str] | List[ChatHistory] | +| get_recent_messages | Get recent messages | n: int = 3 | List[str] | + +### Agent Methods + +| Method | Description | Returns | +|--------|-------------|---------| +| run | Process single task | str | +| generate_response | Generate LLM response | str | +| save_context | Save conversation context | None | \ No newline at end of file diff --git a/docs/swarms/structs/swarm_rearrange.md b/docs/swarms/structs/swarm_rearrange.md new file mode 100644 index 000000000..c40aa5b59 --- /dev/null +++ b/docs/swarms/structs/swarm_rearrange.md @@ -0,0 +1,334 @@ +# SwarmRearrange Documentation + +SwarmRearrange is a class for orchestrating multiple swarms in a sequential or parallel flow pattern. It provides thread-safe operations for managing swarm execution, history tracking, and flow validation. + +## Constructor Arguments + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| id | str | UUID | Unique identifier for the swarm arrangement | +| name | str | "SwarmRearrange" | Name of the swarm arrangement | +| description | str | "A swarm of swarms..." | Description of the arrangement | +| swarms | List[Any] | [] | List of swarm objects to be managed | +| flow | str | None | Flow pattern for swarm execution | +| max_loops | int | 1 | Maximum number of execution loops | +| verbose | bool | True | Enable detailed logging | +| human_in_the_loop | bool | False | Enable human intervention | +| custom_human_in_the_loop | Callable | None | Custom function for human interaction | +| return_json | bool | False | Return results in JSON format | + +## Methods + +### add_swarm(swarm: Any) +Adds a single swarm to the arrangement. + +### remove_swarm(swarm_name: str) +Removes a swarm by name from the arrangement. + +### add_swarms(swarms: List[Any]) +Adds multiple swarms to the arrangement. + +### validate_flow() +Validates the flow pattern syntax and swarm names. + +### run(task: str = None, img: str = None, custom_tasks: Dict[str, str] = None) +Executes the swarm arrangement according to the flow pattern. + +## Flow Pattern Syntax +The flow pattern uses arrow notation (`->`) to define execution order: + +- Sequential: `"SwarmA -> SwarmB -> SwarmC"` +- Parallel: `"SwarmA, SwarmB -> SwarmC"` +- Human intervention: Use `"H"` in the flow + +## Examples + +### Basic Sequential Flow + +```python +from swarms.structs.swarm_arange import SwarmRearrange +import os +from swarms import Agent, AgentRearrange +from swarm_models import OpenAIChat + +# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")) +company = "TGSC" + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("GROQ_API_KEY") + +# Model +model = OpenAIChat( + openai_api_base="https://api.groq.com/openai/v1", + openai_api_key=api_key, + model_name="llama-3.1-70b-versatile", + temperature=0.1, +) + + +# Initialize the Managing Director agent +managing_director = Agent( + agent_name="Managing-Director", + system_prompt=f""" + As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions. + Your responsibilities include: + 1. Setting the overall strategy and direction for the analysis + 2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation + 3. Reviewing the findings and recommendations from each team member + 4. Making the final decision on whether to proceed with the acquisition + + For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="managing-director.json", +) + +# Initialize the Vice President of Finance +vp_finance = Agent( + agent_name="VP-Finance", + system_prompt=f""" + As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements + 2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels + 3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions + 4. Identifying any financial risks or red flags that could impact the acquisition decision + 5. Providing a detailed report on your findings and recommendations to the Managing Director + + Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="vp-finance.json", +) + +# Initialize the Industry Analyst +industry_analyst = Agent( + agent_name="Industry-Analyst", + system_prompt=f""" + As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects + 2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning + 3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages + 4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions + 5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences + + Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="industry-analyst.json", +) + +# Initialize the Technology Expert +tech_expert = Agent( + agent_name="Tech-Expert", + system_prompt=f""" + As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities + 2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property + 3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps + 4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments + 5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents + + Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="tech-expert.json", +) + +# Initialize the Market Researcher +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt=f""" + As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition. + For the current potential acquisition of {company}, your tasks include: + 1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates + 2. Assessing {company}' market share within its target markets and identifying key factors driving its market position + 3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities + 4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption + 5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration + + Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="market-researcher.json", +) + +# Initialize the Regulatory Specialist +regulatory_specialist = Agent( + agent_name="Regulatory-Specialist", + system_prompt=f""" + As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations + 2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance + 3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens + 4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts + 5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations + + Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="regulatory-specialist.json", +) + +# Create a list of agents +agents = [ + managing_director, + vp_finance, + industry_analyst, + tech_expert, + market_researcher, + regulatory_specialist, +] + +# Define multiple flow patterns +flows = [ + "Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance", + "Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist", + "Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance", +] + +# Create instances of AgentRearrange for each flow pattern +blackstone_acquisition_analysis = AgentRearrange( + name="Blackstone-Acquisition-Analysis", + description="A system for analyzing potential acquisitions", + agents=agents, + flow=flows[0], +) + +blackstone_investment_strategy = AgentRearrange( + name="Blackstone-Investment-Strategy", + description="A system for evaluating investment opportunities", + agents=agents, + flow=flows[1], +) + +blackstone_market_analysis = AgentRearrange( + name="Blackstone-Market-Analysis", + description="A system for analyzing market trends and opportunities", + agents=agents, + flow=flows[2], +) + +swarm_arrange = SwarmRearrange( + swarms=[ + blackstone_acquisition_analysis, + blackstone_investment_strategy, + blackstone_market_analysis, + ], + flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}", +) + +print( + swarm_arrange.run( + "Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024" + ) +) + +``` + +### Human-in-the-Loop + +```python +def custom_human_input(task): + return input(f"Review {task} and provide feedback: ") + +# Create arrangement with human intervention +arrangement = SwarmRearrange( + name="HumanAugmented", + swarms=[swarm1, swarm2], + flow="SwarmA -> H -> SwarmB", + human_in_the_loop=True, + custom_human_in_the_loop=custom_human_input +) + +# Execute with human intervention +result = arrangement.run("Initial task") +``` + +### Complex Multi-Stage Pipeline + +```python +# Define multiple flow patterns +flows = [ + "Collector -> Processor -> Analyzer", + "Analyzer -> ML -> Validator", + "Validator -> Reporter" +] + +# Create arrangements for each flow +pipelines = [ + SwarmRearrange(name=f"Pipeline{i}", swarms=swarms, flow=flow) + for i, flow in enumerate(flows) +] + +# Create master arrangement +master = SwarmRearrange( + name="MasterPipeline", + swarms=pipelines, + flow="Pipeline0 -> Pipeline1 -> Pipeline2" +) + +# Execute complete pipeline +result = master.run("Start analysis") +``` + +## Best Practices + +1. **Flow Validation**: Always validate flows before execution +2. **Error Handling**: Implement try-catch blocks around run() calls +3. **History Tracking**: Use track_history() for monitoring swarm execution +4. **Resource Management**: Set appropriate max_loops to prevent infinite execution +5. **Logging**: Enable verbose mode during development for detailed logging + +## Error Handling + +The class implements comprehensive error handling: + +```python +try: + arrangement = SwarmRearrange(swarms=swarms, flow=flow) + result = arrangement.run(task) +except ValueError as e: + logger.error(f"Flow validation error: {e}") +except Exception as e: + logger.error(f"Execution error: {e}") +``` \ No newline at end of file diff --git a/example.py b/example.py index 96670e1b7..09c70ec9c 100644 --- a/example.py +++ b/example.py @@ -7,21 +7,17 @@ agent = Agent( agent_name="Financial-Analysis-Agent", agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT - + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, max_loops=1, model_name="gpt-4o", dynamic_temperature_enabled=True, - user_name="Kye", + user_name="swarms_corp", retry_attempts=3, - # streaming_on=True, context_length=8192, return_step_meta=False, output_type="str", # "json", "dict", "csv" OR "string" "yaml" and auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task max_tokens=4000, # max output tokens - # interactive=True, - stopping_token="", saved_state_path="agent_00.json", interactive=False, ) diff --git a/graph_swarm_example.py b/graph_swarm_example.py index ae9976732..5173a1a55 100644 --- a/graph_swarm_example.py +++ b/graph_swarm_example.py @@ -8,21 +8,21 @@ # Create agents data_collector = Agent( agent_name="Market-Data-Collector", - model_name="gpt-4o-mini", + model_name="openai/gpt-4o", max_loops=1, streaming_on=True, ) trend_analyzer = Agent( agent_name="Market-Trend-Analyzer", - model_name="gpt-4o-mini", + model_name="openai/gpt-4o", max_loops=1, streaming_on=True, ) report_generator = Agent( agent_name="Investment-Report-Generator", - model_name="gpt-4o-mini", + model_name="openai/gpt-4o", max_loops=1, streaming_on=True, ) diff --git a/new_features_examples/swarm_arange_demo.py b/new_features_examples/swarm_arange_demo.py deleted file mode 100644 index 685cff9d7..000000000 --- a/new_features_examples/swarm_arange_demo.py +++ /dev/null @@ -1,21 +0,0 @@ -from swarms.structs.swarm_arange import SwarmRearrange -from blackstone_pe.rearrange_example_blackstone import ( - blackstone_acquisition_analysis, - blackstone_investment_strategy, - blackstone_market_analysis, -) - -swarm_arrange = SwarmRearrange( - swarms=[ - blackstone_acquisition_analysis, - blackstone_investment_strategy, - blackstone_market_analysis, - ], - flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}, {blackstone_acquisition_analysis.name}", -) - -print( - swarm_arrange.run( - "Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024" - ) -) diff --git a/pyproject.toml b/pyproject.toml index a2516e755..693c68866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.7.5" +version = "6.7.6" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarm_arange_demo.py b/swarm_arange_demo.py new file mode 100644 index 000000000..713c2cfb8 --- /dev/null +++ b/swarm_arange_demo.py @@ -0,0 +1,216 @@ +from swarms.structs.swarm_arange import SwarmRearrange +import os +from swarms import Agent, AgentRearrange +from swarm_models import OpenAIChat + +# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")) +company = "TGSC" + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("GROQ_API_KEY") + +# Model +model = OpenAIChat( + openai_api_base="https://api.groq.com/openai/v1", + openai_api_key=api_key, + model_name="llama-3.1-70b-versatile", + temperature=0.1, +) + + +# Initialize the Managing Director agent +managing_director = Agent( + agent_name="Managing-Director", + system_prompt=f""" + As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions. + Your responsibilities include: + 1. Setting the overall strategy and direction for the analysis + 2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation + 3. Reviewing the findings and recommendations from each team member + 4. Making the final decision on whether to proceed with the acquisition + + For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="managing-director.json", +) + +# Initialize the Vice President of Finance +vp_finance = Agent( + agent_name="VP-Finance", + system_prompt=f""" + As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements + 2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels + 3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions + 4. Identifying any financial risks or red flags that could impact the acquisition decision + 5. Providing a detailed report on your findings and recommendations to the Managing Director + + Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="vp-finance.json", +) + +# Initialize the Industry Analyst +industry_analyst = Agent( + agent_name="Industry-Analyst", + system_prompt=f""" + As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects + 2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning + 3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages + 4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions + 5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences + + Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="industry-analyst.json", +) + +# Initialize the Technology Expert +tech_expert = Agent( + agent_name="Tech-Expert", + system_prompt=f""" + As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition. + For the current potential acquisition of {company}, your tasks include: + 1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities + 2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property + 3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps + 4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments + 5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents + + Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="tech-expert.json", +) + +# Initialize the Market Researcher +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt=f""" + As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition. + For the current potential acquisition of {company}, your tasks include: + 1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates + 2. Assessing {company}' market share within its target markets and identifying key factors driving its market position + 3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities + 4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption + 5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration + + Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="market-researcher.json", +) + +# Initialize the Regulatory Specialist +regulatory_specialist = Agent( + agent_name="Regulatory-Specialist", + system_prompt=f""" + As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions. + For the current potential acquisition of {company}, your tasks include: + 1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations + 2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance + 3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens + 4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts + 5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations + + Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks. + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="regulatory-specialist.json", +) + +# Create a list of agents +agents = [ + managing_director, + vp_finance, + industry_analyst, + tech_expert, + market_researcher, + regulatory_specialist, +] + +# Define multiple flow patterns +flows = [ + "Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance", + "Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist", + "Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance", +] + +# Create instances of AgentRearrange for each flow pattern +blackstone_acquisition_analysis = AgentRearrange( + name="Blackstone-Acquisition-Analysis", + description="A system for analyzing potential acquisitions", + agents=agents, + flow=flows[0], +) + +blackstone_investment_strategy = AgentRearrange( + name="Blackstone-Investment-Strategy", + description="A system for evaluating investment opportunities", + agents=agents, + flow=flows[1], +) + +blackstone_market_analysis = AgentRearrange( + name="Blackstone-Market-Analysis", + description="A system for analyzing market trends and opportunities", + agents=agents, + flow=flows[2], +) + +swarm_arrange = SwarmRearrange( + swarms=[ + blackstone_acquisition_analysis, + blackstone_investment_strategy, + blackstone_market_analysis, + ], + flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}", +) + +print( + swarm_arrange.run( + "Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024" + ) +) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 16a93512a..234fb72c5 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -12,7 +12,12 @@ Node, NodeType, ) -from swarms.structs.groupchat import GroupChat, GroupChatState +from swarms.structs.groupchat import ( + GroupChat, + ChatHistory, + ChatTurn, + AgentResponse, +) from swarms.structs.majority_voting import ( MajorityVoting, majority_voting, @@ -143,5 +148,8 @@ "AsyncWorkflow", "run_agents_with_tasks_concurrently", "showcase_available_agents", - "GroupChatState", + "GroupChat", + "ChatHistory", + "ChatTurn", + "AgentResponse", ] diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index 46e798bae..39fc45521 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -1,493 +1,301 @@ -from typing import List, Dict, Optional, Union, Callable, Any -from pydantic import BaseModel, Field +import concurrent.futures from datetime import datetime -import json -from uuid import uuid4 -import logging -from swarms.structs.agent import Agent -from swarms.structs.agents_available import showcase_available_agents +from typing import Callable, List -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +from loguru import logger +from pydantic import BaseModel, Field +from swarms.structs.agent import Agent -class Message(BaseModel): - """Single message in the conversation""" +class AgentResponse(BaseModel): + agent_name: str role: str - content: str - timestamp: datetime = Field(default_factory=datetime.utcnow) + message: str + timestamp: datetime = Field(default_factory=datetime.now) + turn_number: int + preceding_context: List[str] = Field(default_factory=list) -class AgentMetadata(BaseModel): - """Metadata for tracking agent state and configuration""" +class ChatTurn(BaseModel): + turn_number: int + responses: List[AgentResponse] + task: str + timestamp: datetime = Field(default_factory=datetime.now) - agent_name: str - agent_type: str - system_prompt: Optional[str] = None - description: Optional[str] = None - config: Dict[str, Any] = Field(default_factory=dict) +class ChatHistory(BaseModel): + turns: List[ChatTurn] + total_messages: int + name: str + description: str + start_time: datetime = Field(default_factory=datetime.now) -class InteractionLog(BaseModel): - """Log entry for a single interaction""" - id: str = Field(default_factory=lambda: uuid4().hex) - agent_name: str - position: int - input_text: str - output_text: str - timestamp: datetime = Field(default_factory=datetime.utcnow) - metadata: Dict[str, Any] = Field(default_factory=dict) +SpeakerFunction = Callable[[List[str], "Agent"], bool] -class GroupChatState(BaseModel): - """Complete state of the group chat""" +def round_robin(history: List[str], agent: Agent) -> bool: + """ + Round robin speaker function. + Each agent speaks in turn, in a circular order. + """ + return True - id: str = Field(default_factory=lambda: uuid4().hex) - name: Optional[str] = None - description: Optional[str] = None - admin_name: str - group_objective: str - max_rounds: int - rules: Optional[str] = None - agent_metadata: List[AgentMetadata] - messages: List[Message] - interactions: List[InteractionLog] - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) +def expertise_based(history: List[str], agent: Agent) -> bool: + """ + Expertise based speaker function. + An agent speaks if their system prompt is in the last message. + """ + return ( + agent.system_prompt.lower() in history[-1].lower() + if history + else True + ) -# Todo: -# Build a function that prompts the llm to output the -# [Agent-Name] in square brackets and then the question or something -# An agentic Language notation +def random_selection(history: List[str], agent: Agent) -> bool: + """ + Random selection speaker function. + An agent speaks randomly. + """ + import random -class AgentWrapper: - """Wrapper class to standardize agent interfaces""" + return random.choice([True, False]) - def __init__( - self, - agent: Union["Agent", Callable], - agent_name: str, - system_prompt: Optional[str] = None, - ): - self.agent = agent - self.agent_name = agent_name - self.system_prompt = system_prompt - self._validate_agent() - - def _validate_agent(self): - """Validate that the agent has the required interface""" - if hasattr(self.agent, "run"): - self.run = self.agent.run - elif callable(self.agent): - self.run = self.agent - else: - raise ValueError( - "Agent must either have a 'run' method or be callable" - ) - def get_metadata(self) -> AgentMetadata: - """Extract metadata from the agent""" - return AgentMetadata( - agent_name=self.agent_name, - agent_type=type(self.agent).__name__, - system_prompt=self.system_prompt, - config={ - k: v - for k, v in self.agent.__dict__.items() - if isinstance(v, (str, int, float, bool, dict, list)) - }, - ) +def most_recent(history: List[str], agent: Agent) -> bool: + """ + Most recent speaker function. + An agent speaks if they are the last speaker. + """ + return ( + agent.agent_name == history[-1].split(":")[0].strip() + if history + else True + ) class GroupChat: - """Enhanced GroupChat manager with state persistence and comprehensive logging. - - This class implements a multi-agent chat system with the following key features: - - State persistence to disk - - Comprehensive interaction logging - - Configurable agent selection - - Early stopping conditions - - Conversation export capabilities - - The GroupChat coordinates multiple agents to have a goal-directed conversation, - with one agent speaking at a time based on a selector agent's decisions. - - Attributes: - name (Optional[str]): Name of the group chat - description (Optional[str]): Description of the group chat's purpose - agents (List[Union["Agent", Callable]]): List of participating agents - max_rounds (int): Maximum number of conversation rounds - admin_name (str): Name of the administrator - group_objective (str): The goal/objective of the conversation - selector_agent (Union["Agent", Callable]): Agent that selects next speaker - rules (Optional[str]): Rules governing the conversation - state_path (Optional[str]): Path to save conversation state - showcase_agents_on (bool): Whether to showcase agent capabilities + """ + GroupChat class to enable multiple agents to communicate in a synchronous group chat. + Each agent is aware of all other agents, every message exchanged, and the social context. """ def __init__( self, - name: Optional[str] = None, - description: Optional[str] = None, - agents: List[Union["Agent", Callable]] = None, - max_rounds: int = 10, - admin_name: str = "Admin", - group_objective: str = None, - selector_agent: Union["Agent", Callable] = None, - rules: Optional[str] = None, - state_path: Optional[str] = None, - showcase_agents_on: bool = False, + name: str = "GroupChat", + description: str = "A group chat for multiple agents", + agents: List[Agent] = [], + speaker_fn: SpeakerFunction = round_robin, + max_turns: int = 10, ): - """Initialize a new GroupChat instance. + """ + Initialize the GroupChat. Args: - name: Name of the group chat - description: Description of the group chat's purpose - agents: List of participating agents - max_rounds: Maximum number of conversation rounds - admin_name: Name of the administrator - group_objective: The goal/objective of the conversation - selector_agent: Agent that selects next speaker - rules: Rules governing the conversation - state_path: Path to save conversation state - showcase_agents_on: Whether to showcase agent capabilities - - Raises: - ValueError: If no agents are provided + name (str): Name of the group chat. + description (str): Description of the purpose of the group chat. + agents (List[Agent]): A list of agents participating in the chat. + speaker_fn (SpeakerFunction): The function to determine which agent should speak next. + max_turns (int): Maximum number of turns in the chat. """ self.name = name self.description = description self.agents = agents - self.max_rounds = max_rounds - self.admin_name = admin_name - self.group_objective = group_objective - self.selector_agent = selector_agent - self.rules = rules - self.state_path = state_path - self.showcase_agents_on = showcase_agents_on - - if not agents: - raise ValueError("At least two agents are required") - - # Generate unique state path if not provided - self.state_path = ( - state_path or f"group_chat_{uuid4().hex}.json" - ) - - # Wrap all agents to standardize interface - self.wrapped_agents = [ - AgentWrapper( - agent, - ( - f"Agent_{i}" - if not hasattr(agent, "agent_name") - else agent.agent_name - ), - ) - for i, agent in enumerate(agents) - ] - - # Configure selector agent - self.selector_agent = AgentWrapper( - selector_agent or self.wrapped_agents[0].agent, - "Selector", - "Select the next speaker based on the conversation context", - ) - - # Initialize conversation state - self.state = GroupChatState( + self.speaker_fn = speaker_fn + self.max_turns = max_turns + self.chat_history = ChatHistory( + turns=[], + total_messages=0, name=name, description=description, - admin_name=admin_name, - group_objective=group_objective, - max_rounds=max_rounds, - rules=rules, - agent_metadata=[ - agent.get_metadata() for agent in self.wrapped_agents - ], - messages=[], - interactions=[], ) - # Showcase agents if enabled - if self.showcase_agents_on is True: - self.showcase_agents() - - def showcase_agents(self): - """Showcase available agents and update their system prompts. - - This method displays agent capabilities and updates each agent's - system prompt with information about other agents in the group. + def _get_response_sync( + self, agent: Agent, prompt: str, turn_number: int + ) -> AgentResponse: """ - out = showcase_available_agents( - name=self.name, - description=self.description, - agents=self.wrapped_agents, - ) - - for agent in self.wrapped_agents: - # Initialize system_prompt if None - if agent.system_prompt is None: - agent.system_prompt = "" - agent.system_prompt += out - - def save_state(self) -> None: - """Save current conversation state to disk. - - The state is saved as a JSON file at the configured state_path. - """ - with open(self.state_path, "w") as f: - json.dump(self.state.dict(), f, default=str, indent=2) - logger.info(f"State saved to {self.state_path}") - - @classmethod - def load_state(cls, state_path: str) -> "GroupChat": - """Load GroupChat from saved state. + Get the response from an agent synchronously. Args: - state_path: Path to the saved state JSON file + agent (Agent): The agent responding. + prompt (str): The message triggering the response. + turn_number (int): The current turn number. Returns: - GroupChat: A new GroupChat instance with restored state - - Raises: - FileNotFoundError: If state file doesn't exist - json.JSONDecodeError: If state file is invalid JSON - """ - with open(state_path, "r") as f: - state_dict = json.load(f) - - # Convert loaded data back to state model - state = GroupChatState(**state_dict) - - # Initialize with minimal config, then restore state - instance = cls( - name=state.name, - admin_name=state.admin_name, - agents=[], # Temporary empty list - group_objective=state.group_objective, - ) - instance.state = state - return instance - - def _log_interaction( - self, - agent_name: str, - position: int, - input_text: str, - output_text: str, - ) -> None: - """Log a single interaction in the conversation. - - Args: - agent_name: Name of the speaking agent - position: Position in conversation sequence - input_text: Input context provided to agent - output_text: Agent's response + AgentResponse: The agent's response captured in a structured format. """ - log_entry = InteractionLog( - agent_name=agent_name, - position=position, - input_text=input_text, - output_text=output_text, - metadata={ - "current_agents": [ - a.agent_name for a in self.wrapped_agents - ], - "round": position // len(self.wrapped_agents), - }, - ) - self.state.interactions.append(log_entry) - self.save_state() - - def _add_message(self, role: str, content: str) -> None: - """Add a message to the conversation history. + try: + context = f"""You are {agent.name} with role: {agent.system_prompt}. + Other agents: {[a.name for a in self.agents if a != agent]} + Previous messages: {[t.responses[-3:] for t in self.chat_history.turns[-3:]]}""" + + message = agent.run(context + prompt) + return AgentResponse( + agent_name=agent.name, + role=agent.system_prompt, + message=message, + turn_number=turn_number, + preceding_context=self.get_recent_messages(3), + ) + except Exception as e: + logger.error(f"Error from {agent.name}: {e}") + return AgentResponse( + agent_name=agent.name, + role=agent.system_prompt, + message=f"Error generating response: {str(e)}", + turn_number=turn_number, + preceding_context=[], + ) - Args: - role: Speaker's role/name - content: Message content + def get_recent_messages(self, n: int = 3) -> List[str]: """ - message = Message(role=role, content=content) - self.state.messages.append(message) - self.save_state() - - def select_next_speaker( - self, last_speaker: AgentWrapper - ) -> AgentWrapper: - """Select the next speaker using the selector agent. + Get the most recent messages in the chat. Args: - last_speaker: The agent who spoke last + n (int): The number of recent messages to retrieve. Returns: - AgentWrapper: The next agent to speak - - Note: - Falls back to round-robin selection if selector agent fails + List[str]: The most recent messages in the chat. """ - conversation_history = "\n".join( - [ - f"{msg.role}: {msg.content}" - for msg in self.state.messages - ] - ) + messages = [] + for turn in self.chat_history.turns[-n:]: + for response in turn.responses: + messages.append( + f"{response.agent_name}: {response.message}" + ) + return messages - selection_prompt = f""" - Current speakers: {[agent.agent_name for agent in self.wrapped_agents]} - Last speaker: {last_speaker.agent_name} - Group objective: {self.state.group_objective} - - Based on the conversation history and group objective, select the next most appropriate speaker. - Only return the speaker's name. - - Conversation history: - {conversation_history} + def run(self, task: str) -> ChatHistory: """ - - try: - next_speaker_name = self.selector_agent.run( - selection_prompt - ).strip() - return next( - agent - for agent in self.wrapped_agents - if agent.agent_name in next_speaker_name - ) - except (StopIteration, Exception) as e: - logger.warning( - f"Selector agent failed: {str(e)}. Falling back to round-robin." - ) - # Fallback to round-robin if selection fails - current_idx = self.wrapped_agents.index(last_speaker) - return self.wrapped_agents[ - (current_idx + 1) % len(self.wrapped_agents) - ] - - def run(self, task: str) -> str: - """Execute the group chat conversation. + Run the group chat. Args: - task: The initial task/question to discuss + task (str): The initial message to start the chat. Returns: - str: The final response from the conversation - - Raises: - Exception: If any error occurs during execution + ChatHistory: The history of the chat. """ try: - logger.info(f"Starting GroupChat with task: {task}") - self._add_message(self.state.admin_name, task) - - current_speaker = self.wrapped_agents[0] - final_response = None - - for round_num in range(self.state.max_rounds): - # Select next speaker - current_speaker = self.select_next_speaker( - current_speaker - ) - logger.info( - f"Selected speaker: {current_speaker.agent_name}" - ) - - # Prepare context and get response - conversation_history = "\n".join( - [ - f"{msg.role}: {msg.content}" - for msg in self.state.messages[ - -10: - ] # Last 10 messages for context - ] - ) + logger.info( + f"Starting chat '{self.name}' with task: {task}" + ) - try: - response = current_speaker.run( - conversation_history - ) - final_response = response - except Exception as e: - logger.error( - f"Agent {current_speaker.agent_name} failed: {str(e)}" - ) - continue - - # Log interaction and add to message history - self._log_interaction( - current_speaker.agent_name, - round_num, - conversation_history, - response, - ) - self._add_message( - current_speaker.agent_name, response + for turn in range(self.max_turns): + current_turn = ChatTurn( + turn_number=turn, responses=[], task=task ) - # Optional: Add early stopping condition based on response content - if ( - "TASK_COMPLETE" in response - or "CONCLUSION" in response - ): - logger.info( - "Task completion detected, ending conversation" - ) - break - - return final_response or "No valid response generated" - + for agent in self.agents: + if self.speaker_fn( + self.get_recent_messages(), agent + ): + response = self._get_response_sync( + agent, task, turn + ) + current_turn.responses.append(response) + self.chat_history.total_messages += 1 + logger.debug( + f"Turn {turn}, {agent.name} responded" + ) + + self.chat_history.turns.append(current_turn) + + return self.chat_history except Exception as e: - logger.error(f"Error in GroupChat execution: {str(e)}") - raise + logger.error(f"Error in chat: {e}") + raise e - def get_conversation_summary(self) -> Dict[str, Any]: - """Return a summary of the conversation. + def batched_run(self, tasks: List[str], *args, **kwargs): + """ + Run the group chat with a batch of tasks. + + Args: + tasks (List[str]): The list of tasks to run in the chat. Returns: - Dict containing conversation metrics and status + List[ChatHistory]: The history of each chat. + """ + return [self.run(task, *args, **kwargs) for task in tasks] + + def concurrent_run(self, tasks: List[str], *args, **kwargs): """ - return { - "id": self.state.id, - "total_interactions": len(self.state.interactions), - "participating_agents": [ - agent.agent_name for agent in self.wrapped_agents - ], - "conversation_length": len(self.state.messages), - "duration": ( - datetime.utcnow() - self.state.created_at - ).total_seconds(), - "objective_completed": any( - "TASK_COMPLETE" in msg.content - for msg in self.state.messages - ), - } - - def export_conversation( - self, format: str = "json" - ) -> Union[str, Dict]: - """Export the conversation in the specified format. + Run the group chat with a batch of tasks concurrently using a thread pool. Args: - format: Output format ("json" or "text") + tasks (List[str]): The list of tasks to run in the chat. Returns: - Union[str, Dict]: Conversation in requested format - - Raises: - ValueError: If format is not supported + List[ChatHistory]: The history of each chat. """ - if format == "json": - return self.state.dict() - elif format == "text": - return "\n".join( - [ - f"{msg.role} ({msg.timestamp}): {msg.content}" - for msg in self.state.messages - ] + with concurrent.futures.ThreadPoolExecutor() as executor: + return list( + executor.map( + lambda task: self.run(task, *args, **kwargs), + tasks, + ) ) - else: - raise ValueError(f"Unsupported export format: {format}") + + +# if __name__ == "__main__": + +# load_dotenv() + +# # Get the OpenAI API key from the environment variable +# api_key = os.getenv("OPENAI_API_KEY") + +# # Create an instance of the OpenAIChat class +# model = OpenAIChat( +# openai_api_key=api_key, +# model_name="gpt-4o-mini", +# temperature=0.1, +# ) + +# # Example agents +# agent1 = Agent( +# agent_name="Financial-Analysis-Agent", +# system_prompt="You are a financial analyst specializing in investment strategies.", +# llm=model, +# max_loops=1, +# autosave=False, +# dashboard=False, +# verbose=True, +# dynamic_temperature_enabled=True, +# user_name="swarms_corp", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# streaming_on=False, +# ) + +# agent2 = Agent( +# agent_name="Tax-Adviser-Agent", +# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", +# llm=model, +# max_loops=1, +# autosave=False, +# dashboard=False, +# verbose=True, +# dynamic_temperature_enabled=True, +# user_name="swarms_corp", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# streaming_on=False, +# ) + +# agents = [agent1, agent2] + +# chat = GroupChat( +# name="Investment Advisory", +# description="Financial and tax analysis group", +# agents=agents, +# speaker_fn=expertise_based, +# ) + +# history = chat.run( +# "How to optimize tax strategy for investments?" +# ) +# print(history.model_dump_json(indent=2)) diff --git a/swarms/structs/groupchat_new.py b/swarms/structs/groupchat_new.py deleted file mode 100644 index a6aaaa7c8..000000000 --- a/swarms/structs/groupchat_new.py +++ /dev/null @@ -1,243 +0,0 @@ -import os -import asyncio -from pydantic import BaseModel, Field -from typing import List, Dict, Any -from swarms import Agent -from dotenv import load_dotenv -from swarms.utils.formatter import formatter - -# Load environment variables -load_dotenv() - -# Get OpenAI API key -api_key = os.getenv("OPENAI_API_KEY") - - -# Define Pydantic schema for agent outputs -class AgentOutput(BaseModel): - """Schema for capturing the output of each agent.""" - - agent_name: str = Field(..., description="The name of the agent") - message: str = Field( - ..., - description="The agent's response or contribution to the group chat", - ) - metadata: Dict[str, Any] = Field( - default_factory=dict, - description="Additional metadata about the agent's response", - ) - - -class GroupChat: - """ - GroupChat class to enable multiple agents to communicate in an asynchronous group chat. - Each agent is aware of all other agents, every message exchanged, and the social context. - """ - - def __init__( - self, - name: str, - description: str, - agents: List[Agent], - max_loops: int = 1, - ): - """ - Initialize the GroupChat. - - Args: - name (str): Name of the group chat. - description (str): Description of the purpose of the group chat. - agents (List[Agent]): A list of agents participating in the chat. - max_loops (int): Maximum number of loops to run through all agents. - """ - self.name = name - self.description = description - self.agents = agents - self.max_loops = max_loops - self.chat_history = ( - [] - ) # Stores all messages exchanged in the chat - - formatter.print_panel( - f"Initialized GroupChat '{self.name}' with {len(self.agents)} agents. Max loops: {self.max_loops}", - title="Groupchat Swarm", - ) - - async def _agent_conversation( - self, agent: Agent, input_message: str - ) -> AgentOutput: - """ - Facilitate a single agent's response to the chat. - - Args: - agent (Agent): The agent responding. - input_message (str): The message triggering the response. - - Returns: - AgentOutput: The agent's response captured in a structured format. - """ - formatter.print_panel( - f"Agent '{agent.agent_name}' is responding to the message: {input_message}", - title="Groupchat Swarm", - ) - response = await asyncio.to_thread(agent.run, input_message) - - output = AgentOutput( - agent_name=agent.agent_name, - message=response, - metadata={"context_length": agent.context_length}, - ) - # logger.debug(f"Agent '{agent.agent_name}' response: {response}") - return output - - async def _run(self, initial_message: str) -> List[AgentOutput]: - """ - Execute the group chat asynchronously, looping through all agents up to max_loops. - - Args: - initial_message (str): The initial message to start the chat. - - Returns: - List[AgentOutput]: The responses of all agents across all loops. - """ - formatter.print_panel( - f"Starting group chat '{self.name}' with initial message: {initial_message}", - title="Groupchat Swarm", - ) - self.chat_history.append( - {"sender": "System", "message": initial_message} - ) - - outputs = [] - for loop in range(self.max_loops): - formatter.print_panel( - f"Group chat loop {loop + 1}/{self.max_loops}", - title="Groupchat Swarm", - ) - - for agent in self.agents: - # Create a custom input message for each agent, sharing the chat history and social context - input_message = ( - f"Chat History:\n{self._format_chat_history()}\n\n" - f"Participants:\n" - + "\n".join( - [ - f"- {a.agent_name}: {a.system_prompt}" - for a in self.agents - ] - ) - + f"\n\nNew Message: {initial_message}\n\n" - f"You are '{agent.agent_name}'. Remember to keep track of the social context, who is speaking, " - f"and respond accordingly based on your role: {agent.system_prompt}." - ) - - # Collect agent's response - output = await self._agent_conversation( - agent, input_message - ) - outputs.append(output) - - # Update chat history with the agent's response - self.chat_history.append( - { - "sender": agent.agent_name, - "message": output.message, - } - ) - - formatter.print_panel( - "Group chat completed. All agent responses captured.", - title="Groupchat Swarm", - ) - return outputs - - def run(self, task: str, *args, **kwargs): - return asyncio.run(self.run(task, *args, **kwargs)) - - def _format_chat_history(self) -> str: - """ - Format the chat history for agents to understand the context. - - Returns: - str: The formatted chat history as a string. - """ - return "\n".join( - [ - f"{entry['sender']}: {entry['message']}" - for entry in self.chat_history - ] - ) - - def __str__(self) -> str: - """String representation of the group chat's outputs.""" - return self._format_chat_history() - - def to_json(self) -> str: - """JSON representation of the group chat's outputs.""" - return [ - {"sender": entry["sender"], "message": entry["message"]} - for entry in self.chat_history - ] - - -# # Example Usage -# if __name__ == "__main__": - -# load_dotenv() - -# # Get the OpenAI API key from the environment variable -# api_key = os.getenv("OPENAI_API_KEY") - -# # Create an instance of the OpenAIChat class -# model = OpenAIChat( -# openai_api_key=api_key, -# model_name="gpt-4o-mini", -# temperature=0.1, -# ) - -# # Example agents -# agent1 = Agent( -# agent_name="Financial-Analysis-Agent", -# system_prompt="You are a financial analyst specializing in investment strategies.", -# llm=model, -# max_loops=1, -# autosave=False, -# dashboard=False, -# verbose=True, -# dynamic_temperature_enabled=True, -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# output_type="string", -# streaming_on=False, -# ) - -# agent2 = Agent( -# agent_name="Tax-Adviser-Agent", -# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", -# llm=model, -# max_loops=1, -# autosave=False, -# dashboard=False, -# verbose=True, -# dynamic_temperature_enabled=True, -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# output_type="string", -# streaming_on=False, -# ) - -# # Create group chat -# group_chat = GroupChat( -# name="Financial Discussion", -# description="A group chat for financial analysis and tax advice.", -# agents=[agent1, agent2], -# ) - -# # Run the group chat -# asyncio.run( -# group_chat.run( -# "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria? What do you guys think?" -# ) -# ) diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index ef87a5d8c..1ee5add2f 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -1,19 +1,19 @@ import asyncio +import os +import threading from concurrent.futures import ThreadPoolExecutor -import psutil from dataclasses import dataclass -import threading -from typing import List, Any from multiprocessing import cpu_count -import os +from typing import Any, List + +import psutil from swarms.structs.agent import Agent +from swarms.structs.omni_agent_types import AgentType from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) -from swarms.structs.omni_agent_types import AgentType - def run_single_agent(agent: AgentType, task: str) -> Any: """Run a single agent synchronously""" diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 61cdbb0ea..7d919c724 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -45,12 +45,13 @@ def __init__( self.shared_memory_system = shared_memory_system self.reliability_check() + self.flow = self.sequential_flow() self.agent_rearrange = AgentRearrange( name=name, description=description, agents=agents, - flow=self.sequential_flow(), + flow=self.flow, max_loops=max_loops, output_type=output_type, return_json=return_json, diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index f1d769b43..24ac34b02 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -1,20 +1,115 @@ import os from typing import List, Optional -from datetime import datetime +from dotenv import load_dotenv +from openai import OpenAI from pydantic import BaseModel, Field from pydantic.v1 import validator -from loguru import logger +from swarm_models import OpenAIChat from tenacity import ( retry, stop_after_attempt, wait_exponential, ) -from swarm_models import OpenAIFunctionCaller, OpenAIChat from swarms.structs.agent import Agent -from swarms.structs.swarm_router import SwarmRouter -from swarms.structs.agents_available import showcase_available_agents +from swarms.structs.swarm_router import SwarmRouter, SwarmType +from loguru import logger + +logger.add("swarm_builder.log", rotation="10 MB", backtrace=True) + +load_dotenv() + + +class OpenAIFunctionCaller: + """ + A class to interact with the OpenAI API for generating text based on a system prompt and a task. + + Attributes: + - system_prompt (str): The system prompt to guide the AI's response. + - api_key (str): The API key for the OpenAI service. + - temperature (float): The temperature parameter for the AI model, controlling randomness. + - base_model (BaseModel): The Pydantic model to parse the response into. + - max_tokens (int): The maximum number of tokens in the response. + - client (OpenAI): The OpenAI client instance. + """ + + def __init__( + self, + system_prompt: str, + api_key: str, + temperature: float, + base_model: BaseModel, + max_tokens: int = 5000, + ): + self.system_prompt = system_prompt + self.api_key = api_key + self.temperature = temperature + self.base_model = base_model + self.max_tokens = max_tokens + self.client = OpenAI(api_key=api_key) + + def run(self, task: str, *args, **kwargs) -> BaseModel: + """ + Run the OpenAI model with the system prompt and task to generate a response. + + Args: + - task (str): The task to be completed. + - *args: Additional positional arguments for the OpenAI API. + - **kwargs: Additional keyword arguments for the OpenAI API. + + Returns: + - BaseModel: The parsed response based on the base_model. + """ + completion = self.client.beta.chat.completions.parse( + model="gpt-4o-2024-08-06", + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": task}, + ], + response_format=self.base_model, + temperature=self.temperature, + max_tokens=self.max_tokens, + *args, + **kwargs, + ) + + return completion.choices[0].message.parsed + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + async def run_async( + self, task: str, *args, **kwargs + ) -> BaseModel: + """ + Asynchronous version of the run method. + + Args: + - task (str): The task to be completed. + - *args: Additional positional arguments for the OpenAI API. + - **kwargs: Additional keyword arguments for the OpenAI API. + + Returns: + - BaseModel: The parsed response based on the base_model. + """ + completion = ( + await self.client.beta.chat.completions.parse_async( + model="gpt-4o-2024-08-06", + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": task}, + ], + response_format=self.base_model, + temperature=self.temperature, + max_tokens=self.max_tokens, + *args, + **kwargs, + ) + ) + + return completion.choices[0].message.parsed BOSS_SYSTEM_PROMPT = """ @@ -59,29 +154,15 @@ class AgentConfig(BaseModel): """Configuration for an individual agent in a swarm""" name: str = Field( - description="The name of the agent", example="Research-Agent" + description="The name of the agent", ) description: str = Field( description="A description of the agent's purpose and capabilities", - example="Agent responsible for researching and gathering information", ) system_prompt: str = Field( description="The system prompt that defines the agent's behavior", - example="You are a research agent. Your role is to gather and analyze information...", ) - @validator("name") - def validate_name(cls, v): - if not v.strip(): - raise ValueError("Agent name cannot be empty") - return v.strip() - - @validator("system_prompt") - def validate_system_prompt(cls, v): - if not v.strip(): - raise ValueError("System prompt cannot be empty") - return v.strip() - class SwarmConfig(BaseModel): """Configuration for a swarm of cooperative agents""" @@ -96,7 +177,9 @@ class SwarmConfig(BaseModel): ) agents: List[AgentConfig] = Field( description="The list of agents that make up the swarm", - min_items=1, + ) + max_loops: int = Field( + description="The maximum number of loops for the swarm to iterate on", ) @validator("agents") @@ -106,23 +189,90 @@ def validate_agents(cls, v): return v +class AutoSwarmBuilderOutput(BaseModel): + """A class that automatically builds and manages swarms of AI agents with enhanced error handling.""" + + name: Optional[str] = Field( + description="The name of the swarm", + example="DefaultSwarm", + default=None, + ) + description: Optional[str] = Field( + description="The description of the swarm's purpose and capabilities", + example="Generic AI Agent Swarm", + default=None, + ) + verbose: Optional[bool] = Field( + description="Whether to display verbose output", + default=None, + ) + model_name: Optional[str] = Field( + description="The name of the OpenAI model to use", + default=None, + ) + boss_output_schema: Optional[list] = Field( + description="The schema for the output of the BOSS system prompt", + default=None, + ) + director_agents_created: Optional[SwarmConfig] = Field( + description="The agents created by the director", + default=None, + ) + swarm_router_outputs: Optional[list] = Field( + description="The outputs from the swarm router", + default=None, + ) + max_loops: Optional[int] = Field( + description="The maximum number of loops for the swarm to iterate on", + default=None, + ) + swarm_type: Optional[SwarmType] = Field( + description="The type of swarm to build", + default=None, + ) + + class AutoSwarmBuilder: """A class that automatically builds and manages swarms of AI agents with enhanced error handling.""" def __init__( self, - name: Optional[str] = None, - description: Optional[str] = None, + name: Optional[str] = "autonomous-swarm-builder", + description: Optional[ + str + ] = "Given a task, this swarm will automatically create specialized agents and route it to the appropriate agents.", verbose: bool = True, - api_key: Optional[str] = None, - model_name: str = "gpt-4", + model_name: str = "gpt-4o", + boss_output_schema: list = None, + swarm_router_outputs: AutoSwarmBuilderOutput = None, + max_loops: int = 1, + swarm_type: str = "SequentialWorkflow", + auto_generate_prompts_for_agents: bool = False, + shared_memory_system: callable = None, ): self.name = name or "DefaultSwarm" self.description = description or "Generic AI Agent Swarm" self.verbose = verbose self.agents_pool = [] - self.api_key = api_key or os.getenv("OPENAI_API_KEY") + self.api_key = os.getenv("OPENAI_API_KEY") self.model_name = model_name + self.boss_output_schema = boss_output_schema + self.max_loops = max_loops + self.swarm_type = swarm_type + self.auto_generate_prompts_for_agents = ( + auto_generate_prompts_for_agents + ) + self.shared_memory_system = shared_memory_system + self.auto_swarm_builder_output = AutoSwarmBuilderOutput( + name=name, + description=description, + verbose=verbose, + model_name=model_name, + boss_output_schema=boss_output_schema or [], + swarm_router_outputs=swarm_router_outputs or [], + max_loops=max_loops, + swarm_type=swarm_type, + ) if not self.api_key: raise ValueError( @@ -143,7 +293,6 @@ def __init__( self.chat_model = OpenAIChat( openai_api_key=self.api_key, model_name=self.model_name, - temperature=0.1, ) except Exception as e: logger.error( @@ -151,11 +300,13 @@ def __init__( ) raise - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def run(self, task: str, image_url: Optional[str] = None) -> str: + def run( + self, + task: str, + image_url: Optional[str] = None, + *args, + **kwargs, + ): """Run the swarm on a given task with error handling and retries.""" if not task or not task.strip(): raise ValueError("Task cannot be empty") @@ -164,7 +315,7 @@ def run(self, task: str, image_url: Optional[str] = None) -> str: try: # Create agents for the task - agents = self._create_agents(task, image_url) + agents = self._create_agents(task) if not agents: raise ValueError( "No agents were created for the task" @@ -175,20 +326,33 @@ def run(self, task: str, image_url: Optional[str] = None) -> str: "Routing task through swarm", extra={"num_agents": len(agents)}, ) - output = self.swarm_router(agents, task, image_url) + output = self.swarm_router( + agents=agents, + task=task, + image_url=image_url, + *args, + **kwargs, + ) + self.auto_swarm_builder_output.swarm_router_outputs.append( + output + ) + print(output) logger.info("Swarm execution completed successfully") - return output + # return output + return self.auto_swarm_builder_output.model_dump_json( + indent=4 + ) except Exception as e: logger.error( f"Error during swarm execution: {str(e)}", - exc_info=True, ) - raise + raise e def _create_agents( - self, task: str, image_url: Optional[str] = None + self, + task: str, ) -> List[Agent]: """Create the necessary agents for a task with enhanced error handling.""" logger.info("Creating agents for task", extra={"task": task}) @@ -202,7 +366,12 @@ def _create_agents( ) agents_config = model.run(task) - print(f"{agents_config}") + logger.info( + f"Director has successfully created agents: {agents_config}" + ) + self.auto_swarm_builder_output.director_agents_created = ( + agents_config + ) if isinstance(agents_config, dict): agents_config = SwarmConfig(**agents_config) @@ -224,15 +393,19 @@ def _create_agents( ) agents.append(agent) - # Add available agents showcase to system prompts - agents_available = showcase_available_agents( - name=self.name, - description=self.description, - agents=agents, - ) + print( + f"Agent created: {agent_config.name}: Description: {agent_config.description}" + ) + + # # Add available agents showcase to system prompts + # agents_available = showcase_available_agents( + # name=self.name, + # description=self.description, + # agents=agents, + # ) - for agent in agents: - agent.system_prompt += "\n" + agents_available + # for agent in agents: + # agent.system_prompt += "\n" + agents_available logger.info( "Successfully created agents", @@ -251,6 +424,8 @@ def build_agent( agent_name: str, agent_description: str, agent_system_prompt: str, + *args, + **kwargs, ) -> Agent: """Build a single agent with enhanced error handling.""" logger.info( @@ -263,18 +438,11 @@ def build_agent( description=agent_description, system_prompt=agent_system_prompt, llm=self.chat_model, - autosave=True, - dashboard=False, verbose=self.verbose, - dynamic_temperature_enabled=True, - saved_state_path=f"states/{agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", - user_name="swarms_corp", - retry_attempts=3, - context_length=200000, + dynamic_temperature_enabled=False, return_step_meta=False, output_type="str", - streaming_on=False, - auto_generate_prompt=True, + streaming_on=True, ) return agent @@ -292,7 +460,9 @@ def swarm_router( self, agents: List[Agent], task: str, - image_url: Optional[str] = None, + img: Optional[str] = None, + *args, + **kwargs, ) -> str: """Route tasks between agents in the swarm with error handling and retries.""" logger.info( @@ -305,11 +475,14 @@ def swarm_router( name=self.name, description=self.description, agents=agents, - swarm_type="auto", + swarm_type=self.swarm_type, + auto_generate_prompts=self.auto_generate_prompts_for_agents, ) - formatted_task = f"{self.name} {self.description} {task}" - result = swarm_router_instance.run(formatted_task) + # formatted_task = f"{self.name} {self.description} {task}" + result = swarm_router_instance.run( + task=task, *args, **kwargs + ) logger.info("Successfully completed swarm routing") return result @@ -324,10 +497,10 @@ def swarm_router( swarm = AutoSwarmBuilder( name="ChipDesign-Swarm", description="A swarm of specialized AI agents for chip design", - api_key="your-api-key", # Optional if set in environment - model_name="gpt-4", # Optional, defaults to gpt-4 + swarm_type="ConcurrentWorkflow", ) result = swarm.run( "Design a new AI accelerator chip optimized for transformer model inference..." ) +print(result) diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py index 21b973a74..d1594c951 100644 --- a/swarms/structs/swarm_matcher.py +++ b/swarms/structs/swarm_matcher.py @@ -1,14 +1,15 @@ -from typing import List, Tuple, Optional +import json +from typing import List, Optional, Tuple + import numpy as np -from swarms.utils.lazy_loader import lazy_import_decorator from pydantic import BaseModel, Field -import json from tenacity import retry, stop_after_attempt, wait_exponential -from swarms.utils.loguru_logger import initialize_logger + from swarms.utils.auto_download_check_packages import ( auto_check_and_download_package, ) - +from swarms.utils.lazy_loader import lazy_import_decorator +from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="swarm_matcher") diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 190471ece..1384b4f65 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -130,7 +130,7 @@ def __init__( agents: List[Union[Agent, Callable]] = [], swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto" autosave: bool = False, - flow: str = None, + rearrange_flow: str = None, return_json: bool = False, auto_generate_prompts: bool = False, shared_memory_system: Any = None, @@ -147,7 +147,7 @@ def __init__( self.agents = agents self.swarm_type = swarm_type self.autosave = autosave - self.flow = flow + self.rearrange_flow = rearrange_flow self.return_json = return_json self.auto_generate_prompts = auto_generate_prompts self.shared_memory_system = shared_memory_system @@ -296,7 +296,7 @@ def _create_swarm( description=self.description, agents=self.agents, max_loops=self.max_loops, - flow=self.flow, + flow=self.rearrange_flow, return_json=self.return_json, output_type=self.output_type, *args, @@ -323,11 +323,7 @@ def _create_swarm( *args, **kwargs, ) - elif ( - self.swarm_type == "SequentialWorkflow" - or self.swarm_type == "sequential" - or self.swarm_type == "Sequential" - ): + elif self.swarm_type == "SequentialWorkflow": return SequentialWorkflow( name=self.name, description=self.description, @@ -382,7 +378,7 @@ def _log( logger.log(level.upper(), message) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) - def _run(self, task: str, *args, **kwargs) -> Any: + def _run(self, task: str, img: str, *args, **kwargs) -> Any: """ Dynamically run the specified task on the selected or matched swarm type. @@ -402,11 +398,9 @@ def _run(self, task: str, *args, **kwargs) -> Any: try: self._log( "info", - f"Running task on {self.swarm_type} swarm", - task=task, - metadata=kwargs, + f"Running task on {self.swarm_type} swarm with task: {task}", ) - result = self.swarm.run(task, *args, **kwargs) + result = self.swarm.run(task=task, *args, **kwargs) self._log( "success", @@ -427,9 +421,11 @@ def _run(self, task: str, *args, **kwargs) -> Any: def run( self, task: str, + img: str = None, device: str = "cpu", all_cores: bool = True, all_gpus: bool = False, + no_clusterops: bool = True, *args, **kwargs, ) -> Any: @@ -450,15 +446,22 @@ def run( Raises: Exception: If an error occurs during task execution. """ - return exec_callable_with_clusterops( - func=self._run, - device=device, - all_cores=all_cores, - all_gpus=all_gpus, - task=task, - *args, - **kwargs, - ) + try: + if no_clusterops: + return self._run(task=task, img=img, *args, **kwargs) + else: + return exec_callable_with_clusterops( + func=self._run, + device=device, + all_cores=all_cores, + all_gpus=all_gpus, + task=task, + *args, + **kwargs, + ) + except Exception as e: + logger.error(f"Error executing task on swarm: {str(e)}") + raise def __call__(self, task: str, *args, **kwargs) -> Any: """ diff --git a/tests/structs/test_groupchat.py b/tests/structs/test_groupchat.py index 992223657..22cf1ef4b 100644 --- a/tests/structs/test_groupchat.py +++ b/tests/structs/test_groupchat.py @@ -1,222 +1,147 @@ -import pytest - +import os +from dotenv import load_dotenv from swarm_models import OpenAIChat -from swarm_models.anthropic import Anthropic from swarms.structs.agent import Agent -from swarms.structs.groupchat import GroupChat, GroupChatManager - -llm = OpenAIChat() -llm2 = Anthropic() - - -# Mock the OpenAI class for testing -class MockOpenAI: - def __init__(self, *args, **kwargs): - pass - - def generate_reply(self, content): - return {"role": "mocked_agent", "content": "Mocked Reply"} - - -# Create fixtures for agents and a sample message -@pytest.fixture -def agent1(): - return Agent(name="Agent1", llm=llm) - - -@pytest.fixture -def agent2(): - return Agent(name="Agent2", llm=llm2) - - -@pytest.fixture -def sample_message(): - return {"role": "Agent1", "content": "Hello, World!"} - - -# Test the initialization of GroupChat -def test_groupchat_initialization(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - assert len(groupchat.agents) == 2 - assert len(groupchat.messages) == 0 - assert groupchat.max_round == 10 - assert groupchat.admin_name == "Admin" - - -# Test resetting the GroupChat -def test_groupchat_reset(agent1, agent2, sample_message): - groupchat = GroupChat(agents=[agent1, agent2]) - groupchat.messages.append(sample_message) - groupchat.reset() - assert len(groupchat.messages) == 0 - - -# Test finding an agent by name -def test_groupchat_find_agent_by_name(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - found_agent = groupchat.agent_by_name("Agent1") - assert found_agent == agent1 - - -# Test selecting the next agent -def test_groupchat_select_next_agent(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - next_agent = groupchat.next_agent(agent1) - assert next_agent == agent2 +from swarms.structs.groupchat import GroupChat, expertise_based -# Add more tests for different methods and scenarios as needed - - -# Test the GroupChatManager -def test_groupchat_manager(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - selector = agent1 # Assuming agent1 is the selector - manager = GroupChatManager(groupchat, selector) - task = "Task for agent2" - reply = manager(task) - assert reply["role"] == "Agent2" - assert reply["content"] == "Reply from Agent2" - - -# Test selecting the next speaker when there is only one agent -def test_groupchat_select_speaker_single_agent(agent1): - groupchat = GroupChat(agents=[agent1]) - selector = agent1 - manager = GroupChatManager(groupchat, selector) - task = "Task for agent1" - reply = manager(task) - assert reply["role"] == "Agent1" - assert reply["content"] == "Reply from Agent1" - - -# Test selecting the next speaker when GroupChat is underpopulated -def test_groupchat_select_speaker_underpopulated(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - selector = agent1 - manager = GroupChatManager(groupchat, selector) - task = "Task for agent1" - reply = manager(task) - assert reply["role"] == "Agent2" - assert reply["content"] == "Reply from Agent2" - - -# Test formatting history -def test_groupchat_format_history(agent1, agent2, sample_message): - groupchat = GroupChat(agents=[agent1, agent2]) - groupchat.messages.append(sample_message) - formatted_history = groupchat.format_history(groupchat.messages) - expected_history = "'Agent1:Hello, World!" - assert formatted_history == expected_history - - -# Test agent names property -def test_groupchat_agent_names(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - names = groupchat.agent_names - assert len(names) == 2 - assert "Agent1" in names - assert "Agent2" in names - - -# Test GroupChatManager initialization -def test_groupchat_manager_initialization(agent1, agent2): - groupchat = GroupChat(agents=[agent1, agent2]) - selector = agent1 - manager = GroupChatManager(groupchat, selector) - assert manager.groupchat == groupchat - assert manager.selector == selector - - -# Test case to ensure GroupChatManager generates a reply from an agent -def test_groupchat_manager_generate_reply(): - # Create a GroupChat with two agents - agents = [agent1, agent2] - groupchat = GroupChat(agents=agents, messages=[], max_round=10) - - # Mock the OpenAI class and GroupChat selector - mocked_openai = MockOpenAI() - selector = agent1 - - # Initialize GroupChatManager - manager = GroupChatManager( - groupchat=groupchat, selector=selector, openai=mocked_openai +def setup_test_agents(): + model = OpenAIChat( + openai_api_key=os.getenv("OPENAI_API_KEY"), + model_name="gpt-4", + temperature=0.1, ) - # Generate a reply - task = "Write me a riddle" - reply = manager(task) + return [ + Agent( + agent_name="Agent1", + system_prompt="You only respond with 'A'", + llm=model, + ), + Agent( + agent_name="Agent2", + system_prompt="You only respond with 'B'", + llm=model, + ), + Agent( + agent_name="Agent3", + system_prompt="You only respond with 'C'", + llm=model, + ), + ] + + +def test_round_robin_speaking(): + chat = GroupChat(agents=setup_test_agents()) + history = chat.run("Say your letter") + + # Verify agents speak in order + responses = [ + r.message for t in history.turns for r in t.responses + ] + assert responses == ["A", "B", "C"] * (len(history.turns)) + + +def test_concurrent_processing(): + chat = GroupChat(agents=setup_test_agents()) + tasks = ["Task1", "Task2", "Task3"] + histories = chat.concurrent_run(tasks) + + assert len(histories) == len(tasks) + for history in histories: + assert history.total_messages > 0 + + +def test_expertise_based_speaking(): + agents = setup_test_agents() + chat = GroupChat(agents=agents, speaker_fn=expertise_based) + + # Test each agent's expertise trigger + for agent in agents: + history = chat.run(f"Trigger {agent.system_prompt}") + first_response = history.turns[0].responses[0] + assert first_response.agent_name == agent.agent_name + + +def test_max_turns_limit(): + max_turns = 3 + chat = GroupChat(agents=setup_test_agents(), max_turns=max_turns) + history = chat.run("Test message") + + assert len(history.turns) == max_turns + + +def test_error_handling(): + broken_agent = Agent( + agent_name="BrokenAgent", + system_prompt="You raise errors", + llm=None, + ) - # Check if a valid reply is generated - assert "role" in reply - assert "content" in reply - assert reply["role"] in groupchat.agent_names + chat = GroupChat(agents=[broken_agent]) + history = chat.run("Trigger error") + assert "Error" in history.turns[0].responses[0].message -# Test case to ensure GroupChat selects the next speaker correctly -def test_groupchat_select_speaker(): - agent3 = Agent(name="agent3", llm=llm) - agents = [agent1, agent2, agent3] - groupchat = GroupChat(agents=agents, messages=[], max_round=10) - # Initialize GroupChatManager with agent1 as selector - selector = agent1 - manager = GroupChatManager(groupchat=groupchat, selector=selector) +def test_conversation_context(): + agents = setup_test_agents() + complex_prompt = "Previous message refers to A. Now trigger B. Finally discuss C." - # Simulate selecting the next speaker - last_speaker = agent1 - next_speaker = manager.select_speaker( - last_speaker=last_speaker, selector=selector - ) + chat = GroupChat(agents=agents, speaker_fn=expertise_based) + history = chat.run(complex_prompt) - # Ensure the next speaker is agent2 - assert next_speaker == agent2 + responses = [ + r.agent_name for t in history.turns for r in t.responses + ] + assert all(agent.agent_name in responses for agent in agents) -# Test case to ensure GroupChat handles underpopulated group correctly -def test_groupchat_underpopulated_group(): - agent1 = Agent(name="agent1", llm=llm) - agents = [agent1] - groupchat = GroupChat(agents=agents, messages=[], max_round=10) +def test_large_agent_group(): + large_group = setup_test_agents() * 5 # 15 agents + chat = GroupChat(agents=large_group) + history = chat.run("Test scaling") - # Initialize GroupChatManager with agent1 as selector - selector = agent1 - manager = GroupChatManager(groupchat=groupchat, selector=selector) + assert history.total_messages > len(large_group) - # Simulate selecting the next speaker in an underpopulated group - last_speaker = agent1 - next_speaker = manager.select_speaker( - last_speaker=last_speaker, selector=selector - ) - # Ensure the next speaker is the same as the last speaker in an underpopulated group - assert next_speaker == last_speaker +def test_long_conversations(): + chat = GroupChat(agents=setup_test_agents(), max_turns=50) + history = chat.run("Long conversation test") + assert len(history.turns) == 50 + assert history.total_messages > 100 -# Test case to ensure GroupChatManager handles the maximum rounds correctly -def test_groupchat_max_rounds(): - agents = [agent1, agent2] - groupchat = GroupChat(agents=agents, messages=[], max_round=2) - # Initialize GroupChatManager with agent1 as selector - selector = agent1 - manager = GroupChatManager(groupchat=groupchat, selector=selector) +def test_stress_batched_runs(): + chat = GroupChat(agents=setup_test_agents()) + tasks = ["Task"] * 100 + histories = chat.batched_run(tasks) - # Simulate the conversation with max rounds - last_speaker = agent1 - for _ in range(2): - next_speaker = manager.select_speaker( - last_speaker=last_speaker, selector=selector - ) - last_speaker = next_speaker + assert len(histories) == len(tasks) + total_messages = sum(h.total_messages for h in histories) + assert total_messages > len(tasks) * 3 - # Try one more round, should stay with the last speaker - next_speaker = manager.select_speaker( - last_speaker=last_speaker, selector=selector - ) - # Ensure the next speaker is the same as the last speaker after reaching max rounds - assert next_speaker == last_speaker +if __name__ == "__main__": + load_dotenv() + functions = [ + test_round_robin_speaking, + test_concurrent_processing, + test_expertise_based_speaking, + test_max_turns_limit, + test_error_handling, + test_conversation_context, + test_large_agent_group, + test_long_conversations, + test_stress_batched_runs, + ] -# Continue adding more test cases as needed to cover various scenarios and functionalities of the code. + for func in functions: + try: + print(f"Running {func.__name__}...") + func() + print("✓ Passed") + except Exception as e: + print(f"✗ Failed: {str(e)}")