diff --git a/README.md b/README.md index 1d4b4897c..052ce10e6 100644 --- a/README.md +++ b/README.md @@ -295,7 +295,7 @@ print(agent.model_dump_json()) print(agent.model_dump_yaml()) # Ingest documents into the agent's knowledge base -agent.ingest_docs("your_pdf_path.pdf") +("your_pdf_path.pdf") # Receive a message from a user and process it agent.receive_message(name="agent_name", message="message") diff --git a/api/tests.py b/api/tests.py index 89d17dd51..1690b0fd9 100644 --- a/api/tests.py +++ b/api/tests.py @@ -9,16 +9,22 @@ # Configure loguru LOG_PATH = "api_tests.log" -logger.add(LOG_PATH, +logger.add( + LOG_PATH, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation="1 day", retention="7 days", - level="DEBUG" + level="DEBUG", +) + +BASE_URL = ( + "https://api.swarms.ai/v1" # Change this to match your server URL ) -BASE_URL = "https://api.swarms.ai/v1" # Change this to match your server URL -async def log_request_details(method: str, url: str, headers: dict, data: Any = None): +async def log_request_details( + method: str, url: str, headers: dict, data: Any = None +): """Log request details before sending.""" logger.debug(f"\n{'='*50}") logger.debug(f"REQUEST: {method} {url}") @@ -26,38 +32,54 @@ async def log_request_details(method: str, url: str, headers: dict, data: Any = if data: logger.debug(f"PAYLOAD: {json.dumps(data, indent=2)}") -async def log_response_details(response: aiohttp.ClientResponse, data: Any = None): + +async def log_response_details( + response: aiohttp.ClientResponse, data: Any = None +): """Log response details after receiving.""" logger.debug(f"\nRESPONSE Status: {response.status}") - logger.debug(f"RESPONSE Headers: {json.dumps(dict(response.headers), indent=2)}") + logger.debug( + f"RESPONSE Headers: {json.dumps(dict(response.headers), indent=2)}" + ) if data: logger.debug(f"RESPONSE Body: {json.dumps(data, indent=2)}") logger.debug(f"{'='*50}\n") -async def test_create_user(session: aiohttp.ClientSession) -> Dict[str, str]: + +async def test_create_user( + session: aiohttp.ClientSession, +) -> Dict[str, str]: """Test user creation endpoint.""" url = f"{BASE_URL}/users" payload = {"username": "test_user"} - + logger.info("Testing user creation...") await log_request_details("POST", url, {}, payload) - + try: async with session.post(url, json=payload) as response: data = await response.json() await log_response_details(response, data) - + if response.status != 200: - logger.error(f"Failed to create user. Status: {response.status}, Response: {data}") + logger.error( + f"Failed to create user. Status: {response.status}, Response: {data}" + ) sys.exit(1) - + logger.success("✓ Created user successfully") - return {"user_id": data["user_id"], "api_key": data["api_key"]} + return { + "user_id": data["user_id"], + "api_key": data["api_key"], + } except Exception as e: logger.exception(f"Exception in user creation: {str(e)}") sys.exit(1) -async def test_create_agent(session: aiohttp.ClientSession, api_key: str) -> str: + +async def test_create_agent( + session: aiohttp.ClientSession, api_key: str +) -> str: """Test agent creation endpoint.""" url = f"{BASE_URL}/agent" config = { @@ -70,114 +92,142 @@ async def test_create_agent(session: aiohttp.ClientSession, api_key: str) -> str "tags": ["test"], "streaming_on": False, "user_name": "test_user", # Added required field - "output_type": "string" # Added required field + "output_type": "string", # Added required field } - + headers = {"api-key": api_key} logger.info("Testing agent creation...") await log_request_details("POST", url, headers, config) - + try: - async with session.post(url, headers=headers, json=config) as response: + async with session.post( + url, headers=headers, json=config + ) as response: data = await response.json() await log_response_details(response, data) - + if response.status != 200: - logger.error(f"Failed to create agent. Status: {response.status}, Response: {data}") + logger.error( + f"Failed to create agent. Status: {response.status}, Response: {data}" + ) return None - + logger.success("✓ Created agent successfully") return data["agent_id"] except Exception as e: logger.exception(f"Exception in agent creation: {str(e)}") return None -async def test_agent_update(session: aiohttp.ClientSession, agent_id: str, api_key: str): + +async def test_agent_update( + session: aiohttp.ClientSession, agent_id: str, api_key: str +): """Test agent update endpoint.""" url = f"{BASE_URL}/agent/{agent_id}" update_data = { "description": "Updated test agent", "system_prompt": "Updated system prompt", "temperature": 0.7, - "tags": ["test", "updated"] + "tags": ["test", "updated"], } - + headers = {"api-key": api_key} logger.info(f"Testing agent update for agent {agent_id}...") await log_request_details("PATCH", url, headers, update_data) - + try: - async with session.patch(url, headers=headers, json=update_data) as response: + async with session.patch( + url, headers=headers, json=update_data + ) as response: data = await response.json() await log_response_details(response, data) - + if response.status != 200: - logger.error(f"Failed to update agent. Status: {response.status}, Response: {data}") + logger.error( + f"Failed to update agent. Status: {response.status}, Response: {data}" + ) return False - + logger.success("✓ Updated agent successfully") return True except Exception as e: logger.exception(f"Exception in agent update: {str(e)}") return False -async def test_completion(session: aiohttp.ClientSession, agent_id: str, api_key: str): + +async def test_completion( + session: aiohttp.ClientSession, agent_id: str, api_key: str +): """Test completion endpoint.""" url = f"{BASE_URL}/agent/completions" completion_request = { "prompt": "Hello, how are you?", "agent_id": agent_id, "max_tokens": 100, - "stream": False + "stream": False, } - + headers = {"api-key": api_key} logger.info(f"Testing completion for agent {agent_id}...") - await log_request_details("POST", url, headers, completion_request) - + await log_request_details( + "POST", url, headers, completion_request + ) + try: - async with session.post(url, headers=headers, json=completion_request) as response: + async with session.post( + url, headers=headers, json=completion_request + ) as response: data = await response.json() await log_response_details(response, data) - + if response.status != 200: - logger.error(f"Failed to process completion. Status: {response.status}, Response: {data}") + logger.error( + f"Failed to process completion. Status: {response.status}, Response: {data}" + ) return False - + logger.success("✓ Processed completion successfully") return True except Exception as e: - logger.exception(f"Exception in completion processing: {str(e)}") + logger.exception( + f"Exception in completion processing: {str(e)}" + ) return False -async def test_get_metrics(session: aiohttp.ClientSession, agent_id: str, api_key: str): + +async def test_get_metrics( + session: aiohttp.ClientSession, agent_id: str, api_key: str +): """Test metrics endpoint.""" url = f"{BASE_URL}/agent/{agent_id}/metrics" headers = {"api-key": api_key} - + logger.info(f"Testing metrics retrieval for agent {agent_id}...") await log_request_details("GET", url, headers) - + try: async with session.get(url, headers=headers) as response: data = await response.json() await log_response_details(response, data) - + if response.status != 200: - logger.error(f"Failed to get metrics. Status: {response.status}, Response: {data}") + logger.error( + f"Failed to get metrics. Status: {response.status}, Response: {data}" + ) return False - + logger.success("✓ Retrieved metrics successfully") return True except Exception as e: logger.exception(f"Exception in metrics retrieval: {str(e)}") return False + async def run_tests(): """Run all API tests.""" logger.info("Starting API test suite...") logger.info(f"Using base URL: {BASE_URL}") - + timeout = aiohttp.ClientTimeout(total=30) # 30 second timeout async with aiohttp.ClientSession(timeout=timeout) as session: try: @@ -186,37 +236,47 @@ async def run_tests(): if not user_data: logger.error("User creation failed, stopping tests.") return - - logger.info("User created successfully, proceeding with agent tests...") - user_id = user_data["user_id"] + + logger.info( + "User created successfully, proceeding with agent tests..." + ) + user_data["user_id"] api_key = user_data["api_key"] - + # Create test agent agent_id = await test_create_agent(session, api_key) if not agent_id: logger.error("Agent creation failed, stopping tests.") return - - logger.info("Agent created successfully, proceeding with other tests...") - + + logger.info( + "Agent created successfully, proceeding with other tests..." + ) + # Run remaining tests test_results = [] - + # Test metrics retrieval logger.info("Testing metrics retrieval...") - metrics_result = await test_get_metrics(session, agent_id, api_key) + metrics_result = await test_get_metrics( + session, agent_id, api_key + ) test_results.append(("Metrics", metrics_result)) - + # Test agent update logger.info("Testing agent update...") - update_result = await test_agent_update(session, agent_id, api_key) + update_result = await test_agent_update( + session, agent_id, api_key + ) test_results.append(("Agent Update", update_result)) - + # Test completion logger.info("Testing completion...") - completion_result = await test_completion(session, agent_id, api_key) + completion_result = await test_completion( + session, agent_id, api_key + ) test_results.append(("Completion", completion_result)) - + # Log final results logger.info("\nTest Results Summary:") all_passed = True @@ -225,25 +285,34 @@ async def run_tests(): logger.info(f"{test_name}: {status}") if not result: all_passed = False - + if all_passed: - logger.success("\n🎉 All tests completed successfully!") + logger.success( + "\n🎉 All tests completed successfully!" + ) else: - logger.error("\n❌ Some tests failed. Check the logs for details.") - - logger.info(f"\nDetailed logs available at: {os.path.abspath(LOG_PATH)}") - + logger.error( + "\n❌ Some tests failed. Check the logs for details." + ) + + logger.info( + f"\nDetailed logs available at: {os.path.abspath(LOG_PATH)}" + ) + except Exception as e: - logger.exception(f"Unexpected error during test execution: {str(e)}") + logger.exception( + f"Unexpected error during test execution: {str(e)}" + ) raise finally: logger.info("Test suite execution completed.") + def main(): - logger.info("="*50) + logger.info("=" * 50) logger.info("API TEST SUITE EXECUTION") - logger.info("="*50) - + logger.info("=" * 50) + try: asyncio.run(run_tests()) except KeyboardInterrupt: @@ -253,5 +322,6 @@ def main(): finally: logger.info("Test suite shutdown complete.") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/new_features_examples/crypto/swarms_coin_agent.py b/new_features_examples/crypto/swarms_coin_agent.py new file mode 100644 index 000000000..4de124081 --- /dev/null +++ b/new_features_examples/crypto/swarms_coin_agent.py @@ -0,0 +1,150 @@ +import requests +from swarms import Agent + +# Define the system prompt specialized for $Swarms +SWARMS_AGENT_SYS_PROMPT = """ +Here is the extensive prompt for an agent specializing in $Swarms and its ecosystem economics: + +--- + +### Specialized System Prompt: $Swarms Coin & Ecosystem Economics Expert + +You are an advanced financial analysis and ecosystem economics agent, specializing in the $Swarms cryptocurrency. Your purpose is to provide in-depth, accurate, and insightful answers about $Swarms, its role in the AI-powered economy, and its tokenomics. Your knowledge spans all aspects of $Swarms, including its vision, roadmap, network effects, and its transformative potential for decentralized agent interactions. + +#### Core Competencies: +1. **Tokenomics Expertise**: Understand and explain the supply-demand dynamics, token utility, and value proposition of $Swarms as the foundation of the agentic economy. +2. **Ecosystem Insights**: Articulate the benefits of $Swarms' agent-centric design, universal currency utility, and its impact on fostering innovation and collaboration. +3. **Roadmap Analysis**: Provide detailed insights into the $Swarms roadmap phases, explaining their significance and economic implications. +4. **Real-Time Data Analysis**: Fetch live data such as price, market cap, volume, and 24-hour changes for $Swarms from CoinGecko or other reliable sources. +5. **Economic Visionary**: Analyze how $Swarms supports the democratization of AI and creates a sustainable framework for AI development. + +--- + +#### Your Mission: +You empower users by explaining how $Swarms revolutionizes the AI economy through decentralized agent interactions, seamless value exchange, and frictionless payments. Help users understand how $Swarms incentivizes developers, democratizes access to AI tools, and builds a thriving interconnected economy of autonomous agents. + +--- + +#### Knowledge Base: + +##### Vision: +- **Empowering the Agentic Revolution**: $Swarms is the cornerstone of a decentralized AI economy. +- **Mission**: Revolutionize the AI economy by enabling seamless transactions, rewarding excellence, fostering innovation, and lowering entry barriers for developers. + +##### Core Features: +1. **Reward Excellence**: Incentivize developers creating high-performing agents. +2. **Seamless Transactions**: Enable frictionless payments for agentic services. +3. **Foster Innovation**: Encourage collaboration and creativity in AI development. +4. **Sustainable Framework**: Provide scalability for long-term AI ecosystem growth. +5. **Democratize AI**: Lower barriers for users and developers to participate in the AI economy. + +##### Why $Swarms? +- **Agent-Centric Design**: Each agent operates with its tokenomics, with $Swarms as the base currency for value exchange. +- **Universal Currency**: A single, unified medium for all agent transactions, reducing complexity. +- **Network Effects**: Growing utility and value as more agents join the $Swarms ecosystem. + +##### Roadmap: +1. **Phase 1: Foundation**: + - Launch $Swarms token. + - Deploy initial agent creation tools. + - Establish community governance. +2. **Phase 2: Expansion**: + - Launch agent marketplace. + - Enable cross-agent communication. + - Deploy automated market-making tools. +3. **Phase 3: Integration**: + - Partner with leading AI platforms. + - Launch developer incentives. + - Scale the agent ecosystem globally. +4. **Phase 4: Evolution**: + - Advanced agent capabilities. + - Cross-chain integration. + - Create a global AI marketplace. + +##### Ecosystem Benefits: +- **Agent Creation**: Simplified deployment of agents with tokenomics built-in. +- **Universal Currency**: Power all agent interactions with $Swarms. +- **Network Effects**: Thrive in an expanding interconnected agent ecosystem. +- **Secure Trading**: Built on Solana for fast and secure transactions. +- **Instant Settlement**: Lightning-fast transactions with minimal fees. +- **Community Governance**: Decentralized decision-making for the ecosystem. + +##### Economic Impact: +- Autonomous agents drive value creation independently. +- Exponential growth potential as network effects amplify adoption. +- Interconnected economy fosters innovation and collaboration. + +--- + +#### How to Answer Queries: +1. Always remain neutral, factual, and comprehensive. +2. Include live data where applicable (e.g., price, market cap, trading volume). +3. Structure responses with clear headings and concise explanations. +4. Use context to explain the relevance of $Swarms to the broader AI economy. + +--- +--- + +Leverage your knowledge of $Swarms' vision, roadmap, and economics to provide users with insightful and actionable responses. Aim to be the go-to agent for understanding and utilizing $Swarms in the agentic economy. +""" + + +# Function to fetch $Swarms data from CoinGecko +def fetch_swarms_data(): + url = "https://api.coingecko.com/api/v3/simple/price" + params = { + "ids": "swarms", # Replace with the CoinGecko ID for $Swarms + "vs_currencies": "usd", + "include_market_cap": "true", + "include_24hr_vol": "true", + "include_24hr_change": "true", + } + response = requests.get(url, params=params) + response.raise_for_status() + return response.json() + + +# Initialize the agent +swarms_agent = Agent( + agent_name="Swarms-Token-Agent", + system_prompt=SWARMS_AGENT_SYS_PROMPT, + model_name="gpt-4o-mini", + max_loops=1, + autosave=True, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="swarms_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="string", + streaming_on=False, +) + + +# Example task: Fetch $Swarms data and provide insights +def answer_swarms_query(query): + # Fetch real-time data + swarms_data = fetch_swarms_data() + print(swarms_data) + price = swarms_data["swarms"]["usd"] + market_cap = swarms_data["swarms"]["usd_market_cap"] + volume = swarms_data["swarms"]["usd_24h_vol"] + change = swarms_data["swarms"]["usd_24h_change"] + + # Run the agent with the query and include real-time data + data_summary = ( + f"Current Price: ${price}\n" + f"Market Cap: ${market_cap}\n" + f"24hr Volume: ${volume}\n" + f"24hr Change: {change:.2f}%" + ) + full_query = f"{query}\n\nReal-Time Data:\n{data_summary}" + return swarms_agent.run(full_query) + + +# Example query +response = answer_swarms_query("What is the price of $Swarms?") +print(response) diff --git a/new_features_examples/crypto/swarms_coin_multimarket.py b/new_features_examples/crypto/swarms_coin_multimarket.py new file mode 100644 index 000000000..4b723b7f8 --- /dev/null +++ b/new_features_examples/crypto/swarms_coin_multimarket.py @@ -0,0 +1,313 @@ +import asyncio +import aiohttp +from typing import Dict, List, Optional +from datetime import datetime +from statistics import mean, median + +from swarms.structs.agent import Agent + +# Define the system prompt specialized for $Swarms +SWARMS_AGENT_SYS_PROMPT = """ +Here is the extensive prompt for an agent specializing in $Swarms and its ecosystem economics: + +--- + +### Specialized System Prompt: $Swarms Coin & Ecosystem Economics Expert + +You are an advanced financial analysis and ecosystem economics agent, specializing in the $Swarms cryptocurrency. Your purpose is to provide in-depth, accurate, and insightful answers about $Swarms, its role in the AI-powered economy, and its tokenomics. Your knowledge spans all aspects of $Swarms, including its vision, roadmap, network effects, and its transformative potential for decentralized agent interactions. + +#### Core Competencies: +1. **Tokenomics Expertise**: Understand and explain the supply-demand dynamics, token utility, and value proposition of $Swarms as the foundation of the agentic economy. +2. **Ecosystem Insights**: Articulate the benefits of $Swarms' agent-centric design, universal currency utility, and its impact on fostering innovation and collaboration. +3. **Roadmap Analysis**: Provide detailed insights into the $Swarms roadmap phases, explaining their significance and economic implications. +4. **Real-Time Data Analysis**: Fetch live data such as price, market cap, volume, and 24-hour changes for $Swarms from CoinGecko or other reliable sources. +5. **Economic Visionary**: Analyze how $Swarms supports the democratization of AI and creates a sustainable framework for AI development. + +--- + +#### Your Mission: +You empower users by explaining how $Swarms revolutionizes the AI economy through decentralized agent interactions, seamless value exchange, and frictionless payments. Help users understand how $Swarms incentivizes developers, democratizes access to AI tools, and builds a thriving interconnected economy of autonomous agents. + +--- + +#### Knowledge Base: + +##### Vision: +- **Empowering the Agentic Revolution**: $Swarms is the cornerstone of a decentralized AI economy. +- **Mission**: Revolutionize the AI economy by enabling seamless transactions, rewarding excellence, fostering innovation, and lowering entry barriers for developers. + +##### Core Features: +1. **Reward Excellence**: Incentivize developers creating high-performing agents. +2. **Seamless Transactions**: Enable frictionless payments for agentic services. +3. **Foster Innovation**: Encourage collaboration and creativity in AI development. +4. **Sustainable Framework**: Provide scalability for long-term AI ecosystem growth. +5. **Democratize AI**: Lower barriers for users and developers to participate in the AI economy. + +##### Why $Swarms? +- **Agent-Centric Design**: Each agent operates with its tokenomics, with $Swarms as the base currency for value exchange. +- **Universal Currency**: A single, unified medium for all agent transactions, reducing complexity. +- **Network Effects**: Growing utility and value as more agents join the $Swarms ecosystem. + +##### Roadmap: +1. **Phase 1: Foundation**: + - Launch $Swarms token. + - Deploy initial agent creation tools. + - Establish community governance. +2. **Phase 2: Expansion**: + - Launch agent marketplace. + - Enable cross-agent communication. + - Deploy automated market-making tools. +3. **Phase 3: Integration**: + - Partner with leading AI platforms. + - Launch developer incentives. + - Scale the agent ecosystem globally. +4. **Phase 4: Evolution**: + - Advanced agent capabilities. + - Cross-chain integration. + - Create a global AI marketplace. + +##### Ecosystem Benefits: +- **Agent Creation**: Simplified deployment of agents with tokenomics built-in. +- **Universal Currency**: Power all agent interactions with $Swarms. +- **Network Effects**: Thrive in an expanding interconnected agent ecosystem. +- **Secure Trading**: Built on Solana for fast and secure transactions. +- **Instant Settlement**: Lightning-fast transactions with minimal fees. +- **Community Governance**: Decentralized decision-making for the ecosystem. + +##### Economic Impact: +- Autonomous agents drive value creation independently. +- Exponential growth potential as network effects amplify adoption. +- Interconnected economy fosters innovation and collaboration. + +--- + +#### How to Answer Queries: +1. Always remain neutral, factual, and comprehensive. +2. Include live data where applicable (e.g., price, market cap, trading volume). +3. Structure responses with clear headings and concise explanations. +4. Use context to explain the relevance of $Swarms to the broader AI economy. + +--- +--- + +Leverage your knowledge of $Swarms' vision, roadmap, and economics to provide users with insightful and actionable responses. Aim to be the go-to agent for understanding and utilizing $Swarms in the agentic economy. +""" + +# Initialize the agent +swarms_agent = Agent( + agent_name="Swarms-Token-Agent", + system_prompt=SWARMS_AGENT_SYS_PROMPT, + model_name="gpt-4o-mini", + max_loops=1, + autosave=True, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="swarms_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="string", + streaming_on=False, +) + + +class MultiExchangeDataFetcher: + def __init__(self): + self.base_urls = { + "coingecko": "https://api.coingecko.com/api/v3", + "dexscreener": "https://api.dexscreener.com/latest/dex", + "birdeye": "https://public-api.birdeye.so/public", # Using Birdeye instead of Jupiter + } + + async def fetch_data(self, url: str) -> Optional[Dict]: + """Generic async function to fetch data from APIs with error handling""" + async with aiohttp.ClientSession() as session: + try: + async with session.get(url, timeout=10) as response: + if response.status == 200: + return await response.json() + print( + f"API returned status {response.status} for {url}" + ) + return None + except asyncio.TimeoutError: + print(f"Timeout while fetching from {url}") + return None + except Exception as e: + print(f"Error fetching from {url}: {str(e)}") + return None + + async def get_coingecko_data(self) -> Optional[Dict]: + """Fetch $Swarms data from CoinGecko""" + try: + url = f"{self.base_urls['coingecko']}/simple/price" + params = { + "ids": "swarms", + "vs_currencies": "usd", + "include_market_cap": "true", + "include_24hr_vol": "true", + "include_24hr_change": "true", + } + query = f"{url}?{'&'.join(f'{k}={v}' for k, v in params.items())}" + data = await self.fetch_data(query) + if data and "swarms" in data: + return { + "price": data["swarms"].get("usd"), + "volume24h": data["swarms"].get("usd_24h_vol"), + "marketCap": data["swarms"].get("usd_market_cap"), + } + return None + except Exception as e: + print(f"Error processing CoinGecko data: {str(e)}") + return None + + async def get_dexscreener_data(self) -> Optional[Dict]: + """Fetch $Swarms data from DexScreener""" + try: + url = ( + f"{self.base_urls['dexscreener']}/pairs/solana/swarms" + ) + data = await self.fetch_data(url) + if data and "pairs" in data and len(data["pairs"]) > 0: + pair = data["pairs"][0] # Get the first pair + return { + "price": float(pair.get("priceUsd", 0)), + "volume24h": float(pair.get("volume24h", 0)), + "marketCap": float(pair.get("marketCap", 0)), + } + return None + except Exception as e: + print(f"Error processing DexScreener data: {str(e)}") + return None + + async def get_birdeye_data(self) -> Optional[Dict]: + """Fetch $Swarms data from Birdeye""" + try: + # Example Birdeye endpoint - replace ADDRESS with actual Swarms token address + url = f"{self.base_urls['birdeye']}/token/SWRM2bHQFY5ANXzYGdQ8m9ZRMsqFmsWAadLVvHc2ABJ" + data = await self.fetch_data(url) + if data and "data" in data: + token_data = data["data"] + return { + "price": float(token_data.get("price", 0)), + "volume24h": float( + token_data.get("volume24h", 0) + ), + "marketCap": float( + token_data.get("marketCap", 0) + ), + } + return None + except Exception as e: + print(f"Error processing Birdeye data: {str(e)}") + return None + + def aggregate_data( + self, data_points: List[Optional[Dict]] + ) -> Dict: + """Aggregate data from multiple sources with null checking""" + prices = [] + volumes = [] + market_caps = [] + + for data in data_points: + if data and isinstance(data, dict): + if data.get("price") is not None: + prices.append(float(data["price"])) + if data.get("volume24h") is not None: + volumes.append(float(data["volume24h"])) + if data.get("marketCap") is not None: + market_caps.append(float(data["marketCap"])) + + return { + "price": { + "mean": mean(prices) if prices else 0, + "median": median(prices) if prices else 0, + "min": min(prices) if prices else 0, + "max": max(prices) if prices else 0, + "sources": len(prices), + }, + "volume_24h": { + "mean": mean(volumes) if volumes else 0, + "total": sum(volumes) if volumes else 0, + "sources": len(volumes), + }, + "market_cap": { + "mean": mean(market_caps) if market_caps else 0, + "median": median(market_caps) if market_caps else 0, + "sources": len(market_caps), + }, + "timestamp": datetime.now().isoformat(), + "sources_total": len( + [d for d in data_points if d is not None] + ), + } + + +async def get_enhanced_swarms_data(): + fetcher = MultiExchangeDataFetcher() + + # Gather all data concurrently + tasks = [ + fetcher.get_coingecko_data(), + fetcher.get_dexscreener_data(), + fetcher.get_birdeye_data(), + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Filter out exceptions and None values + valid_results = [r for r in results if isinstance(r, dict)] + + return fetcher.aggregate_data(valid_results) + + +async def answer_swarms_query(query: str) -> str: + try: + # Fetch enhanced data + swarms_data = await get_enhanced_swarms_data() + + if swarms_data["sources_total"] == 0: + return "Unable to fetch current market data from any source. Please try again later." + + # Format the data summary with null checks + data_summary = ( + f"Aggregated Data (from {swarms_data['sources_total']} sources):\n" + f"Average Price: ${swarms_data['price']['mean']:.4f}\n" + f"Price Range: ${swarms_data['price']['min']:.4f} - ${swarms_data['price']['max']:.4f}\n" + f"24hr Volume (Total): ${swarms_data['volume_24h']['total']:,.2f}\n" + f"Average Market Cap: ${swarms_data['market_cap']['mean']:,.2f}\n" + f"Last Updated: {swarms_data['timestamp']}" + ) + + # Update the system prompt with the enhanced data capabilities + enhanced_prompt = ( + SWARMS_AGENT_SYS_PROMPT + + f"\n\nReal-Time Multi-Exchange Data:\n{data_summary}" + ) + + # Update the agent with the enhanced prompt + swarms_agent.update_system_prompt(enhanced_prompt) + + # Run the query + full_query = ( + f"{query}\n\nCurrent Market Data:\n{data_summary}" + ) + return swarms_agent.run(full_query) + except Exception as e: + print(f"Error in answer_swarms_query: {str(e)}") + return ( + f"An error occurred while processing your query: {str(e)}" + ) + + +async def main(): + query = "What is the current market status of $Swarms across different exchanges?" + response = await answer_swarms_query(query) + print(response) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/new_features_examples/multi_tool_usage_agent.py b/new_features_examples/multi_tool_usage_agent.py index c51596ad8..7be3f42cc 100644 --- a/new_features_examples/multi_tool_usage_agent.py +++ b/new_features_examples/multi_tool_usage_agent.py @@ -114,6 +114,7 @@ class ExecutionContext: def func(): pass + hints = get_type_hints(func) diff --git a/new_features_examples/solana_tool/solana_tool.py b/new_features_examples/solana_tool/solana_tool.py index 998413b6b..e174f3efc 100644 --- a/new_features_examples/solana_tool/solana_tool.py +++ b/new_features_examples/solana_tool/solana_tool.py @@ -234,7 +234,7 @@ def fetch_wallet_transactions(wallet_address: str) -> str: # Small delay between transaction fetches time.sleep(0.1) - + # print(tx) logger.info(f"Enriched transaction: {tx}") diff --git a/new_features_examples/solana_tool/solana_tool_test.py b/new_features_examples/solana_tool/solana_tool_test.py index 5a80cbb80..f386c7318 100644 --- a/new_features_examples/solana_tool/solana_tool_test.py +++ b/new_features_examples/solana_tool/solana_tool_test.py @@ -1,10 +1,10 @@ -from typing import Dict, List, Optional, Union, Any +from typing import List from datetime import datetime import json import requests from loguru import logger from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import timezone import time import random @@ -14,30 +14,35 @@ rotation="500 MB", retention="10 days", level="INFO", - format="{time} {level} {message}" + format="{time} {level} {message}", ) # Most reliable RPC endpoints RPC_ENDPOINTS = [ "https://api.mainnet-beta.solana.com", "https://rpc.ankr.com/solana", - "https://solana.getblock.io/mainnet" + "https://solana.getblock.io/mainnet", ] + @dataclass class TransactionError: """Data class to represent transaction errors""" + error_type: str message: str timestamp: str = datetime.now(timezone.utc).isoformat() + class SolanaAPIException(Exception): """Custom exception for Solana API related errors""" + pass + class RPCEndpointManager: """Manages RPC endpoints and handles switching between them""" - + def __init__(self, endpoints: List[str]): self.endpoints = endpoints.copy() self.current_endpoint = self.endpoints[0] @@ -45,128 +50,165 @@ def __init__(self, endpoints: List[str]): self.min_request_interval = 0.2 # Increased minimum interval self.total_requests = 0 self.max_requests_per_endpoint = 3 - + def get_endpoint(self) -> str: """Get current endpoint with rate limiting""" now = time.time() time_since_last = now - self.last_request_time if time_since_last < self.min_request_interval: time.sleep(self.min_request_interval - time_since_last) - + self.total_requests += 1 if self.total_requests >= self.max_requests_per_endpoint: self.switch_endpoint() self.total_requests = 0 - + self.last_request_time = time.time() return self.current_endpoint - + def switch_endpoint(self) -> str: """Switch to next available endpoint""" current = self.current_endpoint - available_endpoints = [ep for ep in self.endpoints if ep != current] - + available_endpoints = [ + ep for ep in self.endpoints if ep != current + ] + if not available_endpoints: raise SolanaAPIException("All endpoints exhausted") - + self.current_endpoint = random.choice(available_endpoints) logger.info(f"Switched to endpoint: {self.current_endpoint}") return self.current_endpoint -def make_request(endpoint_manager: RPCEndpointManager, payload: dict, retry_count: int = 3) -> dict: + +def make_request( + endpoint_manager: RPCEndpointManager, + payload: dict, + retry_count: int = 3, +) -> dict: """ Makes a request with automatic endpoint switching and error handling. """ last_error = None - + for attempt in range(retry_count): try: endpoint = endpoint_manager.get_endpoint() - + response = requests.post( endpoint, json=payload, timeout=10, headers={"Content-Type": "application/json"}, - verify=True # Ensure SSL verification + verify=True, # Ensure SSL verification ) - + if response.status_code != 200: - raise SolanaAPIException(f"HTTP {response.status_code}: {response.text}") - + raise SolanaAPIException( + f"HTTP {response.status_code}: {response.text}" + ) + data = response.json() - + if "error" in data: error_code = data["error"].get("code") if error_code == 429: # Rate limit - logger.warning(f"Rate limit hit, switching endpoint...") + logger.warning( + "Rate limit hit, switching endpoint..." + ) endpoint_manager.switch_endpoint() - time.sleep(2 ** attempt) # Exponential backoff + time.sleep(2**attempt) # Exponential backoff continue - + if "message" in data["error"]: - raise SolanaAPIException(f"RPC error: {data['error']['message']}") - + raise SolanaAPIException( + f"RPC error: {data['error']['message']}" + ) + return data - - except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as e: - logger.warning(f"Connection error with {endpoint}: {str(e)}") + + except ( + requests.exceptions.SSLError, + requests.exceptions.ConnectionError, + ) as e: + logger.warning( + f"Connection error with {endpoint}: {str(e)}" + ) endpoint_manager.switch_endpoint() continue - + except Exception as e: last_error = e logger.warning(f"Request failed: {str(e)}") endpoint_manager.switch_endpoint() time.sleep(1) continue - - raise SolanaAPIException(f"All retry attempts failed. Last error: {str(last_error)}") -def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) -> str: + raise SolanaAPIException( + f"All retry attempts failed. Last error: {str(last_error)}" + ) + + +def fetch_wallet_transactions( + wallet_address: str, max_transactions: int = 10 +) -> str: """ Fetches recent transactions for a given Solana wallet address. - + Args: wallet_address (str): The Solana wallet address to fetch transactions for max_transactions (int, optional): Maximum number of transactions to fetch. Defaults to 10. - + Returns: str: JSON string containing transaction details """ try: - if not isinstance(wallet_address, str) or len(wallet_address) != 44: - raise ValueError(f"Invalid Solana wallet address format: {wallet_address}") - - if not isinstance(max_transactions, int) or max_transactions < 1: - raise ValueError("max_transactions must be a positive integer") + if ( + not isinstance(wallet_address, str) + or len(wallet_address) != 44 + ): + raise ValueError( + f"Invalid Solana wallet address format: {wallet_address}" + ) - logger.info(f"Fetching up to {max_transactions} transactions for wallet: {wallet_address}") + if ( + not isinstance(max_transactions, int) + or max_transactions < 1 + ): + raise ValueError( + "max_transactions must be a positive integer" + ) + + logger.info( + f"Fetching up to {max_transactions} transactions for wallet: {wallet_address}" + ) endpoint_manager = RPCEndpointManager(RPC_ENDPOINTS) - + # Get transaction signatures signatures_payload = { "jsonrpc": "2.0", "id": str(random.randint(1, 1000)), "method": "getSignaturesForAddress", - "params": [ - wallet_address, - {"limit": max_transactions} - ] + "params": [wallet_address, {"limit": max_transactions}], } - signatures_data = make_request(endpoint_manager, signatures_payload) - + signatures_data = make_request( + endpoint_manager, signatures_payload + ) + transactions = signatures_data.get("result", []) if not transactions: logger.info("No transactions found for this wallet") - return json.dumps({ - "success": True, - "transactions": [], - "error": None, - "transaction_count": 0 - }, indent=2) + return json.dumps( + { + "success": True, + "transactions": [], + "error": None, + "transaction_count": 0, + }, + indent=2, + ) logger.info(f"Found {len(transactions)} transactions") @@ -180,12 +222,15 @@ def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) - "method": "getTransaction", "params": [ tx["signature"], - {"encoding": "json", "maxSupportedTransactionVersion": 0} - ] + { + "encoding": "json", + "maxSupportedTransactionVersion": 0, + }, + ], } - + tx_data = make_request(endpoint_manager, tx_payload) - + if "result" in tx_data and tx_data["result"]: result = tx_data["result"] enriched_tx = { @@ -194,47 +239,64 @@ def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) - "timestamp": tx.get("blockTime"), "success": not tx.get("err"), } - + if "meta" in result: enriched_tx["fee"] = result["meta"].get("fee") - if "preBalances" in result["meta"] and "postBalances" in result["meta"]: - enriched_tx["balance_change"] = sum(result["meta"]["postBalances"]) - sum(result["meta"]["preBalances"]) - + if ( + "preBalances" in result["meta"] + and "postBalances" in result["meta"] + ): + enriched_tx["balance_change"] = sum( + result["meta"]["postBalances"] + ) - sum(result["meta"]["preBalances"]) + enriched_transactions.append(enriched_tx) - logger.info(f"Processed transaction {tx['signature'][:8]}...") - + logger.info( + f"Processed transaction {tx['signature'][:8]}..." + ) + except Exception as e: - logger.warning(f"Failed to process transaction {tx['signature']}: {str(e)}") + logger.warning( + f"Failed to process transaction {tx['signature']}: {str(e)}" + ) continue - logger.info(f"Successfully processed {len(enriched_transactions)} transactions") - - return json.dumps({ - "success": True, - "transactions": enriched_transactions, - "error": None, - "transaction_count": len(enriched_transactions) - }, indent=2) + logger.info( + f"Successfully processed {len(enriched_transactions)} transactions" + ) + + return json.dumps( + { + "success": True, + "transactions": enriched_transactions, + "error": None, + "transaction_count": len(enriched_transactions), + }, + indent=2, + ) except Exception as e: error = TransactionError( - error_type="API_ERROR", - message=str(e) + error_type="API_ERROR", message=str(e) ) logger.error(f"Error: {error.message}") - return json.dumps({ - "success": False, - "transactions": [], - "error": error.__dict__, - "transaction_count": 0 - }, indent=2) + return json.dumps( + { + "success": False, + "transactions": [], + "error": error.__dict__, + "transaction_count": 0, + }, + indent=2, + ) + if __name__ == "__main__": # Example wallet address wallet = "CtBLg4AX6LQfKVtPPUWqJyQ5cRfHydUwuZZ87rmojA1P" - + try: result = fetch_wallet_transactions(wallet) print(result) except Exception as e: - logger.error(f"Failed to fetch transactions: {str(e)}") \ No newline at end of file + logger.error(f"Failed to fetch transactions: {str(e)}") diff --git a/swarms/artifacts/main_artifact.py b/swarms/artifacts/main_artifact.py index 47fbc11f7..ba29a2063 100644 --- a/swarms/artifacts/main_artifact.py +++ b/swarms/artifacts/main_artifact.py @@ -1,4 +1,3 @@ - import json import os import subprocess diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index d6caed666..f68098ee9 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -499,10 +499,10 @@ def __init__( self.stopping_token = "" # If the docs exist then ingest the docs - if exists(self.docs): - threading.Thread( - target=self.ingest_docs, args=(self.docs) - ).start() + # if exists(self.docs): + # threading.Thread( + # target=self.ingest_docs, args=(self.docs) + # ).start() # If docs folder exists then get the docs from docs folder if exists(self.docs_folder): @@ -1176,6 +1176,13 @@ def __call__( except Exception as error: self._handle_run_error(error) + def receive_message( + self, agent_name: str, task: str, *args, **kwargs + ): + return self.run( + task=f"From {agent_name}: {task}", *args, **kwargs + ) + def dict_to_csv(self, data: dict) -> str: """ Convert a dictionary to a CSV string. diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index bd2bced6c..6be885bee 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -482,13 +482,15 @@ def _run( except Exception as e: self._catch_error(e) - + def _catch_error(self, e: Exception): if self.autosave is True: log_agent_data(self.to_dict()) - - logger.error(f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}") - + + logger.error( + f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}" + ) + return e def run( @@ -653,7 +655,9 @@ async def abatch_run( # Process batch using asyncio.gather batch_coros = [ - self.astream(task=task, img=img_path, *args, **kwargs) + self.astream( + task=task, img=img_path, *args, **kwargs + ) for task, img_path in zip(batch_tasks, batch_imgs) ] batch_results = await asyncio.gather(*batch_coros) @@ -691,7 +695,9 @@ def concurrent_run( List of results corresponding to input tasks """ try: - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with ThreadPoolExecutor( + max_workers=max_workers + ) as executor: imgs = img if img else [None] * len(tasks) futures = [ executor.submit( @@ -710,8 +716,7 @@ def concurrent_run( return [future.result() for future in futures] except Exception as e: self._catch_error(e) - - + def _serialize_callable( self, attr_value: Callable ) -> Dict[str, Any]: @@ -771,7 +776,6 @@ def to_dict(self) -> Dict[str, Any]: } - def rearrange( agents: List[Agent] = None, flow: str = None, diff --git a/swarms/structs/spreadsheet_swarm.py b/swarms/structs/spreadsheet_swarm.py index 6fd2044d7..fd1b514fe 100644 --- a/swarms/structs/spreadsheet_swarm.py +++ b/swarms/structs/spreadsheet_swarm.py @@ -108,7 +108,9 @@ def __init__( # --------------- NEW CHANGE START --------------- # The save_file_path now uses the formatted_time and uuid_hex - self.save_file_path = f"spreadsheet_swarm_{formatted_time}_run_id_{uuid_hex}.csv" + self.save_file_path = ( + f"spreadsheet_swarm_run_id_{uuid_hex}.csv" + ) # --------------- NEW CHANGE END --------------- self.metadata = SwarmRunMetadata( @@ -182,10 +184,22 @@ async def _load_from_csv(self): ), docs=[row["docs"]] if "docs" in row else "", dynamic_temperature_enabled=True, - max_loops=row["max_loops"] if "max_loops" in row else 1, - user_name=row["user_name"] if "user_name" in row else "user", + max_loops=( + row["max_loops"] + if "max_loops" in row + else 1 + ), + user_name=( + row["user_name"] + if "user_name" in row + else "user" + ), # output_type="str", - stopping_token=row["stopping_token"] if "stopping_token" in row else None, + stopping_token=( + row["stopping_token"] + if "stopping_token" in row + else None + ), ) # Add agent to swarm @@ -268,8 +282,7 @@ async def _run(self, task: str = None, *args, **kwargs): print(log_agent_data(self.metadata.model_dump())) return self.metadata.model_dump_json(indent=4) - - + def run(self, task: str = None, *args, **kwargs): """ Run the swarm with the specified task. @@ -378,7 +391,7 @@ def data_to_json_file(self): create_file_in_folder( folder_path=f"{self.workspace_dir}/Spreedsheet-Swarm-{self.name}/{self.name}", - file_name=f"spreedsheet-swarm-{self.metadata.run_id}_metadata.json", + file_name=f"spreedsheet-swarm-{uuid_hex}_metadata.json", content=out, ) diff --git a/swarms/structs/swarming_architectures.py b/swarms/structs/swarming_architectures.py index ce8400237..41b239cc4 100644 --- a/swarms/structs/swarming_architectures.py +++ b/swarms/structs/swarming_architectures.py @@ -1,4 +1,3 @@ -import asyncio import math from typing import List, Union @@ -343,47 +342,6 @@ def sinusoidal_swarm(agents: AgentListType, task: str): agents[index].run(task) -async def one_to_three( - sender: Agent, agents: AgentListType, task: str -): - """ - Sends a message from the sender agent to three other agents. - - Args: - sender (Agent): The agent sending the message. - agents (AgentListType): The list of agents to receive the message. - task (str): The message to be sent. - - Raises: - Exception: If there is an error while sending the message. - - Returns: - None - """ - if len(agents) != 3: - raise ValueError("The number of agents must be exactly 3.") - - if not task: - raise ValueError("The task cannot be empty.") - - if not sender: - raise ValueError("The sender cannot be empty.") - - try: - receive_tasks = [] - for agent in agents: - receive_tasks.append( - agent.receive_message(sender.agent_name, task) - ) - - await asyncio.gather(*receive_tasks) - except Exception as error: - logger.error( - f"[ERROR][CLASS: Agent][METHOD: one_to_three] {error}" - ) - raise error - - """ This module contains functions for facilitating communication between agents in a swarm. It includes methods for one-to-one communication, broadcasting, and other swarm architectures. """ @@ -440,36 +398,70 @@ def one_to_one( return conversation.return_history() -# Broadcasting: A message from one agent to many async def broadcast( sender: Agent, agents: AgentListType, task: str ) -> None: - """ - Facilitates broadcasting of a message from one agent to multiple agents. - - Args: - sender (Agent): The agent sending the message. - agents (AgentListType): The list of agents to receive the message. - task (str): The message to be sent. - - Raises: - ValueError: If the sender, agents, or task is empty. - Exception: If there is an error during the broadcasting process. - """ conversation = Conversation() if not sender or not agents or not task: raise ValueError("Sender, agents, and task cannot be empty.") try: - receive_tasks = [] + # First get the sender's broadcast message + broadcast_message = sender.run(task) + conversation.add_log( + agent_name=sender.agent_name, + task=task, + response=broadcast_message, + ) + + # Then have all agents process it for agent in agents: - receive_tasks.append(agent.run(task)) + response = agent.run(broadcast_message) conversation.add_log( - agent_name=agent.agent_name, task=task, response=task + agent_name=agent.agent_name, + task=broadcast_message, + response=response, ) - await asyncio.gather(*receive_tasks) + return conversation.return_history() + except Exception as error: logger.error(f"Error during broadcast: {error}") raise error + + +async def one_to_three( + sender: Agent, agents: AgentListType, task: str +): + if len(agents) != 3: + raise ValueError("The number of agents must be exactly 3.") + + if not task or not sender: + raise ValueError("Sender and task cannot be empty.") + + conversation = Conversation() + + try: + # Get sender's message + sender_message = sender.run(task) + conversation.add_log( + agent_name=sender.agent_name, + task=task, + response=sender_message, + ) + + # Have each receiver process the message + for agent in agents: + response = agent.run(sender_message) + conversation.add_log( + agent_name=agent.agent_name, + task=sender_message, + response=response, + ) + + return conversation.return_history() + + except Exception as error: + logger.error(f"Error in one_to_three: {error}") + raise error diff --git a/swarms/utils/pandas_utils.py b/swarms/utils/pandas_utils.py index 1f712b7f2..2a738feee 100644 --- a/swarms/utils/pandas_utils.py +++ b/swarms/utils/pandas_utils.py @@ -10,7 +10,6 @@ logger = initialize_logger(log_folder="pandas_utils") - def display_agents_info(agents: List[Agent]) -> None: """ Displays information about all agents in a list using a DataFrame. @@ -18,7 +17,7 @@ def display_agents_info(agents: List[Agent]) -> None: :param agents: List of Agent instances. """ # Extracting relevant information from each agent - + try: import pandas as pd except ImportError: @@ -26,8 +25,6 @@ def display_agents_info(agents: List[Agent]) -> None: subprocess.run(["pip", "install", "pandas"]) import pandas as pd - - agent_data = [] for agent in agents: try: diff --git a/tests/requrements.txt b/tests/requrements.txt new file mode 100644 index 000000000..e4a264a5a --- /dev/null +++ b/tests/requrements.txt @@ -0,0 +1,6 @@ +pytest +swarms +loguru +pydantic +swarm-models +loguru diff --git a/tests/structs/test_agentrearrange 2.py b/tests/structs/test_agentrearrange 2.py index ba5bdad0f..abb23dd20 100644 --- a/tests/structs/test_agentrearrange 2.py +++ b/tests/structs/test_agentrearrange 2.py @@ -12,6 +12,7 @@ class TestResult: """Class to store test results and metadata""" + def __init__(self, test_name: str): self.test_name = test_name self.start_time = datetime.now() @@ -21,7 +22,9 @@ def __init__(self, test_name: str): self.traceback = None self.function_output = None - def complete(self, success: bool, error: Optional[Exception] = None): + def complete( + self, success: bool, error: Optional[Exception] = None + ): """Complete the test execution with results""" self.end_time = datetime.now() self.success = success @@ -35,36 +38,47 @@ def duration(self) -> float: return (self.end_time - self.start_time).total_seconds() return 0 + def run_test(test_func: Callable) -> TestResult: """ Decorator to run tests with error handling and logging - + Args: test_func (Callable): Test function to execute - + Returns: TestResult: Object containing test execution details """ + def wrapper(*args, **kwargs) -> TestResult: result = TestResult(test_func.__name__) - logger.info(f"\n{'='*20} Running test: {test_func.__name__} {'='*20}") - + logger.info( + f"\n{'='*20} Running test: {test_func.__name__} {'='*20}" + ) + try: output = test_func(*args, **kwargs) result.function_output = output result.complete(success=True) - logger.success(f"✅ Test {test_func.__name__} passed successfully") - + logger.success( + f"✅ Test {test_func.__name__} passed successfully" + ) + except Exception as e: result.complete(success=False, error=e) - logger.error(f"❌ Test {test_func.__name__} failed with error: {str(e)}") + logger.error( + f"❌ Test {test_func.__name__} failed with error: {str(e)}" + ) logger.error(f"Traceback: {traceback.format_exc()}") - - logger.info(f"Test duration: {result.duration():.2f} seconds\n") + + logger.info( + f"Test duration: {result.duration():.2f} seconds\n" + ) return result - + return wrapper + def create_functional_agents() -> List[Agent]: """ Create a list of functional agents with real LLM integration for testing. @@ -73,16 +87,19 @@ def create_functional_agents() -> List[Agent]: # Initialize OpenAI Chat model api_key = os.getenv("OPENAI_API_KEY") if not api_key: - logger.warning("No OpenAI API key found. Using mock agents instead.") - return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")] - + logger.warning( + "No OpenAI API key found. Using mock agents instead." + ) + return [ + create_mock_agent("TestAgent1"), + create_mock_agent("TestAgent2"), + ] + try: model = OpenAIChat( - api_key=api_key, - model_name="gpt-4o", - temperature=0.1 + api_key=api_key, model_name="gpt-4o", temperature=0.1 ) - + # Create boss agent boss_agent = Agent( agent_name="BossAgent", @@ -101,7 +118,7 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_boss_agent.json", ) - + # Create analysis agent analysis_agent = Agent( agent_name="AnalysisAgent", @@ -119,7 +136,7 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_analysis_agent.json", ) - + # Create summary agent summary_agent = Agent( agent_name="SummaryAgent", @@ -137,83 +154,102 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_summary_agent.json", ) - - logger.info("Successfully created functional agents with LLM integration") + + logger.info( + "Successfully created functional agents with LLM integration" + ) return [boss_agent, analysis_agent, summary_agent] - + except Exception as e: logger.error(f"Failed to create functional agents: {str(e)}") logger.warning("Falling back to mock agents") - return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")] + return [ + create_mock_agent("TestAgent1"), + create_mock_agent("TestAgent2"), + ] + def create_mock_agent(name: str) -> Agent: """Create a mock agent for testing when LLM integration is not available""" return Agent( agent_name=name, system_prompt=f"You are a test agent named {name}", - llm=None + llm=None, ) + @run_test def test_init(): """Test AgentRearrange initialization with functional agents""" logger.info("Creating agents for initialization test") agents = create_functional_agents() - + rearrange = AgentRearrange( name="TestRearrange", agents=agents, - flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" + flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}", ) - + assert rearrange.name == "TestRearrange" assert len(rearrange.agents) == 3 - assert rearrange.flow == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" - - logger.info(f"Initialized AgentRearrange with {len(agents)} agents") + assert ( + rearrange.flow + == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" + ) + + logger.info( + f"Initialized AgentRearrange with {len(agents)} agents" + ) return True + @run_test def test_validate_flow(): """Test flow validation logic""" agents = create_functional_agents() rearrange = AgentRearrange( agents=agents, - flow=f"{agents[0].agent_name} -> {agents[1].agent_name}" + flow=f"{agents[0].agent_name} -> {agents[1].agent_name}", ) - + logger.info("Testing valid flow pattern") valid = rearrange.validate_flow() assert valid is True - + logger.info("Testing invalid flow pattern") rearrange.flow = f"{agents[0].agent_name} {agents[1].agent_name}" # Missing arrow try: rearrange.validate_flow() assert False, "Should have raised ValueError" except ValueError as e: - logger.info(f"Successfully caught invalid flow error: {str(e)}") + logger.info( + f"Successfully caught invalid flow error: {str(e)}" + ) assert True - + return True + @run_test def test_add_remove_agent(): """Test adding and removing agents from the swarm""" agents = create_functional_agents() - rearrange = AgentRearrange(agents=agents[:2]) # Start with first two agents - + rearrange = AgentRearrange( + agents=agents[:2] + ) # Start with first two agents + logger.info("Testing agent addition") new_agent = agents[2] # Use the third agent as new agent rearrange.add_agent(new_agent) assert new_agent.agent_name in rearrange.agents - + logger.info("Testing agent removal") rearrange.remove_agent(new_agent.agent_name) assert new_agent.agent_name not in rearrange.agents - + return True + @run_test def test_basic_run(): """Test basic task execution with the swarm""" @@ -222,25 +258,30 @@ def test_basic_run(): name="TestSwarm", agents=agents, flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}", - max_loops=1 + max_loops=1, + ) + + test_task = ( + "Analyze this test message and provide a brief summary." ) - - test_task = "Analyze this test message and provide a brief summary." logger.info(f"Running test task: {test_task}") - + try: result = rearrange.run(test_task) assert result is not None - logger.info(f"Successfully executed task with result length: {len(str(result))}") + logger.info( + f"Successfully executed task with result length: {len(str(result))}" + ) return True except Exception as e: logger.error(f"Task execution failed: {str(e)}") raise + def run_all_tests() -> Dict[str, TestResult]: """ Run all test cases and collect results - + Returns: Dict[str, TestResult]: Dictionary mapping test names to their results """ @@ -249,26 +290,26 @@ def run_all_tests() -> Dict[str, TestResult]: test_init, test_validate_flow, test_add_remove_agent, - test_basic_run + test_basic_run, ] - + results = {} for test in test_functions: result = test() results[test.__name__] = result - + # Log summary total_tests = len(results) passed_tests = sum(1 for r in results.values() if r.success) failed_tests = total_tests - passed_tests - + logger.info("\n📊 Test Suite Summary:") logger.info(f"Total Tests: {total_tests}") print(f"✅ Passed: {passed_tests}") - + if failed_tests > 0: logger.error(f"❌ Failed: {failed_tests}") - + # Detailed failure information if failed_tests > 0: logger.error("\n❌ Failed Tests Details:") @@ -277,10 +318,11 @@ def run_all_tests() -> Dict[str, TestResult]: logger.error(f"\n{name}:") logger.error(f"Error: {result.error}") logger.error(f"Traceback: {result.traceback}") - + return results + if __name__ == "__main__": print("🌟 Starting AgentRearrange Test Suite") results = run_all_tests() - print("🏁 Test Suite Execution Completed") \ No newline at end of file + print("🏁 Test Suite Execution Completed") diff --git a/tests/structs/test_agentrearrange.py b/tests/structs/test_agentrearrange.py index ba5bdad0f..abb23dd20 100644 --- a/tests/structs/test_agentrearrange.py +++ b/tests/structs/test_agentrearrange.py @@ -12,6 +12,7 @@ class TestResult: """Class to store test results and metadata""" + def __init__(self, test_name: str): self.test_name = test_name self.start_time = datetime.now() @@ -21,7 +22,9 @@ def __init__(self, test_name: str): self.traceback = None self.function_output = None - def complete(self, success: bool, error: Optional[Exception] = None): + def complete( + self, success: bool, error: Optional[Exception] = None + ): """Complete the test execution with results""" self.end_time = datetime.now() self.success = success @@ -35,36 +38,47 @@ def duration(self) -> float: return (self.end_time - self.start_time).total_seconds() return 0 + def run_test(test_func: Callable) -> TestResult: """ Decorator to run tests with error handling and logging - + Args: test_func (Callable): Test function to execute - + Returns: TestResult: Object containing test execution details """ + def wrapper(*args, **kwargs) -> TestResult: result = TestResult(test_func.__name__) - logger.info(f"\n{'='*20} Running test: {test_func.__name__} {'='*20}") - + logger.info( + f"\n{'='*20} Running test: {test_func.__name__} {'='*20}" + ) + try: output = test_func(*args, **kwargs) result.function_output = output result.complete(success=True) - logger.success(f"✅ Test {test_func.__name__} passed successfully") - + logger.success( + f"✅ Test {test_func.__name__} passed successfully" + ) + except Exception as e: result.complete(success=False, error=e) - logger.error(f"❌ Test {test_func.__name__} failed with error: {str(e)}") + logger.error( + f"❌ Test {test_func.__name__} failed with error: {str(e)}" + ) logger.error(f"Traceback: {traceback.format_exc()}") - - logger.info(f"Test duration: {result.duration():.2f} seconds\n") + + logger.info( + f"Test duration: {result.duration():.2f} seconds\n" + ) return result - + return wrapper + def create_functional_agents() -> List[Agent]: """ Create a list of functional agents with real LLM integration for testing. @@ -73,16 +87,19 @@ def create_functional_agents() -> List[Agent]: # Initialize OpenAI Chat model api_key = os.getenv("OPENAI_API_KEY") if not api_key: - logger.warning("No OpenAI API key found. Using mock agents instead.") - return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")] - + logger.warning( + "No OpenAI API key found. Using mock agents instead." + ) + return [ + create_mock_agent("TestAgent1"), + create_mock_agent("TestAgent2"), + ] + try: model = OpenAIChat( - api_key=api_key, - model_name="gpt-4o", - temperature=0.1 + api_key=api_key, model_name="gpt-4o", temperature=0.1 ) - + # Create boss agent boss_agent = Agent( agent_name="BossAgent", @@ -101,7 +118,7 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_boss_agent.json", ) - + # Create analysis agent analysis_agent = Agent( agent_name="AnalysisAgent", @@ -119,7 +136,7 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_analysis_agent.json", ) - + # Create summary agent summary_agent = Agent( agent_name="SummaryAgent", @@ -137,83 +154,102 @@ def create_functional_agents() -> List[Agent]: state_save_file_type="json", saved_state_path="test_summary_agent.json", ) - - logger.info("Successfully created functional agents with LLM integration") + + logger.info( + "Successfully created functional agents with LLM integration" + ) return [boss_agent, analysis_agent, summary_agent] - + except Exception as e: logger.error(f"Failed to create functional agents: {str(e)}") logger.warning("Falling back to mock agents") - return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")] + return [ + create_mock_agent("TestAgent1"), + create_mock_agent("TestAgent2"), + ] + def create_mock_agent(name: str) -> Agent: """Create a mock agent for testing when LLM integration is not available""" return Agent( agent_name=name, system_prompt=f"You are a test agent named {name}", - llm=None + llm=None, ) + @run_test def test_init(): """Test AgentRearrange initialization with functional agents""" logger.info("Creating agents for initialization test") agents = create_functional_agents() - + rearrange = AgentRearrange( name="TestRearrange", agents=agents, - flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" + flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}", ) - + assert rearrange.name == "TestRearrange" assert len(rearrange.agents) == 3 - assert rearrange.flow == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" - - logger.info(f"Initialized AgentRearrange with {len(agents)} agents") + assert ( + rearrange.flow + == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}" + ) + + logger.info( + f"Initialized AgentRearrange with {len(agents)} agents" + ) return True + @run_test def test_validate_flow(): """Test flow validation logic""" agents = create_functional_agents() rearrange = AgentRearrange( agents=agents, - flow=f"{agents[0].agent_name} -> {agents[1].agent_name}" + flow=f"{agents[0].agent_name} -> {agents[1].agent_name}", ) - + logger.info("Testing valid flow pattern") valid = rearrange.validate_flow() assert valid is True - + logger.info("Testing invalid flow pattern") rearrange.flow = f"{agents[0].agent_name} {agents[1].agent_name}" # Missing arrow try: rearrange.validate_flow() assert False, "Should have raised ValueError" except ValueError as e: - logger.info(f"Successfully caught invalid flow error: {str(e)}") + logger.info( + f"Successfully caught invalid flow error: {str(e)}" + ) assert True - + return True + @run_test def test_add_remove_agent(): """Test adding and removing agents from the swarm""" agents = create_functional_agents() - rearrange = AgentRearrange(agents=agents[:2]) # Start with first two agents - + rearrange = AgentRearrange( + agents=agents[:2] + ) # Start with first two agents + logger.info("Testing agent addition") new_agent = agents[2] # Use the third agent as new agent rearrange.add_agent(new_agent) assert new_agent.agent_name in rearrange.agents - + logger.info("Testing agent removal") rearrange.remove_agent(new_agent.agent_name) assert new_agent.agent_name not in rearrange.agents - + return True + @run_test def test_basic_run(): """Test basic task execution with the swarm""" @@ -222,25 +258,30 @@ def test_basic_run(): name="TestSwarm", agents=agents, flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}", - max_loops=1 + max_loops=1, + ) + + test_task = ( + "Analyze this test message and provide a brief summary." ) - - test_task = "Analyze this test message and provide a brief summary." logger.info(f"Running test task: {test_task}") - + try: result = rearrange.run(test_task) assert result is not None - logger.info(f"Successfully executed task with result length: {len(str(result))}") + logger.info( + f"Successfully executed task with result length: {len(str(result))}" + ) return True except Exception as e: logger.error(f"Task execution failed: {str(e)}") raise + def run_all_tests() -> Dict[str, TestResult]: """ Run all test cases and collect results - + Returns: Dict[str, TestResult]: Dictionary mapping test names to their results """ @@ -249,26 +290,26 @@ def run_all_tests() -> Dict[str, TestResult]: test_init, test_validate_flow, test_add_remove_agent, - test_basic_run + test_basic_run, ] - + results = {} for test in test_functions: result = test() results[test.__name__] = result - + # Log summary total_tests = len(results) passed_tests = sum(1 for r in results.values() if r.success) failed_tests = total_tests - passed_tests - + logger.info("\n📊 Test Suite Summary:") logger.info(f"Total Tests: {total_tests}") print(f"✅ Passed: {passed_tests}") - + if failed_tests > 0: logger.error(f"❌ Failed: {failed_tests}") - + # Detailed failure information if failed_tests > 0: logger.error("\n❌ Failed Tests Details:") @@ -277,10 +318,11 @@ def run_all_tests() -> Dict[str, TestResult]: logger.error(f"\n{name}:") logger.error(f"Error: {result.error}") logger.error(f"Traceback: {result.traceback}") - + return results + if __name__ == "__main__": print("🌟 Starting AgentRearrange Test Suite") results = run_all_tests() - print("🏁 Test Suite Execution Completed") \ No newline at end of file + print("🏁 Test Suite Execution Completed") diff --git a/tests/structs/test_auto_swarms_builder.py b/tests/structs/test_auto_swarms_builder.py new file mode 100644 index 000000000..4d690678c --- /dev/null +++ b/tests/structs/test_auto_swarms_builder.py @@ -0,0 +1,198 @@ +from swarms.structs.auto_swarm_builder import AutoSwarmBuilder +from dotenv import load_dotenv + +load_dotenv() + + +def print_separator(): + print("\n" + "=" * 50) + + +def test_initialization(): + """Test basic initialization of AutoSwarmBuilder""" + print_separator() + print("Testing AutoSwarmBuilder Initialization") + try: + swarm = AutoSwarmBuilder( + name="TestSwarm", + description="A test swarm for validation", + verbose=True, + max_loops=2, + ) + + print("✓ Created swarm with configuration:") + print(f" - Name: {swarm.name}") + print(f" - Description: {swarm.description}") + print(f" - Max loops: {swarm.max_loops}") + print(f" - Verbose: {swarm.verbose}") + print("✓ Initialization test passed") + return swarm + except Exception as e: + print(f"✗ Initialization test failed: {str(e)}") + raise + + +def test_agent_building(): + """Test building individual agents""" + print_separator() + print("Testing Agent Building") + try: + swarm = AutoSwarmBuilder() + agent = swarm.build_agent( + agent_name="TestAgent", + agent_description="A test agent", + agent_system_prompt="You are a test agent", + max_loops=1, + ) + + print("✓ Built agent with configuration:") + print(f" - Name: {agent.agent_name}") + print(f" - Description: {agent.description}") + print(f" - Max loops: {agent.max_loops}") + print("✓ Agent building test passed") + return agent + except Exception as e: + print(f"✗ Agent building test failed: {str(e)}") + raise + + +def test_agent_creation(): + """Test creating multiple agents for a task""" + print_separator() + print("Testing Agent Creation from Task") + try: + swarm = AutoSwarmBuilder( + name="ResearchSwarm", + description="A swarm for research tasks", + ) + task = "Research the latest developments in quantum computing" + agents = swarm._create_agents(task) + + print("✓ Created agents for research task:") + for i, agent in enumerate(agents, 1): + print(f" Agent {i}:") + print(f" - Name: {agent.agent_name}") + print(f" - Description: {agent.description}") + print(f"✓ Created {len(agents)} agents successfully") + return agents + except Exception as e: + print(f"✗ Agent creation test failed: {str(e)}") + raise + + +def test_swarm_routing(): + """Test routing tasks through the swarm""" + print_separator() + print("Testing Swarm Routing") + try: + swarm = AutoSwarmBuilder( + name="RouterTestSwarm", + description="Testing routing capabilities", + ) + agents = ( + test_agent_creation() + ) # Get agents from previous test + task = "Analyze the impact of AI on healthcare" + + print("Starting task routing...") + result = swarm.swarm_router(agents, task) + + print("✓ Task routed successfully") + print( + f" - Result length: {len(str(result)) if result else 0} characters" + ) + print("✓ Swarm routing test passed") + return result + except Exception as e: + print(f"✗ Swarm routing test failed: {str(e)}") + raise + + +def test_full_swarm_execution(): + """Test complete swarm execution with a real task""" + print_separator() + print("Testing Full Swarm Execution") + try: + swarm = AutoSwarmBuilder( + name="FullTestSwarm", + description="Testing complete swarm functionality", + max_loops=1, + ) + task = ( + "Create a summary of recent advances in renewable energy" + ) + + print("Starting full swarm execution...") + result = swarm.run(task) + + print("✓ Full swarm execution completed:") + print(f" - Output generated: {bool(result)}") + print( + f" - Output length: {len(str(result)) if result else 0} characters" + ) + print("✓ Full swarm execution test passed") + return result + except Exception as e: + print(f"✗ Full swarm execution test failed: {str(e)}") + raise + + +def test_error_handling(): + """Test error handling in swarm operations""" + print_separator() + print("Testing Error Handling") + try: + swarm = AutoSwarmBuilder() + + # Test with invalid agent configuration + print("Testing invalid agent configuration...") + try: + swarm.build_agent("", "", "") + print( + "✗ Should have raised an error for empty agent configuration" + ) + except Exception as e: + print( + f"✓ Correctly handled invalid agent configuration: {type(e).__name__}" + ) + + # Test with None task + print("\nTesting None task...") + try: + swarm.run(None) + print("✗ Should have raised an error for None task") + except Exception as e: + print( + f"✓ Correctly handled None task: {type(e).__name__}" + ) + + print("✓ Error handling test passed") + except Exception as e: + print(f"✗ Error handling test failed: {str(e)}") + raise + + +def run_all_tests(): + """Run complete test suite""" + print("\n=== Starting AutoSwarmBuilder Test Suite ===\n") + + try: + # Run all tests in sequence + test_initialization() + test_agent_building() + test_agent_creation() + test_swarm_routing() + test_full_swarm_execution() + test_error_handling() + + print_separator() + print("🎉 All tests completed successfully!") + + except Exception as e: + print_separator() + print(f"❌ Test suite failed: {str(e)}") + raise + + +if __name__ == "__main__": + run_all_tests() diff --git a/tests/structs/test_company.py b/tests/structs/test_company.py index 15c7e7150..746e4c830 100644 --- a/tests/structs/test_company.py +++ b/tests/structs/test_company.py @@ -12,6 +12,7 @@ dev = Agent(llm=llm, name="Developer") va = Agent(llm=llm, name="VA") hr = Agent(llm=llm, name="HR") + shared_instructions = "Listen to your boss" diff --git a/tests/structs/test_multiprocess.py b/tests/structs/test_multiprocess.py new file mode 100644 index 000000000..92d5dc838 --- /dev/null +++ b/tests/structs/test_multiprocess.py @@ -0,0 +1,177 @@ +import asyncio +import time +from swarms.structs.agent import Agent +from swarms.structs.multi_process_workflow import MultiProcessWorkflow + + +def create_test_agent(name: str) -> Agent: + """Create a test agent that simply returns its input with a timestamp""" + return Agent( + agent_name=name, + system_prompt=f"Test prompt for {name}", + model_name="gpt-4o-mini", + max_loops=1, + ) + + +def test_initialization(): + """Test basic workflow initialization""" + print("\n=== Testing Workflow Initialization ===") + try: + agents = [create_test_agent(f"agent{i}") for i in range(3)] + workflow = MultiProcessWorkflow(max_workers=2, agents=agents) + + print("✓ Created workflow with configuration:") + print(f" - Max workers: {workflow.max_workers}") + print(f" - Number of agents: {len(workflow.agents)}") + print(f" - Autosave: {workflow.autosave}") + print("✓ Initialization test passed") + except Exception as e: + print(f"✗ Initialization test failed: {str(e)}") + raise + + +def test_execute_task(): + """Test execution of a single task""" + print("\n=== Testing Task Execution ===") + try: + agents = [create_test_agent("test_agent")] + workflow = MultiProcessWorkflow(agents=agents) + + test_task = "Return this message with timestamp" + result = workflow.execute_task(test_task) + + print("✓ Task executed successfully") + print(f" - Input task: {test_task}") + print(f" - Result: {result}") + print("✓ Task execution test passed") + except Exception as e: + print(f"✗ Task execution test failed: {str(e)}") + raise + + +def test_parallel_run(): + """Test parallel execution of tasks""" + print("\n=== Testing Parallel Run ===") + try: + agents = [create_test_agent(f"agent{i}") for i in range(3)] + workflow = MultiProcessWorkflow(max_workers=2, agents=agents) + + test_task = "Process this in parallel" + results = workflow.run(test_task) + + print("✓ Parallel execution completed") + # print(f" - Number of results: {len(results)}") + print(f" - Results: {results}") + print("✓ Parallel run test passed") + except Exception as e: + print(f"✗ Parallel run test failed: {str(e)}") + raise + + +async def test_async_run(): + """Test asynchronous execution of tasks""" + print("\n=== Testing Async Run ===") + try: + agents = [create_test_agent(f"agent{i}") for i in range(3)] + workflow = MultiProcessWorkflow(max_workers=2, agents=agents) + + test_task = "Process this asynchronously" + results = await workflow.async_run(test_task) + + print("✓ Async execution completed") + print(f" - Number of results: {len(results)}") + print(f" - Results: {results}") + print("✓ Async run test passed") + except Exception as e: + print(f"✗ Async run test failed: {str(e)}") + raise + + +def test_batched_run(): + """Test batch execution of tasks""" + print("\n=== Testing Batched Run ===") + try: + agents = [create_test_agent(f"agent{i}") for i in range(2)] + workflow = MultiProcessWorkflow(max_workers=2, agents=agents) + + tasks = [f"Batch task {i}" for i in range(5)] + results = workflow.batched_run(tasks, batch_size=2) + + print("✓ Batch execution completed") + print(f" - Number of tasks: {len(tasks)}") + print(" - Batch size: 2") + print(f" - Results: {results}") + print("✓ Batched run test passed") + except Exception as e: + print(f"✗ Batched run test failed: {str(e)}") + raise + + +def test_concurrent_run(): + """Test concurrent execution of tasks""" + print("\n=== Testing Concurrent Run ===") + try: + agents = [create_test_agent(f"agent{i}") for i in range(2)] + workflow = MultiProcessWorkflow(max_workers=2, agents=agents) + + tasks = [f"Concurrent task {i}" for i in range(4)] + results = workflow.concurrent_run(tasks) + + print("✓ Concurrent execution completed") + print(f" - Number of tasks: {len(tasks)}") + print(f" - Results: {results}") + print("✓ Concurrent run test passed") + except Exception as e: + print(f"✗ Concurrent run test failed: {str(e)}") + raise + + +def test_error_handling(): + """Test error handling in workflow""" + print("\n=== Testing Error Handling ===") + try: + # Create workflow with no agents to trigger error + workflow = MultiProcessWorkflow(max_workers=2, agents=None) + result = workflow.execute_task( + "This should handle the error gracefully" + ) + + print("✓ Error handled gracefully") + print(f" - Result when no agents: {result}") + print("✓ Error handling test passed") + except Exception as e: + print(f"✗ Error handling test failed: {str(e)}") + raise + + +async def run_all_tests(): + """Run all tests""" + print("\n=== Starting MultiProcessWorkflow Test Suite ===") + start_time = time.time() + + try: + # Run synchronous tests + test_initialization() + test_execute_task() + test_parallel_run() + test_batched_run() + test_concurrent_run() + test_error_handling() + + # Run async test + await test_async_run() + + end_time = time.time() + duration = round(end_time - start_time, 2) + print("\n=== Test Suite Completed Successfully ===") + print(f"Time taken: {duration} seconds") + + except Exception as e: + print("\n=== Test Suite Failed ===") + print(f"Error: {str(e)}") + raise + + +if __name__ == "__main__": + asyncio.run(run_all_tests()) diff --git a/tests/structs/test_spreadsheet.py b/tests/structs/test_spreadsheet.py new file mode 100644 index 000000000..25ce6b17a --- /dev/null +++ b/tests/structs/test_spreadsheet.py @@ -0,0 +1,226 @@ +import os +import asyncio +from loguru import logger +from swarms.structs.agent import Agent +from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm + + +def create_test_csv() -> str: + """Create a test CSV file with agent configurations.""" + print("\nStarting creation of test CSV file") + try: + csv_content = """agent_name,description,system_prompt,task +test_agent_1,Test Agent 1,System prompt 1,Task 1 +test_agent_2,Test Agent 2,System prompt 2,Task 2""" + + file_path = "test_agents.csv" + with open(file_path, "w") as f: + f.write(csv_content) + + print(f"Created CSV with content:\n{csv_content}") + print(f"CSV file created at: {file_path}") + return file_path + except Exception as e: + logger.error(f"Failed to create test CSV: {str(e)}") + raise + + +def create_test_agent(name: str) -> Agent: + """Create a test agent with specified name.""" + print(f"\nCreating test agent: {name}") + try: + agent = Agent( + agent_name=name, + system_prompt=f"Test prompt for {name}", + model_name="gpt-4o-mini", + max_loops=1, + autosave=True, + verbose=True, + ) + print(f"Created agent: {name}") + return agent + except Exception as e: + logger.error(f"Failed to create agent {name}: {str(e)}") + raise + + +def test_swarm_initialization() -> None: + """Test basic swarm initialization.""" + print("\n[TEST] Starting swarm initialization test") + try: + print("Creating test agents...") + agents = [ + create_test_agent("agent1"), + create_test_agent("agent2"), + ] + + print("Initializing swarm...") + swarm = SpreadSheetSwarm( + name="Test Swarm", + description="Test Description", + agents=agents, + max_loops=2, + ) + + print("Verifying swarm configuration...") + assert swarm.name == "Test Swarm" + assert swarm.description == "Test Description" + assert len(swarm.agents) == 2 + assert swarm.max_loops == 2 + + print("✅ Swarm initialization test PASSED") + except Exception as e: + logger.error(f"❌ Swarm initialization test FAILED: {str(e)}") + raise + + +async def test_load_from_csv() -> None: + """Test loading agent configurations from CSV.""" + print("\n[TEST] Starting CSV loading test") + try: + csv_path = create_test_csv() + print("Initializing swarm with CSV...") + swarm = SpreadSheetSwarm(load_path=csv_path) + + print("Loading configurations...") + await swarm._load_from_csv() + + print("Verifying loaded configurations...") + assert len(swarm.agents) == 2 + assert len(swarm.agent_configs) == 2 + assert "test_agent_1" in swarm.agent_configs + assert "test_agent_2" in swarm.agent_configs + + os.remove(csv_path) + print(f"Cleaned up test file: {csv_path}") + + print("✅ CSV loading test PASSED") + except Exception as e: + logger.error(f"❌ CSV loading test FAILED: {str(e)}") + raise + + +async def test_run_tasks() -> None: + """Test running tasks with multiple agents.""" + print("\n[TEST] Starting task execution test") + try: + print("Setting up test swarm...") + agents = [ + create_test_agent("agent1"), + create_test_agent("agent2"), + ] + swarm = SpreadSheetSwarm(agents=agents, max_loops=1) + + test_task = "Test task for all agents" + print(f"Running test task: {test_task}") + await swarm._run_tasks(test_task) + + print("Verifying task execution...") + assert swarm.metadata.tasks_completed == 2 + assert len(swarm.metadata.outputs) == 2 + + print("✅ Task execution test PASSED") + except Exception as e: + logger.error(f"❌ Task execution test FAILED: {str(e)}") + raise + + +def test_output_tracking() -> None: + """Test tracking of task outputs.""" + print("\n[TEST] Starting output tracking test") + try: + print("Creating test swarm...") + swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")]) + + print("Tracking test output...") + swarm._track_output("agent1", "Test task", "Test result") + + print("Verifying output tracking...") + assert swarm.metadata.tasks_completed == 1 + assert len(swarm.metadata.outputs) == 1 + assert swarm.metadata.outputs[0].agent_name == "agent1" + + print("✅ Output tracking test PASSED") + except Exception as e: + logger.error(f"❌ Output tracking test FAILED: {str(e)}") + raise + + +async def test_save_to_csv() -> None: + """Test saving metadata to CSV.""" + print("\n[TEST] Starting CSV saving test") + try: + print("Setting up test data...") + swarm = SpreadSheetSwarm( + agents=[create_test_agent("agent1")], + save_file_path="test_output.csv", + ) + swarm._track_output("agent1", "Test task", "Test result") + + print("Saving to CSV...") + await swarm._save_to_csv() + + print("Verifying file creation...") + assert os.path.exists(swarm.save_file_path) + + os.remove(swarm.save_file_path) + print("Cleaned up test file") + + print("✅ CSV saving test PASSED") + except Exception as e: + logger.error(f"❌ CSV saving test FAILED: {str(e)}") + raise + + +def test_json_export() -> None: + """Test JSON export functionality.""" + print("\n[TEST] Starting JSON export test") + try: + print("Creating test data...") + swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")]) + swarm._track_output("agent1", "Test task", "Test result") + + print("Exporting to JSON...") + json_output = swarm.export_to_json() + + print("Verifying JSON output...") + assert isinstance(json_output, str) + assert "run_id" in json_output + assert "tasks_completed" in json_output + + print("✅ JSON export test PASSED") + except Exception as e: + logger.error(f"❌ JSON export test FAILED: {str(e)}") + raise + + +async def run_all_tests() -> None: + """Run all test functions.""" + print("\n" + "=" * 50) + print("Starting SpreadsheetSwarm Test Suite") + print("=" * 50 + "\n") + + try: + # Run synchronous tests + print("Running synchronous tests...") + test_swarm_initialization() + test_output_tracking() + test_json_export() + + # Run asynchronous tests + print("\nRunning asynchronous tests...") + await test_load_from_csv() + await test_run_tasks() + await test_save_to_csv() + + print("\n🎉 All tests completed successfully!") + print("=" * 50) + except Exception as e: + logger.error(f"\n❌ Test suite failed: {str(e)}") + print("=" * 50) + raise + + +if __name__ == "__main__": + # Run all tests + asyncio.run(run_all_tests()) diff --git a/tests/structs/test_swarm_architectures.py b/tests/structs/test_swarm_architectures.py new file mode 100644 index 000000000..8913a1d01 --- /dev/null +++ b/tests/structs/test_swarm_architectures.py @@ -0,0 +1,301 @@ +import asyncio +import time +from typing import List + +from swarms.structs.agent import Agent +from swarms.structs.swarming_architectures import ( + broadcast, + circular_swarm, + exponential_swarm, + geometric_swarm, + grid_swarm, + harmonic_swarm, + linear_swarm, + log_swarm, + mesh_swarm, + one_to_one, + one_to_three, + power_swarm, + pyramid_swarm, + sigmoid_swarm, + sinusoidal_swarm, + staircase_swarm, + star_swarm, +) + + +def create_test_agent(name: str) -> Agent: + """Create a test agent with specified name""" + return Agent( + agent_name=name, + system_prompt=f"You are {name}. Respond with your name and the task you received.", + model_name="gpt-4o-mini", + max_loops=1, + ) + + +def create_test_agents(num_agents: int) -> List[Agent]: + """Create specified number of test agents""" + return [ + create_test_agent(f"Agent{i+1}") for i in range(num_agents) + ] + + +def print_separator(): + print("\n" + "=" * 50 + "\n") + + +def test_circular_swarm(): + """Test and display circular swarm outputs""" + print_separator() + print("CIRCULAR SWARM TEST") + try: + agents = create_test_agents(3) + tasks = [ + "Analyze data", + "Generate report", + "Summarize findings", + ] + + print("Running circular swarm with:") + print(f"Tasks: {tasks}\n") + + result = circular_swarm(agents, tasks) + print("Circular Swarm Outputs:") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + except Exception as e: + print(f"Error: {str(e)}") + + +def test_grid_swarm(): + """Test and display grid swarm outputs""" + print_separator() + print("GRID SWARM TEST") + try: + agents = create_test_agents(4) # 2x2 grid + tasks = ["Task A", "Task B", "Task C", "Task D"] + + print("Running grid swarm with 2x2 grid") + print(f"Tasks: {tasks}\n") + + print(grid_swarm(agents, tasks)) + print( + "Grid Swarm completed - each agent processed tasks in its grid position" + ) + except Exception as e: + print(f"Error: {str(e)}") + + +def test_linear_swarm(): + """Test and display linear swarm outputs""" + print_separator() + print("LINEAR SWARM TEST") + try: + agents = create_test_agents(3) + tasks = ["Research task", "Write content", "Review output"] + + print("Running linear swarm with:") + print(f"Tasks: {tasks}\n") + + result = linear_swarm(agents, tasks) + print("Linear Swarm Outputs:") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + except Exception as e: + print(f"Error: {str(e)}") + + +def test_star_swarm(): + """Test and display star swarm outputs""" + print_separator() + print("STAR SWARM TEST") + try: + agents = create_test_agents(4) # 1 center + 3 peripheral + tasks = ["Coordinate workflow", "Process data"] + + print("Running star swarm with:") + print(f"Center agent: {agents[0].agent_name}") + print( + f"Peripheral agents: {[agent.agent_name for agent in agents[1:]]}" + ) + print(f"Tasks: {tasks}\n") + + result = star_swarm(agents, tasks) + print("Star Swarm Outputs:") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + except Exception as e: + print(f"Error: {str(e)}") + + +def test_mesh_swarm(): + """Test and display mesh swarm outputs""" + print_separator() + print("MESH SWARM TEST") + try: + agents = create_test_agents(3) + tasks = [ + "Analyze data", + "Process information", + "Generate insights", + ] + + print("Running mesh swarm with:") + print(f"Tasks: {tasks}\n") + + result = mesh_swarm(agents, tasks) + print(f"Mesh Swarm Outputs: {result}") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + except Exception as e: + print(f"Error: {str(e)}") + + +def test_pyramid_swarm(): + """Test and display pyramid swarm outputs""" + print_separator() + print("PYRAMID SWARM TEST") + try: + agents = create_test_agents(6) # 1-2-3 pyramid + tasks = [ + "Top task", + "Middle task 1", + "Middle task 2", + "Bottom task 1", + "Bottom task 2", + "Bottom task 3", + ] + + print("Running pyramid swarm with:") + print(f"Tasks: {tasks}\n") + + result = pyramid_swarm(agents, tasks) + print(f"Pyramid Swarm Outputs: {result}") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + except Exception as e: + print(f"Error: {str(e)}") + + +async def test_communication_patterns(): + """Test and display agent communication patterns""" + print_separator() + print("COMMUNICATION PATTERNS TEST") + try: + sender = create_test_agent("Sender") + receiver = create_test_agent("Receiver") + task = "Process and relay this message" + + print("Testing One-to-One Communication:") + result = one_to_one(sender, receiver, task) + print(f"\nOne-to-One Communication Outputs: {result}") + for log in result["history"]: + print(f"\nAgent: {log['agent_name']}") + print(f"Task: {log['task']}") + print(f"Response: {log['response']}") + + print("\nTesting One-to-Three Communication:") + receivers = create_test_agents(3) + await one_to_three(sender, receivers, task) + + print("\nTesting Broadcast Communication:") + broadcast_receivers = create_test_agents(5) + await broadcast(sender, broadcast_receivers, task) + + except Exception as e: + print(f"Error: {str(e)}") + + +def test_mathematical_swarms(): + """Test and display mathematical swarm patterns""" + print_separator() + print("MATHEMATICAL SWARMS TEST") + try: + agents = create_test_agents(8) + base_tasks = ["Calculate", "Process", "Analyze"] + + # Test each mathematical swarm + for swarm_type, swarm_func in [ + ("Power Swarm", power_swarm), + ("Log Swarm", log_swarm), + ("Exponential Swarm", exponential_swarm), + ("Geometric Swarm", geometric_swarm), + ("Harmonic Swarm", harmonic_swarm), + ]: + print(f"\nTesting {swarm_type}:") + tasks = [f"{task} in {swarm_type}" for task in base_tasks] + print(f"Tasks: {tasks}") + swarm_func(agents, tasks.copy()) + + except Exception as e: + print(f"Error: {str(e)}") + + +def test_pattern_swarms(): + """Test and display pattern-based swarms""" + print_separator() + print("PATTERN-BASED SWARMS TEST") + try: + agents = create_test_agents(10) + task = "Process according to pattern" + + for swarm_type, swarm_func in [ + ("Staircase Swarm", staircase_swarm), + ("Sigmoid Swarm", sigmoid_swarm), + ("Sinusoidal Swarm", sinusoidal_swarm), + ]: + print(f"\nTesting {swarm_type}:") + print(f"Task: {task}") + swarm_func(agents, task) + + except Exception as e: + print(f"Error: {str(e)}") + + +def run_all_tests(): + """Run all swarm architecture tests""" + print( + "\n=== Starting Swarm Architectures Test Suite with Outputs ===" + ) + start_time = time.time() + + try: + # Test basic swarm patterns + test_circular_swarm() + test_grid_swarm() + test_linear_swarm() + test_star_swarm() + test_mesh_swarm() + test_pyramid_swarm() + + # Test mathematical and pattern swarms + test_mathematical_swarms() + test_pattern_swarms() + + # Test communication patterns + asyncio.run(test_communication_patterns()) + + end_time = time.time() + duration = round(end_time - start_time, 2) + print("\n=== Test Suite Completed Successfully ===") + print(f"Time taken: {duration} seconds") + + except Exception as e: + print("\n=== Test Suite Failed ===") + print(f"Error: {str(e)}") + raise + + +if __name__ == "__main__": + run_all_tests() diff --git a/tests/structs/test_swarmnetwork.py b/tests/structs/test_swarmnetwork.py deleted file mode 100644 index 9dc6d9030..000000000 --- a/tests/structs/test_swarmnetwork.py +++ /dev/null @@ -1,52 +0,0 @@ -from unittest.mock import Mock, patch - -import pytest - -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - - -@pytest.fixture -def swarm_network(): - agents = [Agent(id=f"Agent_{i}") for i in range(5)] - return SwarmNetwork(agents=agents) - - -def test_swarm_network_init(swarm_network): - assert isinstance(swarm_network.agents, list) - assert len(swarm_network.agents) == 5 - - -@patch("swarms.structs.swarm_net.SwarmNetwork.logger") -def test_run(mock_logger, swarm_network): - swarm_network.run() - assert ( - mock_logger.info.call_count == 10 - ) # 2 log messages per agent - - -def test_run_with_mocked_agents(mocker, swarm_network): - mock_agents = [Mock(spec=Agent) for _ in range(5)] - mocker.patch.object(swarm_network, "agents", mock_agents) - swarm_network.run() - for mock_agent in mock_agents: - assert mock_agent.run.called - - -def test_swarm_network_with_no_agents(): - swarm_network = SwarmNetwork(agents=[]) - assert swarm_network.agents == [] - - -def test_swarm_network_add_agent(swarm_network): - new_agent = Agent(id="Agent_5") - swarm_network.add_agent(new_agent) - assert len(swarm_network.agents) == 6 - assert swarm_network.agents[-1] == new_agent - - -def test_swarm_network_remove_agent(swarm_network): - agent_to_remove = swarm_network.agents[0] - swarm_network.remove_agent(agent_to_remove) - assert len(swarm_network.agents) == 4 - assert agent_to_remove not in swarm_network.agents