From cd569322d326e204c987e35d441fd20e0dd06230 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 2 Jan 2025 14:37:53 -0500 Subject: [PATCH] [MultiAgentRouter] --- README.md | 50 +++ api/main.py | 165 ++++--- docs/mkdocs.yml | 1 + docs/swarms/structs/multi_agent_router.md | 331 +++++++++++++++ multi_agent_router_example.py | 41 ++ voice.py => new_features_examples/voice.py | 113 +++-- swarms/structs/__init__.py | 2 + swarms/structs/multi_agent_orchestrator.py | 401 ++++++++++++++++++ swarms/structs/swarm_builder.py | 26 +- swarms/structs/swarm_output_type.py | 23 + .../structs/test_multi_agent_orchestrator.py | 219 ++++++++++ unique_swarms_examples.py | 134 +++--- 12 files changed, 1344 insertions(+), 162 deletions(-) create mode 100644 docs/swarms/structs/multi_agent_router.md create mode 100644 multi_agent_router_example.py rename voice.py => new_features_examples/voice.py (76%) create mode 100644 swarms/structs/multi_agent_orchestrator.py create mode 100644 swarms/structs/swarm_output_type.py create mode 100644 tests/structs/test_multi_agent_orchestrator.py diff --git a/README.md b/README.md index 052ce10e6..2451e49f4 100644 --- a/README.md +++ b/README.md @@ -1918,6 +1918,56 @@ if __name__ == "__main__": ``` +--- + +## MultiAgentRouter + +The MultiAgentRouter is a swarm architecture designed to dynamically assign tasks to the most suitable agent. It achieves this through a director or boss entity that utilizes function calls to identify and allocate tasks to the agent best equipped to handle them. + +```python +from swarms import Agent +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter + +# Example usage: +if __name__ == "__main__": + # Define some example agents + agents = [ + Agent( + agent_name="ResearchAgent", + description="Specializes in researching topics and providing detailed, factual information", + system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", + model_name="openai/gpt-4o", + ), + Agent( + agent_name="CodeExpertAgent", + description="Expert in writing, reviewing, and explaining code across multiple programming languages", + system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", + model_name="openai/gpt-4o", + ), + Agent( + agent_name="WritingAgent", + description="Skilled in creative and technical writing, content creation, and editing", + system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.", + model_name="openai/gpt-4o", + ), + ] + + # Initialize routers with different configurations + router_execute = MultiAgentRouter(agents=agents, execute_task=True) + + # Example task + task = "Write a Python function to calculate fibonacci numbers" + + try: + # Process the task with execution + print("\nWith task execution:") + result_execute = router_execute.route_task(task) + print(result_execute) + + except Exception as e: + print(f"Error occurred: {str(e)}") +``` + ---------- diff --git a/api/main.py b/api/main.py index 75367f9b8..dc0203388 100644 --- a/api/main.py +++ b/api/main.py @@ -36,6 +36,7 @@ class APIKey(BaseModel): """Model matching Supabase api_keys table""" + id: UUID created_at: datetime name: str @@ -44,12 +45,14 @@ class APIKey(BaseModel): limit_credit_dollar: Optional[float] = None is_deleted: bool = False + class User(BaseModel): id: UUID name: str is_active: bool = True is_admin: bool = False + @lru_cache() def get_supabase() -> Client: """Get cached Supabase client""" @@ -59,11 +62,14 @@ def get_supabase() -> Client: raise ValueError("Supabase configuration is missing") return create_client(supabase_url, supabase_key) + async def get_current_user( - api_key: str = Header(..., description="API key for authentication"), + api_key: str = Header( + ..., description="API key for authentication" + ), ) -> User: """Validate API key against Supabase and return current user.""" - if not api_key or not api_key.startswith('sk-'): + if not api_key or not api_key.startswith("sk-"): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key format", @@ -72,12 +78,18 @@ async def get_current_user( try: supabase = get_supabase() - + # Query the api_keys table - response = supabase.table('api_keys').select( - 'id, name, user_id, key, limit_credit_dollar, is_deleted' - ).eq('key', api_key).single().execute() - + response = ( + supabase.table("api_keys") + .select( + "id, name, user_id, key, limit_credit_dollar, is_deleted" + ) + .eq("key", api_key) + .single() + .execute() + ) + if not response.data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -86,27 +98,30 @@ async def get_current_user( ) key_data = response.data - + # Check if key is deleted - if key_data['is_deleted']: + if key_data["is_deleted"]: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="API key has been deleted", headers={"WWW-Authenticate": "ApiKey"}, ) - + # Check credit limit if applicable - if key_data['limit_credit_dollar'] is not None and key_data['limit_credit_dollar'] <= 0: + if ( + key_data["limit_credit_dollar"] is not None + and key_data["limit_credit_dollar"] <= 0 + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="API key credit limit exceeded" + detail="API key credit limit exceeded", ) # Create user object return User( - id=key_data['user_id'], - name=key_data['name'], - is_active=not key_data['is_deleted'] + id=key_data["user_id"], + name=key_data["name"], + is_active=not key_data["is_deleted"], ) except Exception as e: @@ -291,7 +306,9 @@ class AgentStore: def __init__(self): self.agents: Dict[UUID, Agent] = {} self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} - self.user_agents: Dict[UUID, List[UUID]] = {} # user_id -> [agent_ids] + self.user_agents: Dict[UUID, List[UUID]] = ( + {} + ) # user_id -> [agent_ids] self.executor = ThreadPoolExecutor(max_workers=4) self._ensure_directories() @@ -707,8 +724,7 @@ def _setup_routes(self): """Set up API routes with Supabase authentication.""" @self.app.get( - "/v1/users/me/agents", - response_model=List[AgentSummary] + "/v1/users/me/agents", response_model=List[AgentSummary] ) async def list_user_agents( current_user: User = Depends(get_current_user), @@ -716,10 +732,14 @@ async def list_user_agents( status: Optional[AgentStatus] = None, ): """List all agents owned by the current user.""" - user_agents = self.store.user_agents.get(current_user.id, []) + user_agents = self.store.user_agents.get( + current_user.id, [] + ) return [ agent - for agent in await self.store.list_agents(tags, status) + for agent in await self.store.list_agents( + tags, status + ) if agent.agent_id in user_agents ] @@ -745,107 +765,119 @@ async def list_agents( agents = await self.store.list_agents(tags, status) # Filter agents based on user access return [ - agent for agent in agents - if await self.store.verify_agent_access(agent.agent_id, current_user.id) + agent + for agent in agents + if await self.store.verify_agent_access( + agent.agent_id, current_user.id + ) ] @self.app.patch( - "/v1/agent/{agent_id}", - response_model=Dict[str, str] + "/v1/agent/{agent_id}", response_model=Dict[str, str] ) async def update_agent( agent_id: UUID, update: AgentUpdate, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Update an existing agent's configuration.""" - if not await self.store.verify_agent_access(agent_id, current_user.id): + if not await self.store.verify_agent_access( + agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to update this agent" + detail="Not authorized to update this agent", ) - + await self.store.update_agent(agent_id, update) return {"status": "updated"} @self.app.get( "/v1/agent/{agent_id}/metrics", - response_model=AgentMetrics + response_model=AgentMetrics, ) async def get_agent_metrics( agent_id: UUID, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Get performance metrics for a specific agent.""" - if not await self.store.verify_agent_access(agent_id, current_user.id): + if not await self.store.verify_agent_access( + agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to view this agent's metrics" + detail="Not authorized to view this agent's metrics", ) - + return await self.store.get_agent_metrics(agent_id) @self.app.post( "/v1/agent/{agent_id}/clone", - response_model=Dict[str, UUID] + response_model=Dict[str, UUID], ) async def clone_agent( agent_id: UUID, new_name: str, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Clone an existing agent with a new name.""" - if not await self.store.verify_agent_access(agent_id, current_user.id): + if not await self.store.verify_agent_access( + agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to clone this agent" + detail="Not authorized to clone this agent", ) - + new_id = await self.store.clone_agent(agent_id, new_name) # Add the cloned agent to user's agents if current_user.id not in self.store.user_agents: self.store.user_agents[current_user.id] = [] self.store.user_agents[current_user.id].append(new_id) - + return {"agent_id": new_id} @self.app.delete("/v1/agent/{agent_id}") async def delete_agent( agent_id: UUID, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Delete an agent.""" - if not await self.store.verify_agent_access(agent_id, current_user.id): + if not await self.store.verify_agent_access( + agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to delete this agent" + detail="Not authorized to delete this agent", ) - + await self.store.delete_agent(agent_id) # Remove from user's agents list if current_user.id in self.store.user_agents: self.store.user_agents[current_user.id] = [ - aid for aid in self.store.user_agents[current_user.id] + aid + for aid in self.store.user_agents[current_user.id] if aid != agent_id ] return {"status": "deleted"} @self.app.post( - "/v1/agent/completions", - response_model=CompletionResponse + "/v1/agent/completions", response_model=CompletionResponse ) async def create_completion( request: CompletionRequest, background_tasks: BackgroundTasks, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Process a completion request with the specified agent.""" - if not await self.store.verify_agent_access(request.agent_id, current_user.id): + if not await self.store.verify_agent_access( + request.agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to use this agent" + detail="Not authorized to use this agent", ) - + try: agent = await self.store.get_agent(request.agent_id) @@ -855,13 +887,12 @@ async def create_completion( request.prompt, request.agent_id, request.max_tokens, - request.temperature_override + request.temperature_override, ) # Schedule background cleanup background_tasks.add_task( - self._cleanup_old_metrics, - request.agent_id + self._cleanup_old_metrics, request.agent_id ) return response @@ -870,43 +901,47 @@ async def create_completion( logger.error(f"Error processing completion: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error processing completion: {str(e)}" + detail=f"Error processing completion: {str(e)}", ) @self.app.get("/v1/agent/{agent_id}/status") async def get_agent_status( agent_id: UUID, - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), ): """Get the current status of an agent.""" - if not await self.store.verify_agent_access(agent_id, current_user.id): + if not await self.store.verify_agent_access( + agent_id, current_user.id + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to view this agent's status" + detail="Not authorized to view this agent's status", ) - + metadata = self.store.agent_metadata.get(agent_id) if not metadata: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found" + detail=f"Agent {agent_id} not found", ) - + return { "agent_id": agent_id, "status": metadata["status"], "last_used": metadata["last_used"], "total_completions": metadata["total_completions"], - "error_count": metadata["error_count"] + "error_count": metadata["error_count"], } - + @self.app.get("/health") async def health_check(): """Health check endpoint - no auth required.""" try: # Test Supabase connection supabase = get_supabase() - supabase.table('api_keys').select('count', count='exact').execute() + supabase.table("api_keys").select( + "count", count="exact" + ).execute() return {"status": "healthy", "database": "connected"} except Exception as e: logger.error(f"Health check failed: {str(e)}") @@ -915,8 +950,8 @@ async def health_check(): content={ "status": "unhealthy", "database": "disconnected", - "error": str(e) - } + "error": str(e), + }, ) async def _cleanup_old_metrics(self, agent_id: UUID): diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 022efe825..2afbb8af9 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -185,6 +185,7 @@ nav: - SwarmRouter: "swarms/structs/swarm_router.md" - TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md" - SwarmRearrange: "swarms/structs/swarm_rearrange.md" + - MultiAgentRouter: "swarms/structs/multi_agent_router.md" - Various Execution Methods: "swarms/structs/various_execution_methods.md" - Workflows: - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" diff --git a/docs/swarms/structs/multi_agent_router.md b/docs/swarms/structs/multi_agent_router.md new file mode 100644 index 000000000..1e9107ce9 --- /dev/null +++ b/docs/swarms/structs/multi_agent_router.md @@ -0,0 +1,331 @@ +# MultiAgentRouter Documentation + +The MultiAgentRouter is a sophisticated task routing system that efficiently delegates tasks to specialized AI agents. It uses a "boss" agent to analyze incoming tasks and route them to the most appropriate specialized agent based on their capabilities and expertise. + +## Table of Contents +- [Installation](#installation) +- [Key Components](#key-components) +- [Arguments](#arguments) +- [Methods](#methods) +- [Usage Examples](#usage-examples) + - [Healthcare](#healthcare-example) + - [Finance](#finance-example) + - [Legal](#legal-example) + - [Research](#research-example) + +## Installation + +```bash +pip install swarms +``` + +## Key Components + +### Arguments Table + +| Argument | Type | Default | Description | +|----------|------|---------|-------------| +| name | str | "swarm-router" | Name identifier for the router instance | +| description | str | "Routes tasks..." | Description of the router's purpose | +| agents | List[Agent] | [] | List of available specialized agents | +| model | str | "gpt-4o-mini" | Base language model for the boss agent | +| temperature | float | 0.1 | Temperature parameter for model outputs | +| shared_memory_system | callable | None | Optional shared memory system | +| output_type | Literal["json", "string"] | "json" | Format of agent outputs | +| execute_task | bool | True | Whether to execute routed tasks | + +### Methods Table + +| Method | Arguments | Returns | Description | +|--------|-----------|---------|-------------| +| route_task | task: str | dict | Routes a single task to appropriate agent | +| batch_route | tasks: List[str] | List[dict] | Sequentially routes multiple tasks | +| concurrent_batch_route | tasks: List[str] | List[dict] | Concurrently routes multiple tasks | +| query_ragent | task: str | str | Queries the research agent | +| find_agent_in_list | agent_name: str | Optional[Agent] | Finds agent by name | + +## Production Examples + +### Healthcare Example + +```python +from swarms import Agent, MultiAgentRouter + +# Define specialized healthcare agents +agents = [ + Agent( + agent_name="DiagnosisAgent", + description="Specializes in preliminary symptom analysis and diagnostic suggestions", + system_prompt="""You are a medical diagnostic assistant. Analyze symptoms and provide + evidence-based diagnostic suggestions, always noting this is for informational purposes + only and recommending professional medical consultation.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="TreatmentPlanningAgent", + description="Assists in creating treatment plans and medical documentation", + system_prompt="""You are a treatment planning assistant. Help create structured + treatment plans based on confirmed diagnoses, following medical best practices + and guidelines.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="MedicalResearchAgent", + description="Analyzes medical research papers and clinical studies", + system_prompt="""You are a medical research analyst. Analyze and summarize medical + research papers, clinical trials, and scientific studies, providing evidence-based + insights.""", + model_name="openai/gpt-4o" + ) +] + +# Initialize router +healthcare_router = MultiAgentRouter( + name="Healthcare-Router", + description="Routes medical and healthcare-related tasks to specialized agents", + agents=agents, + model="gpt-4o", + temperature=0.1 +) + +# Example usage +try: + # Process medical case + case_analysis = healthcare_router.route_task( + """Patient presents with: + - Persistent dry cough for 3 weeks + - Mild fever (38.1°C) + - Fatigue + Analyze symptoms and suggest potential diagnoses for healthcare provider review.""" + ) + + # Research treatment options + treatment_research = healthcare_router.route_task( + """Find recent clinical studies on treatment efficacy for community-acquired + pneumonia in adult patients, focusing on outpatient care.""" + ) + + # Process multiple cases concurrently + cases = [ + "Case 1: Patient symptoms...", + "Case 2: Patient symptoms...", + "Case 3: Patient symptoms..." + ] + concurrent_results = healthcare_router.concurrent_batch_route(cases) + +except Exception as e: + logger.error(f"Error in healthcare processing: {str(e)}") +``` + +### Finance Example + +```python +# Define specialized finance agents +finance_agents = [ + Agent( + agent_name="MarketAnalysisAgent", + description="Analyzes market trends and provides trading insights", + system_prompt="""You are a financial market analyst. Analyze market data, trends, + and indicators to provide evidence-based market insights and trading suggestions.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="RiskAssessmentAgent", + description="Evaluates financial risks and compliance requirements", + system_prompt="""You are a risk assessment specialist. Analyze financial data + and operations for potential risks, ensuring regulatory compliance and suggesting + risk mitigation strategies.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="InvestmentAgent", + description="Provides investment strategies and portfolio management", + system_prompt="""You are an investment strategy specialist. Develop and analyze + investment strategies, portfolio allocations, and provide long-term financial + planning guidance.""", + model_name="openai/gpt-4o" + ) +] + +# Initialize finance router +finance_router = MultiAgentRouter( + name="Finance-Router", + description="Routes financial analysis and investment tasks", + agents=finance_agents +) + +# Example tasks +tasks = [ + """Analyze current market conditions for technology sector, focusing on: + - AI/ML companies + - Semiconductor manufacturers + - Cloud service providers + Provide risk assessment and investment opportunities.""", + + """Develop a diversified portfolio strategy for a conservative investor with: + - Investment horizon: 10 years + - Risk tolerance: Low to medium + - Initial investment: $500,000 + - Monthly contribution: $5,000""", + + """Conduct risk assessment for a fintech startup's crypto trading platform: + - Regulatory compliance requirements + - Security measures + - Operational risks + - Market risks""" +] + +# Process tasks concurrently +results = finance_router.concurrent_batch_route(tasks) +``` + +### Legal Example + +```python +# Define specialized legal agents +legal_agents = [ + Agent( + agent_name="ContractAnalysisAgent", + description="Analyzes legal contracts and documents", + system_prompt="""You are a legal document analyst. Review contracts and legal + documents for key terms, potential issues, and compliance requirements.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="ComplianceAgent", + description="Ensures regulatory compliance and updates", + system_prompt="""You are a legal compliance specialist. Monitor and analyze + regulatory requirements, ensuring compliance and suggesting necessary updates + to policies and procedures.""", + model_name="openai/gpt-4o" + ), + Agent( + agent_name="LegalResearchAgent", + description="Conducts legal research and case analysis", + system_prompt="""You are a legal researcher. Research relevant cases, statutes, + and regulations, providing comprehensive legal analysis and citations.""", + model_name="openai/gpt-4o" + ) +] + +# Initialize legal router +legal_router = MultiAgentRouter( + name="Legal-Router", + description="Routes legal analysis and compliance tasks", + agents=legal_agents +) + +# Example usage for legal department +contract_analysis = legal_router.route_task( + """Review the following software licensing agreement: + [contract text] + + Analyze for: + 1. Key terms and conditions + 2. Potential risks and liabilities + 3. Compliance with current regulations + 4. Suggested modifications""" +) +``` + +## Error Handling and Best Practices + +1. Always use try-except blocks for task routing: +```python +try: + result = router.route_task(task) +except Exception as e: + logger.error(f"Task routing failed: {str(e)}") +``` + +2. Monitor agent performance: +```python +if result["execution"]["execution_time"] > 5.0: + logger.warning(f"Long execution time for task: {result['task']['original']}") +``` + +3. Implement rate limiting for concurrent tasks: +```python +from concurrent.futures import ThreadPoolExecutor +with ThreadPoolExecutor(max_workers=5) as executor: + results = router.concurrent_batch_route(tasks) +``` + +4. Regular agent validation: +```python +for agent in router.agents.values(): + if not agent.validate(): + logger.error(f"Agent validation failed: {agent.name}") +``` + +## Performance Considerations + +1. Task Batching + +- Group similar tasks together + +- Use concurrent_batch_route for independent tasks + +- Monitor memory usage with large batches + +2. Model Selection + +- Choose appropriate models based on task complexity + +- Balance speed vs. accuracy requirements + +- Consider cost implications + +3. Response Caching + +- Implement caching for frequently requested analyses + +- Use shared memory system for repeated queries + +- Regular cache invalidation for time-sensitive data + +## Security Considerations + +1. Data Privacy + +- Implement data encryption + +- Handle sensitive information appropriately + +- Regular security audits + +2. Access Control + +- Implement role-based access + +- Audit logging + +- Regular permission reviews + +## Monitoring and Logging + +1. Performance Metrics + +- Response times + +- Success rates + +- Error rates + +- Resource utilization + +2. Logging + +- Use structured logging + +- Implement log rotation + +- Regular log analysis + +3. Alerts + +- Set up alerting for critical errors + +- Monitor resource usage + +- Track API rate limits \ No newline at end of file diff --git a/multi_agent_router_example.py b/multi_agent_router_example.py new file mode 100644 index 000000000..209f0cffb --- /dev/null +++ b/multi_agent_router_example.py @@ -0,0 +1,41 @@ +from swarms import Agent +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter + +# Example usage: +if __name__ == "__main__": + # Define some example agents + agents = [ + Agent( + agent_name="ResearchAgent", + description="Specializes in researching topics and providing detailed, factual information", + system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", + model_name="openai/gpt-4o", + ), + Agent( + agent_name="CodeExpertAgent", + description="Expert in writing, reviewing, and explaining code across multiple programming languages", + system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", + model_name="openai/gpt-4o", + ), + Agent( + agent_name="WritingAgent", + description="Skilled in creative and technical writing, content creation, and editing", + system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.", + model_name="openai/gpt-4o", + ), + ] + + # Initialize routers with different configurations + router_execute = MultiAgentRouter(agents=agents, execute_task=True) + + # Example task + task = "Write a Python function to calculate fibonacci numbers" + + try: + # Process the task with execution + print("\nWith task execution:") + result_execute = router_execute.route_task(task) + print(result_execute) + + except Exception as e: + print(f"Error occurred: {str(e)}") diff --git a/voice.py b/new_features_examples/voice.py similarity index 76% rename from voice.py rename to new_features_examples/voice.py index e09b406d1..e0f20752f 100644 --- a/voice.py +++ b/new_features_examples/voice.py @@ -1,4 +1,3 @@ - from __future__ import annotations import asyncio @@ -14,12 +13,14 @@ import pyaudio except ImportError: import subprocess + subprocess.check_call(["pip", "install", "pyaudio"]) import pyaudio try: import sounddevice as sd except ImportError: import subprocess + subprocess.check_call(["pip", "install", "sounddevice"]) import sounddevice as sd from loguru import logger @@ -33,6 +34,7 @@ from pydub import AudioSegment except ImportError: import subprocess + subprocess.check_call(["pip", "install", "pydub"]) from pydub import AudioSegment @@ -52,9 +54,16 @@ def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes: # load the audio file from the byte stream audio = AudioSegment.from_file(io.BytesIO(audio_bytes)) - print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}") + print( + f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}" + ) # resample to 24kHz mono pcm16 - pcm_audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(CHANNELS).set_sample_width(2).raw_data + pcm_audio = ( + audio.set_frame_rate(SAMPLE_RATE) + .set_channels(CHANNELS) + .set_sample_width(2) + .raw_data + ) return pcm_audio @@ -88,7 +97,12 @@ def callback(self, outdata, frames, time, status): # noqa # fill the rest of the frames with zeros if there is no more data if len(data) < frames: - data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16))) + data = np.concatenate( + ( + data, + np.zeros(frames - len(data), dtype=np.int16), + ) + ) outdata[:] = data.reshape(-1, 1) @@ -151,14 +165,23 @@ async def send_audio_worker_sounddevice( if not sent_audio and start_send: await start_send() await connection.send( - {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")} + { + "type": "input_audio_buffer.append", + "audio": base64.b64encode(data).decode( + "utf-8" + ), + } ) sent_audio = True elif sent_audio: print("Done, triggering inference") - await connection.send({"type": "input_audio_buffer.commit"}) - await connection.send({"type": "response.create", "response": {}}) + await connection.send( + {"type": "input_audio_buffer.commit"} + ) + await connection.send( + {"type": "response.create", "response": {}} + ) sent_audio = False await asyncio.sleep(0) @@ -169,6 +192,7 @@ async def send_audio_worker_sounddevice( stream.stop() stream.close() + class RealtimeApp: """ A console-based application to handle real-time audio recording and streaming, @@ -193,15 +217,21 @@ def __init__(self, system_prompt: str = None) -> None: async def initialize_text_prompt(self, text: str) -> None: """Initialize and send a text prompt to the OpenAI Realtime API.""" try: - async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as conn: + async with self.client.beta.realtime.connect( + model="gpt-4o-realtime-preview-2024-10-01" + ) as conn: self.connection = conn - await conn.session.update(session={"modalities": ["text"]}) + await conn.session.update( + session={"modalities": ["text"]} + ) await conn.conversation.item.create( item={ "type": "message", "role": "system", - "content": [{"type": "input_text", "text": text}], + "content": [ + {"type": "input_text", "text": text} + ], } ) await conn.response.create() @@ -221,12 +251,16 @@ async def initialize_text_prompt(self, text: str) -> None: async def handle_realtime_connection(self) -> None: """Handle the connection to the OpenAI Realtime API.""" try: - async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as conn: + async with self.client.beta.realtime.connect( + model="gpt-4o-realtime-preview-2024-10-01" + ) as conn: self.connection = conn self.connected.set() logger.info("Connected to OpenAI Realtime API.") - await conn.session.update(session={"turn_detection": {"type": "server_vad"}}) + await conn.session.update( + session={"turn_detection": {"type": "server_vad"}} + ) acc_items: dict[str, Any] = {} @@ -234,7 +268,9 @@ async def handle_realtime_connection(self) -> None: if event.type == "session.created": self.session = event.session assert event.session.id is not None - logger.info(f"Session created with ID: {event.session.id}") + logger.info( + f"Session created with ID: {event.session.id}" + ) continue if event.type == "session.updated": @@ -251,15 +287,22 @@ async def handle_realtime_connection(self) -> None: self.audio_player.add_data(bytes_data) continue - if event.type == "response.audio_transcript.delta": + if ( + event.type + == "response.audio_transcript.delta" + ): try: text = acc_items[event.item_id] except KeyError: acc_items[event.item_id] = event.delta else: - acc_items[event.item_id] = text + event.delta + acc_items[event.item_id] = ( + text + event.delta + ) - logger.debug(f"Transcription updated: {acc_items[event.item_id]}") + logger.debug( + f"Transcription updated: {acc_items[event.item_id]}" + ) continue if event.type == "response.text.delta": @@ -273,7 +316,9 @@ async def handle_realtime_connection(self) -> None: if event.type == "response.done": break except Exception as e: - logger.exception(f"Error in realtime connection handler: {e}") + logger.exception( + f"Error in realtime connection handler: {e}" + ) async def _get_connection(self) -> AsyncRealtimeConnection: """Wait for and return the realtime connection.""" @@ -286,7 +331,9 @@ async def send_text_prompt(self, text: str) -> None: try: connection = await self._get_connection() if not self.session: - logger.error("Session is not initialized. Cannot send prompt.") + logger.error( + "Session is not initialized. Cannot send prompt." + ) return logger.info(f"Sending prompt to the model: {text}") @@ -310,7 +357,9 @@ async def send_mic_audio(self) -> None: try: read_size = int(SAMPLE_RATE * 0.02) stream = sd.InputStream( - channels=CHANNELS, samplerate=SAMPLE_RATE, dtype="int16" + channels=CHANNELS, + samplerate=SAMPLE_RATE, + dtype="int16", ) stream.start() @@ -325,13 +374,21 @@ async def send_mic_audio(self) -> None: connection = await self._get_connection() if not sent_audio: - asyncio.create_task(connection.send({"type": "response.cancel"})) + asyncio.create_task( + connection.send({"type": "response.cancel"}) + ) sent_audio = True - await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8")) + await connection.input_audio_buffer.append( + audio=base64.b64encode(cast(Any, data)).decode( + "utf-8" + ) + ) await asyncio.sleep(0) except Exception as e: - logger.exception(f"Error in microphone audio streaming: {e}") + logger.exception( + f"Error in microphone audio streaming: {e}" + ) finally: stream.stop() stream.close() @@ -339,15 +396,21 @@ async def send_mic_audio(self) -> None: async def run(self) -> None: """Start the application tasks.""" logger.info("Starting application tasks.") - + await asyncio.gather( # self.initialize_text_prompt(self.system_prompt), self.handle_realtime_connection(), - self.send_mic_audio() + self.send_mic_audio(), ) + if __name__ == "__main__": - logger.add("realtime_app.log", rotation="10 MB", retention="10 days", level="DEBUG") + logger.add( + "realtime_app.log", + rotation="10 MB", + retention="10 days", + level="DEBUG", + ) logger.info("Starting RealtimeApp.") app = RealtimeApp() asyncio.run(app.run()) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index e4f6d91d4..f4436988f 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -80,6 +80,7 @@ parse_tasks, ) from swarms.structs.async_workflow import AsyncWorkflow +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter __all__ = [ "Agent", @@ -154,4 +155,5 @@ "ChatTurn", "AgentResponse", "expertise_based", + "MultiAgentRouter", ] diff --git a/swarms/structs/multi_agent_orchestrator.py b/swarms/structs/multi_agent_orchestrator.py new file mode 100644 index 000000000..9cf0b3067 --- /dev/null +++ b/swarms/structs/multi_agent_orchestrator.py @@ -0,0 +1,401 @@ +""" +Todo: + +- Add multi-agent selection for a task and then run them automatically +- Add shared memory for large instances of agents + + + +""" + +import os +import subprocess +import uuid +from datetime import UTC, datetime +from typing import List, Literal, Optional + +from loguru import logger +from pydantic import BaseModel, Field +from tenacity import retry, stop_after_attempt, wait_exponential + +from swarms.structs.agent import Agent + + +class AgentResponse(BaseModel): + """Response from the boss agent indicating which agent should handle the task""" + + selected_agent: str = Field( + description="Name of the agent selected to handle the task" + ) + reasoning: str = Field( + description="Explanation for why this agent was selected" + ) + modified_task: Optional[str] = Field( + None, description="Optional modified version of the task" + ) + + +class OpenAIFunctionCaller: + """ + A class to interact with the OpenAI API for generating text based on a system prompt and a task. + """ + + def __init__( + self, + system_prompt: str, + api_key: str, + temperature: float, + max_tokens: int = 4000, + model_name: str = "gpt-4-0125-preview", + ): + self.system_prompt = system_prompt + self.api_key = api_key + self.temperature = temperature + self.max_tokens = max_tokens + self.model_name = model_name + + try: + from openai import OpenAI + except ImportError: + logger.error( + "OpenAI library not found. Please install it using 'pip install openai'" + ) + subprocess.run(["pip", "install", "openai"]) + raise + + try: + self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + except Exception as e: + logger.error( + f"Error initializing OpenAI client: {str(e)}" + ) + raise + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + def get_completion(self, task: str) -> AgentResponse: + """Get completion from OpenAI with retries""" + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": task}, + ], + response_format={"type": "json_object"}, + temperature=self.temperature, + max_tokens=self.max_tokens, + ) + + return AgentResponse.model_validate_json( + response.choices[0].message.content + ) + except Exception as e: + logger.error(f"Error getting completion: {str(e)}") + raise + + def get_agent_response( + self, system_prompt: str, task: str + ) -> str: + """Get agent response without function calling""" + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": task}, + ], + temperature=self.temperature, + max_tokens=self.max_tokens, + ) + + return response.choices[0].message.content + except Exception as e: + logger.error(f"Error getting agent response: {str(e)}") + raise + + +class MultiAgentRouter: + """ + Routes tasks to appropriate agents based on their capabilities. + + This class is responsible for managing a pool of agents and routing incoming tasks to the most suitable agent. It uses a boss agent to analyze the task and select the best agent for the job. The boss agent's decision is based on the capabilities and descriptions of the available agents. + + Attributes: + name (str): The name of the router. + description (str): A description of the router's purpose. + agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object. + api_key (str): The API key for OpenAI. + output_type (str): The type of output expected from the agents. Can be either "json" or "string". + execute_task (bool): A flag indicating whether the task should be executed by the selected agent. + boss_system_prompt (str): A system prompt for the boss agent that includes information about all available agents. + function_caller (OpenAIFunctionCaller): An instance of OpenAIFunctionCaller for calling the boss agent. + """ + + def __init__( + self, + name: str = "swarm-router", + description: str = "Routes tasks to specialized agents based on their capabilities", + agents: List[Agent] = [], + model: str = "gpt-4o-mini", + temperature: float = 0.1, + shared_memory_system: callable = None, + output_type: Literal["json", "string"] = "json", + execute_task: bool = True, + ): + """ + Initializes the MultiAgentRouter with a list of agents and configuration options. + + Args: + name (str, optional): The name of the router. Defaults to "swarm-router". + description (str, optional): A description of the router's purpose. Defaults to "Routes tasks to specialized agents based on their capabilities". + agents (List[Agent], optional): A list of agents to be managed by the router. Defaults to an empty list. + model (str, optional): The model to use for the boss agent. Defaults to "gpt-4-0125-preview". + temperature (float, optional): The temperature for the boss agent's model. Defaults to 0.1. + output_type (Literal["json", "string"], optional): The type of output expected from the agents. Defaults to "json". + execute_task (bool, optional): A flag indicating whether the task should be executed by the selected agent. Defaults to True. + """ + self.name = name + self.description = description + self.shared_memory_system = shared_memory_system + self.agents = {agent.name: agent for agent in agents} + self.api_key = os.getenv("OPENAI_API_KEY") + if not self.api_key: + raise ValueError("OpenAI API key must be provided") + + self.output_type = output_type + self.execute_task = execute_task + self.boss_system_prompt = self._create_boss_system_prompt() + + # Initialize the function caller + self.function_caller = OpenAIFunctionCaller( + system_prompt=self.boss_system_prompt, + api_key=self.api_key, + temperature=temperature, + ) + + def __repr__(self): + return f"MultiAgentRouter(name={self.name}, agents={list(self.agents.keys())})" + + def query_ragent(self, task: str) -> str: + """Query the ResearchAgent""" + return self.shared_memory_system.query(task) + + def _create_boss_system_prompt(self) -> str: + """ + Creates a system prompt for the boss agent that includes information about all available agents. + + Returns: + str: The system prompt for the boss agent. + """ + agent_descriptions = "\n".join( + [ + f"- {name}: {agent.description}" + for name, agent in self.agents.items() + ] + ) + + return f"""You are a boss agent responsible for routing tasks to the most appropriate specialized agent. + Available agents: + {agent_descriptions} + + Your job is to: + 1. Analyze the incoming task + 2. Select the most appropriate agent based on their descriptions + 3. Provide clear reasoning for your selection + 4. Optionally modify the task to better suit the selected agent's capabilities + + You must respond with JSON that contains: + - selected_agent: Name of the chosen agent (must be one of the available agents) + - reasoning: Brief explanation of why this agent was selected + - modified_task: (Optional) A modified version of the task if needed + + Always select exactly one agent that best matches the task requirements. + """ + + def find_agent_in_list(self, agent_name: str) -> Optional[Agent]: + """ + Find an agent by name in a list of agents. + + Args: + agent_name (str): The name of the agent to find. + + Returns: + Optional[Agent]: The agent object if found, otherwise None. + """ + for agent in self.agent_list: + if agent.name == agent_name: + return agent + return None + + def route_task(self, task: str) -> dict: + """ + Routes a task to the appropriate agent and returns their response. + + Args: + task (str): The task to be routed. + + Returns: + dict: A dictionary containing the routing result, including the selected agent, reasoning, and response. + """ + try: + start_time = datetime.now(UTC) + + # Get boss decision using function calling + boss_response = self.function_caller.get_completion(task) + + # Validate that the selected agent exists + if boss_response.selected_agent not in self.agents: + raise ValueError( + f"Boss selected unknown agent: {boss_response.selected_agent}" + ) + + # Get the selected agent + selected_agent = self.agents[boss_response.selected_agent] + + # Use the modified task if provided, otherwise use original task + final_task = boss_response.modified_task or task + + # Execute the task with the selected agent if enabled + execution_start = datetime.now(UTC) + agent_response = None + execution_time = 0 + + if self.execute_task: + # Use the agent's run method directly + agent_response = selected_agent.run(final_task) + execution_time = ( + datetime.now(UTC) - execution_start + ).total_seconds() + else: + logger.info( + "Task execution skipped (execute_task=False)" + ) + + total_time = ( + datetime.now(UTC) - start_time + ).total_seconds() + + result = { + "id": str(uuid.uuid4()), + "timestamp": datetime.now(UTC).isoformat(), + "task": { + "original": task, + "modified": ( + final_task + if boss_response.modified_task + else None + ), + }, + "boss_decision": { + "selected_agent": boss_response.selected_agent, + "reasoning": boss_response.reasoning, + }, + "execution": { + "agent_name": selected_agent.name, + "agent_id": selected_agent.id, + "was_executed": self.execute_task, + "response": ( + agent_response if self.execute_task else None + ), + "execution_time": ( + execution_time if self.execute_task else None + ), + }, + "total_time": total_time, + } + + logger.info( + f"Successfully routed task to {selected_agent.name}" + ) + return result + + except Exception as e: + logger.error(f"Error routing task: {str(e)}") + raise + + def batch_route(self, tasks: List[str] = []): + """Batch route tasks to the appropriate agents""" + results = [] + for task in tasks: + try: + result = self.route_task(task) + results.append(result) + except Exception as e: + logger.error(f"Error routing task: {str(e)}") + return results + + def concurrent_batch_route(self, tasks: List[str] = []): + """Concurrently route tasks to the appropriate agents""" + import concurrent.futures + from concurrent.futures import ThreadPoolExecutor + + results = [] + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.route_task, task) + for task in tasks + ] + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as e: + logger.error(f"Error routing task: {str(e)}") + return results + + +# # Example usage: +# if __name__ == "__main__": +# # Define some example agents +# agents = [ +# Agent( +# agent_name="ResearchAgent", +# description="Specializes in researching topics and providing detailed, factual information", +# system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", +# model_name="openai/gpt-4o", +# ), +# Agent( +# agent_name="CodeExpertAgent", +# description="Expert in writing, reviewing, and explaining code across multiple programming languages", +# system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", +# model_name="openai/gpt-4o", +# ), +# Agent( +# agent_name="WritingAgent", +# description="Skilled in creative and technical writing, content creation, and editing", +# system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.", +# model_name="openai/gpt-4o", +# ), +# ] + +# # Initialize routers with different configurations +# router_execute = MultiAgentRouter(agents=agents, execute_task=True) +# # router_no_execute = MultiAgentRouter(agents=agents, execute_task=False) + +# # Example task +# task = "Write a Python function to calculate fibonacci numbers" + +# try: +# # Process the task with execution +# print("\nWith task execution:") +# result_execute = router_execute.route_task(task) +# print( +# f"Selected Agent: {result_execute['boss_decision']['selected_agent']}" +# ) +# print( +# f"Reasoning: {result_execute['boss_decision']['reasoning']}" +# ) +# if result_execute["execution"]["response"]: +# print( +# f"Response Preview: {result_execute['execution']['response'][:200]}..." +# ) +# print( +# f"Execution Time: {result_execute['execution']['execution_time']:.2f}s" +# ) +# print(f"Total Time: {result_execute['total_time']:.2f}s") + +# except Exception as e: +# print(f"Error occurred: {str(e)}") diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index 8c1c1da8e..eb11474df 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -501,16 +501,16 @@ def swarm_router( raise -swarm = AutoSwarmBuilder( - name="ChipDesign-Swarm", - description="A swarm of specialized AI agents for chip design", - swarm_type="ConcurrentWorkflow", -) - -try: - result = swarm.run( - "Design a new AI accelerator chip optimized for transformer model inference..." - ) - print(result) -except Exception as e: - print(f"An error occurred: {e}") +# swarm = AutoSwarmBuilder( +# name="ChipDesign-Swarm", +# description="A swarm of specialized AI agents for chip design", +# swarm_type="ConcurrentWorkflow", +# ) + +# try: +# result = swarm.run( +# "Design a new AI accelerator chip optimized for transformer model inference..." +# ) +# print(result) +# except Exception as e: +# print(f"An error occurred: {e}") diff --git a/swarms/structs/swarm_output_type.py b/swarms/structs/swarm_output_type.py new file mode 100644 index 000000000..f2a85732c --- /dev/null +++ b/swarms/structs/swarm_output_type.py @@ -0,0 +1,23 @@ +import time +from typing import List +import uuid +from pydantic import BaseModel, Field + + +class AgentResponde(BaseModel): + id: str = Field(default=uuid.uuid4().hex) + timestamp: str = Field(default=time.time()) + agent_position: int = Field(description="Agent in swarm position") + agent_name: str + agent_response: str = Field(description="Agent response") + + +class SwarmOutput(BaseModel): + id: str = Field(default=uuid.uuid4().hex) + timestamp: str = Field(default=time.time()) + name: str = Field(description="Swarm name") + description: str = Field(description="Swarm description") + swarm_type: str = Field(description="Swarm type") + agent_outputs: List[AgentResponde] = Field( + description="List of agent responses" + ) diff --git a/tests/structs/test_multi_agent_orchestrator.py b/tests/structs/test_multi_agent_orchestrator.py new file mode 100644 index 000000000..5c687c64f --- /dev/null +++ b/tests/structs/test_multi_agent_orchestrator.py @@ -0,0 +1,219 @@ +import os +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter + + +def create_test_agent(name: str) -> Agent: + """Helper function to create a test agent""" + return Agent( + agent_name=name, + description=f"Test {name}", + system_prompt=f"You are a {name}", + model_name="openai/gpt-4o", + ) + + +def test_boss_router_initialization(): + """Test MultiAgentRouter initialization""" + print("\nTesting MultiAgentRouter initialization...") + + # Test successful initialization + try: + agents = [ + create_test_agent("TestAgent1"), + create_test_agent("TestAgent2"), + ] + router = MultiAgentRouter(agents=agents) + assert ( + router.name == "swarm-router" + ), "Default name should be 'swarm-router'" + assert len(router.agents) == 2, "Should have 2 agents" + print("✓ Basic initialization successful") + except Exception as e: + print(f"✗ Basic initialization failed: {str(e)}") + + # Test initialization without API key + try: + temp_key = os.getenv("OPENAI_API_KEY") + os.environ["OPENAI_API_KEY"] = "" + success = False + try: + router = MultiAgentRouter(agents=[]) + except ValueError as e: + success = str(e) == "OpenAI API key must be provided" + os.environ["OPENAI_API_KEY"] = temp_key + assert ( + success + ), "Should raise ValueError when API key is missing" + print("✓ API key validation successful") + except Exception as e: + print(f"✗ API key validation failed: {str(e)}") + + +def test_boss_system_prompt(): + """Test system prompt generation""" + print("\nTesting system prompt generation...") + + try: + agents = [ + create_test_agent("Agent1"), + create_test_agent("Agent2"), + ] + router = MultiAgentRouter(agents=agents) + prompt = router._create_boss_system_prompt() + + # Check if prompt contains agent information + assert ( + "Agent1" in prompt + ), "Prompt should contain first agent name" + assert ( + "Agent2" in prompt + ), "Prompt should contain second agent name" + assert ( + "You are a boss agent" in prompt + ), "Prompt should contain boss agent description" + print("✓ System prompt generation successful") + except Exception as e: + print(f"✗ System prompt generation failed: {str(e)}") + + +def test_find_agent_in_list(): + """Test agent finding functionality""" + print("\nTesting agent finding functionality...") + + try: + agent1 = create_test_agent("Agent1") + agent2 = create_test_agent("Agent2") + router = MultiAgentRouter(agents=[agent1, agent2]) + + # Test finding existing agent + assert "Agent1" in router.agents, "Should find existing agent" + assert ( + "NonexistentAgent" not in router.agents + ), "Should not find nonexistent agent" + print("✓ Agent finding successful") + except Exception as e: + print(f"✗ Agent finding failed: {str(e)}") + + +def test_task_routing(): + """Test task routing functionality""" + print("\nTesting task routing...") + + try: + # Create test agents + agents = [ + create_test_agent("CodeAgent"), + create_test_agent("WritingAgent"), + ] + router = MultiAgentRouter(agents=agents) + + # Test routing a coding task + result = router.route_task( + "Write a Python function to sort a list" + ) + assert result["boss_decision"]["selected_agent"] in [ + "CodeAgent", + "WritingAgent", + ], "Should select an appropriate agent" + assert ( + "execution" in result + ), "Result should contain execution details" + assert ( + "total_time" in result + ), "Result should contain timing information" + print("✓ Task routing successful") + except Exception as e: + print(f"✗ Task routing failed: {str(e)}") + + +def test_batch_routing(): + """Test batch routing functionality""" + print("\nTesting batch routing...") + + try: + agents = [create_test_agent("TestAgent")] + router = MultiAgentRouter(agents=agents) + + tasks = ["Task 1", "Task 2", "Task 3"] + + # Test sequential batch routing + results = router.batch_route(tasks) + assert len(results) == len( + tasks + ), "Should return result for each task" + print("✓ Sequential batch routing successful") + + # Test concurrent batch routing + concurrent_results = router.concurrent_batch_route(tasks) + assert len(concurrent_results) == len( + tasks + ), "Should return result for each task" + print("✓ Concurrent batch routing successful") + except Exception as e: + print(f"✗ Batch routing failed: {str(e)}") + + +def test_error_handling(): + """Test error handling in various scenarios""" + print("\nTesting error handling...") + + try: + router = MultiAgentRouter(agents=[]) + + # Test routing with no agents + success = False + try: + router.route_task("Test task") + except Exception: + success = True + assert success, "Should handle routing with no agents" + print("✓ Empty agent list handling successful") + + # Test with invalid task + success = False + router = MultiAgentRouter( + agents=[create_test_agent("TestAgent")] + ) + try: + router.route_task("") + except ValueError: + success = True + assert success, "Should handle empty task" + print("✓ Invalid task handling successful") + except Exception as e: + print(f"✗ Error handling failed: {str(e)}") + + +def run_all_tests(): + """Run all test functions""" + print("Starting MultiAgentRouter tests...") + + test_functions = [ + test_boss_router_initialization, + test_boss_system_prompt, + test_find_agent_in_list, + test_task_routing, + test_batch_routing, + test_error_handling, + ] + + total_tests = len(test_functions) + passed_tests = 0 + + for test_func in test_functions: + try: + test_func() + passed_tests += 1 + except Exception as e: + print( + f"Test {test_func.__name__} failed with error: {str(e)}" + ) + + print( + f"\nTest Results: {passed_tests}/{total_tests} tests passed" + ) + + +if __name__ == "__main__": + run_all_tests() diff --git a/unique_swarms_examples.py b/unique_swarms_examples.py index 7c8f73d5a..7f577e0be 100644 --- a/unique_swarms_examples.py +++ b/unique_swarms_examples.py @@ -25,266 +25,282 @@ def create_finance_agents() -> List[Agent]: Agent( agent_name="MarketAnalyst", system_prompt="You are a market analysis expert. Analyze market trends and provide insights.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="RiskManager", system_prompt="You are a risk management specialist. Evaluate risks and provide mitigation strategies.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="PortfolioManager", system_prompt="You are a portfolio management expert. Optimize investment portfolios and asset allocation.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="ComplianceOfficer", system_prompt="You are a financial compliance expert. Ensure regulatory compliance and identify issues.", - model_name="gpt-4o-mini" - ) + model_name="gpt-4o-mini", + ), ] + def create_healthcare_agents() -> List[Agent]: """Create specialized healthcare agents""" return [ Agent( agent_name="Diagnostician", system_prompt="You are a medical diagnostician. Analyze symptoms and suggest potential diagnoses.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="Treatment_Planner", system_prompt="You are a treatment planning specialist. Develop comprehensive treatment plans.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="MedicalResearcher", system_prompt="You are a medical researcher. Analyze latest research and provide evidence-based recommendations.", - model_name="gpt-4o-mini" + model_name="gpt-4o-mini", ), Agent( agent_name="PatientCareCoordinator", system_prompt="You are a patient care coordinator. Manage patient care workflow and coordination.", - model_name="gpt-4o-mini" - ) + model_name="gpt-4o-mini", + ), ] + def print_separator(): - print("\n" + "="*50 + "\n") + print("\n" + "=" * 50 + "\n") + def run_finance_circular_swarm(): """Investment analysis workflow using circular swarm""" print_separator() print("FINANCE - INVESTMENT ANALYSIS (Circular Swarm)") - + agents = create_finance_agents() tasks = [ "Analyze Tesla stock performance for Q4 2024", "Assess market risks and potential hedging strategies", - "Recommend portfolio adjustments based on analysis" + "Recommend portfolio adjustments based on analysis", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + result = circular_swarm(agents, tasks) print("\nResults:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Task: {log['task']}") print(f"Response: {log['response']}") + def run_healthcare_grid_swarm(): """Patient diagnosis and treatment planning using grid swarm""" print_separator() print("HEALTHCARE - PATIENT DIAGNOSIS (Grid Swarm)") - + agents = create_healthcare_agents() tasks = [ "Review patient symptoms: fever, fatigue, joint pain", "Research latest treatment protocols", "Develop preliminary treatment plan", - "Coordinate with specialists" + "Coordinate with specialists", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + result = grid_swarm(agents, tasks) print("\nGrid swarm processing completed") print(result) + def run_finance_linear_swarm(): """Loan approval process using linear swarm""" print_separator() print("FINANCE - LOAN APPROVAL PROCESS (Linear Swarm)") - + agents = create_finance_agents()[:3] tasks = [ "Review loan application and credit history", "Assess risk factors and compliance requirements", - "Generate final loan recommendation" + "Generate final loan recommendation", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + result = linear_swarm(agents, tasks) print("\nResults:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Task: {log['task']}") print(f"Response: {log['response']}") + def run_healthcare_star_swarm(): """Complex medical case management using star swarm""" print_separator() print("HEALTHCARE - COMPLEX CASE MANAGEMENT (Star Swarm)") - + agents = create_healthcare_agents() tasks = [ "Complex case: Patient with multiple chronic conditions", - "Develop integrated care plan" + "Develop integrated care plan", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + result = star_swarm(agents, tasks) print("\nResults:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Task: {log['task']}") print(f"Response: {log['response']}") + def run_finance_mesh_swarm(): """Market risk assessment using mesh swarm""" print_separator() print("FINANCE - MARKET RISK ASSESSMENT (Mesh Swarm)") - + agents = create_finance_agents() tasks = [ "Analyze global market conditions", "Assess currency exchange risks", "Evaluate sector-specific risks", - "Review portfolio exposure" + "Review portfolio exposure", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + result = mesh_swarm(agents, tasks) print("\nResults:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Task: {log['task']}") print(f"Response: {log['response']}") + def run_mathematical_finance_swarms(): """Complex financial analysis using mathematical swarms""" print_separator() print("FINANCE - MARKET PATTERN ANALYSIS") - + agents = create_finance_agents() tasks = [ "Analyze historical market patterns", "Predict market trends using technical analysis", - "Identify potential arbitrage opportunities" + "Identify potential arbitrage opportunities", ] - + print("\nTasks:") for i, task in enumerate(tasks, 1): print(f"{i}. {task}") - + print("\nFibonacci Swarm Results:") result = fibonacci_swarm(agents, tasks.copy()) print(result) - + print("\nPrime Swarm Results:") result = prime_swarm(agents, tasks.copy()) print(result) - + print("\nExponential Swarm Results:") result = exponential_swarm(agents, tasks.copy()) print(result) + def run_healthcare_pattern_swarms(): """Patient monitoring using pattern swarms""" print_separator() print("HEALTHCARE - PATIENT MONITORING PATTERNS") - + agents = create_healthcare_agents() task = "Monitor and analyze patient vital signs: BP, heart rate, temperature, O2 saturation" - + print(f"\nTask: {task}") - + print("\nStaircase Pattern Analysis:") result = staircase_swarm(agents, task) print(result) - + print("\nSigmoid Pattern Analysis:") result = sigmoid_swarm(agents, task) print(result) - + print("\nSinusoidal Pattern Analysis:") result = sinusoidal_swarm(agents, task) print(result) + async def run_communication_examples(): """Communication patterns for emergency scenarios""" print_separator() print("EMERGENCY COMMUNICATION PATTERNS") - + # Finance market alert finance_sender = create_finance_agents()[0] finance_receivers = create_finance_agents()[1:] market_alert = "URGENT: Major market volatility detected - immediate risk assessment required" - + print("\nFinance Market Alert:") print(f"Alert: {market_alert}") - result = await broadcast(finance_sender, finance_receivers, market_alert) + result = await broadcast( + finance_sender, finance_receivers, market_alert + ) print("\nBroadcast Results:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Response: {log['response']}") - + # Healthcare emergency health_sender = create_healthcare_agents()[0] health_receivers = create_healthcare_agents()[1:4] emergency_case = "EMERGENCY: Trauma patient with multiple injuries - immediate consultation required" - + print("\nHealthcare Emergency:") print(f"Case: {emergency_case}") - result = await one_to_three(health_sender, health_receivers, emergency_case) + result = await one_to_three( + health_sender, health_receivers, emergency_case + ) print("\nConsultation Results:") - for log in result['history']: + for log in result["history"]: print(f"\n{log['agent_name']}:") print(f"Response: {log['response']}") + async def run_all_examples(): """Execute all swarm examples""" print("\n=== SWARM ARCHITECTURE EXAMPLES ===\n") - + # Finance examples run_finance_circular_swarm() run_finance_linear_swarm() run_finance_mesh_swarm() run_mathematical_finance_swarms() - + # Healthcare examples run_healthcare_grid_swarm() run_healthcare_star_swarm() run_healthcare_pattern_swarms() - + # Communication examples await run_communication_examples() - + print("\n=== ALL EXAMPLES COMPLETED ===") + if __name__ == "__main__": - asyncio.run(run_all_examples()) \ No newline at end of file + asyncio.run(run_all_examples())