From fda8d21a5e875d28726a85710e1dd94ac041b159 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Wed, 27 Nov 2024 12:16:00 -0800
Subject: [PATCH] [PACKAGES CLEANUP]

---
 docs/swarms/install/workspace_manager.md |   7 +-
 example_async_vs_multithread.py          |   8 +-
 multi_step_tool_agent.py                 | 353 +++++++++++++++++++++++
 ollama_wrapper.py                        | 198 +++++++++++++
 pyproject.toml                           |  15 +-
 requirements.txt                         |   8 +-
 swarms/__init__.py                       |  13 +-
 swarms/structs/mixture_of_agents.py      |   4 +-
 swarms/structs/multi_agent_exec.py       |   2 +-
 9 files changed, 574 insertions(+), 34 deletions(-)
 create mode 100644 multi_step_tool_agent.py
 create mode 100644 ollama_wrapper.py

diff --git a/docs/swarms/install/workspace_manager.md b/docs/swarms/install/workspace_manager.md
index d2cb4ca35..abdcb2a02 100644
--- a/docs/swarms/install/workspace_manager.md
+++ b/docs/swarms/install/workspace_manager.md
@@ -14,10 +14,7 @@ Create a `.env` file in your project's root directory to configure the Swarms fr
 - **Purpose**: Defines the directory where all agent states and execution logs are stored
 - **Type**: String (path)
 - **Default**: `./workspace`
-- **Example**:
-```bash
-WORKSPACE_DIR=/path/to/your/workspace
-```
+- **Example**: `WORKSPACE_DIR=/path/to/your/workspace`
 - **Usage**:
   - Stores JSON files containing agent states
   - Maintains execution history
@@ -183,4 +180,4 @@ Common issues and solutions:
 
 - [Swarms Framework Documentation](https://github.com/kyegomez/swarms)
 - [Swarms Analytics Dashboard](https://swarms.ai)
-- [API Reference](https://swarms.ai/docs/api)
\ No newline at end of file
+- [API Reference](https://swarms.ai/docs/api)
diff --git a/example_async_vs_multithread.py b/example_async_vs_multithread.py
index f547abc8b..25d514aa4 100644
--- a/example_async_vs_multithread.py
+++ b/example_async_vs_multithread.py
@@ -1,6 +1,5 @@
 import os
 import asyncio
-import threading
 from swarms import Agent
 from swarm_models import OpenAIChat
 import time
@@ -40,18 +39,21 @@
     streaming_on=False,
 )
 
+
 # Function to measure time and memory usage
 def measure_time_and_memory(func):
     def wrapper(*args, **kwargs):
         start_time = time.time()
         result = func(*args, **kwargs)
         end_time = time.time()
-        memory_usage = psutil.Process().memory_info().rss / 1024 ** 2
+        memory_usage = psutil.Process().memory_info().rss / 1024**2
         print(f"Time taken: {end_time - start_time} seconds")
         print(f"Memory used: {memory_usage} MB")
         return result
+
     return wrapper
 
+
 # Function to run the agent asynchronously
 @measure_time_and_memory
 async def run_agent_async():
@@ -61,11 +63,13 @@ async def run_agent_async():
         )
     )
 
+
 # Function to run the agent on another thread
 @measure_time_and_memory
 def run_agent_thread():
     asyncio.run(run_agent_async())
 
+
 # Run the agent asynchronously and on another thread to test the speed
 asyncio.run(run_agent_async())
 run_agent_thread()
diff --git a/multi_step_tool_agent.py b/multi_step_tool_agent.py
new file mode 100644
index 000000000..2a5cac75a
--- /dev/null
+++ b/multi_step_tool_agent.py
@@ -0,0 +1,353 @@
+import os
+from typing import List, Dict, Any, Optional, Callable
+from dataclasses import dataclass
+import json
+from datetime import datetime
+import inspect
+import typing
+from typing import Union
+from swarms import Agent
+from swarm_models import OpenAIChat
+from dotenv import load_dotenv
+
+
+@dataclass
+class ToolDefinition:
+    name: str
+    description: str
+    parameters: Dict[str, Any]
+    required_params: List[str]
+    callable: Optional[Callable] = None
+
+
+@dataclass
+class ExecutionStep:
+    step_id: str
+    tool_name: str
+    parameters: Dict[str, Any]
+ purpose: str + depends_on: List[str] + completed: bool = False + result: Optional[Any] = None + + +def extract_type_hints(func: Callable) -> Dict[str, Any]: + """Extract parameter types from function type hints.""" + return typing.get_type_hints(func) + + +def extract_tool_info(func: Callable) -> ToolDefinition: + """Extract tool information from a callable function.""" + # Get function name + name = func.__name__ + + # Get docstring + description = inspect.getdoc(func) or "No description available" + + # Get parameters and their types + signature = inspect.signature(func) + type_hints = extract_type_hints(func) + + parameters = {} + required_params = [] + + for param_name, param in signature.parameters.items(): + # Skip self parameter for methods + if param_name == "self": + continue + + param_type = type_hints.get(param_name, Any) + + # Handle optional parameters + is_optional = ( + param.default != inspect.Parameter.empty + or getattr(param_type, "__origin__", None) is Union + and type(None) in param_type.__args__ + ) + + if not is_optional: + required_params.append(param_name) + + parameters[param_name] = { + "type": str(param_type), + "default": ( + None + if param.default is inspect.Parameter.empty + else param.default + ), + "required": not is_optional, + } + + return ToolDefinition( + name=name, + description=description, + parameters=parameters, + required_params=required_params, + callable=func, + ) + + +class ToolUsingAgent: + def __init__( + self, + tools: List[Callable], + openai_api_key: str, + model_name: str = "gpt-4", + temperature: float = 0.1, + max_loops: int = 10, + ): + # Convert callable tools to ToolDefinitions + self.available_tools = { + tool.__name__: extract_tool_info(tool) for tool in tools + } + + self.execution_plan: List[ExecutionStep] = [] + self.current_step_index = 0 + self.max_loops = max_loops + + # Initialize the OpenAI model + self.model = OpenAIChat( + openai_api_key=openai_api_key, + model_name=model_name, + temperature=temperature, + ) + + # Create system prompt with tool descriptions + self.system_prompt = self._create_system_prompt() + + self.agent = Agent( + agent_name="Tool-Using-Agent", + system_prompt=self.system_prompt, + llm=self.model, + max_loops=1, + autosave=True, + verbose=True, + saved_state_path="tool_agent_state.json", + context_length=200000, + ) + + def _create_system_prompt(self) -> str: + """Create system prompt with available tools information.""" + tools_description = [] + for tool_name, tool in self.available_tools.items(): + tools_description.append( + f""" + Tool: {tool_name} + Description: {tool.description} + Parameters: {json.dumps(tool.parameters, indent=2)} + Required Parameters: {tool.required_params} + """ + ) + + output = f"""You are an autonomous agent capable of executing complex tasks using available tools. + + Available Tools: + {chr(10).join(tools_description)} + + Follow these protocols: + 1. Create a detailed plan using available tools + 2. Execute each step in order + 3. Handle errors appropriately + 4. Maintain execution state + 5. 
Return results in structured format + + You must ALWAYS respond in the following JSON format: + {{ + "plan": {{ + "description": "Brief description of the overall plan", + "steps": [ + {{ + "step_number": 1, + "tool_name": "name_of_tool", + "description": "What this step accomplishes", + "parameters": {{ + "param1": "value1", + "param2": "value2" + }}, + "expected_output": "Description of expected output" + }} + ] + }}, + "reasoning": "Explanation of why this plan was chosen" + }} + + Before executing any tool: + 1. Validate all required parameters are present + 2. Verify parameter types match specifications + 3. Check parameter values are within valid ranges/formats + 4. Ensure logical dependencies between steps are met + + If any validation fails: + 1. Return error in JSON format with specific details + 2. Suggest corrections if possible + 3. Do not proceed with execution + + After each step execution: + 1. Verify output matches expected format + 2. Log results and any warnings/errors + 3. Update execution state + 4. Determine if plan adjustment needed + + Error Handling: + 1. Catch and classify all errors + 2. Provide detailed error messages + 3. Suggest recovery steps + 4. Maintain system stability + + The final output must be valid JSON that can be parsed. Always check your response can be parsed as JSON before returning. + """ + return output + + def execute_tool( + self, tool_name: str, parameters: Dict[str, Any] + ) -> Any: + """Execute a tool with given parameters.""" + tool = self.available_tools[tool_name] + if not tool.callable: + raise ValueError( + f"Tool {tool_name} has no associated callable" + ) + + # Convert parameters to appropriate types + converted_params = {} + for param_name, param_value in parameters.items(): + param_info = tool.parameters[param_name] + param_type = eval( + param_info["type"] + ) # Note: Be careful with eval + converted_params[param_name] = param_type(param_value) + + return tool.callable(**converted_params) + + def run(self, task: str) -> Dict[str, Any]: + """Execute the complete task with proper logging and error handling.""" + execution_log = { + "task": task, + "start_time": datetime.utcnow().isoformat(), + "steps": [], + "final_result": None + } + + try: + # Create and execute plan + plan_response = self.agent.run(f"Create a plan for: {task}") + plan_data = json.loads(plan_response) + + # Extract steps from the correct path in JSON + steps = plan_data["plan"]["steps"] # Changed from plan_data["steps"] + + for step in steps: + try: + # Check if parameters need default values + for param_name, param_value in step["parameters"].items(): + if isinstance(param_value, str) and not param_value.replace(".", "").isdigit(): + # If parameter is a description rather than a value, set default + if "income" in param_name.lower(): + step["parameters"][param_name] = 75000.0 + elif "year" in param_name.lower(): + step["parameters"][param_name] = 2024 + elif "investment" in param_name.lower(): + step["parameters"][param_name] = 1000.0 + + # Execute the tool + result = self.execute_tool( + step["tool_name"], + step["parameters"] + ) + + execution_log["steps"].append({ + "step_number": step["step_number"], + "tool": step["tool_name"], + "parameters": step["parameters"], + "success": True, + "result": result, + "description": step["description"] + }) + + except Exception as e: + execution_log["steps"].append({ + "step_number": step["step_number"], + "tool": step["tool_name"], + "parameters": step["parameters"], + "success": False, + "error": str(e), + 
"description": step["description"] + }) + print(f"Error executing step {step['step_number']}: {str(e)}") + # Continue with next step instead of raising + continue + + # Only mark as success if at least some steps succeeded + successful_steps = [s for s in execution_log["steps"] if s["success"]] + if successful_steps: + execution_log["final_result"] = { + "success": True, + "results": successful_steps, + "reasoning": plan_data.get("reasoning", "No reasoning provided") + } + else: + execution_log["final_result"] = { + "success": False, + "error": "No steps completed successfully", + "plan": plan_data + } + + except Exception as e: + execution_log["final_result"] = { + "success": False, + "error": str(e), + "plan": plan_data if 'plan_data' in locals() else None + } + + execution_log["end_time"] = datetime.utcnow().isoformat() + return execution_log + + +# Example usage +if __name__ == "__main__": + load_dotenv() + + # Example tool functions + def research_ira_requirements() -> Dict[str, Any]: + """Research and return ROTH IRA eligibility requirements.""" + return { + "age_requirement": "Must have earned income", + "income_limits": {"single": 144000, "married": 214000}, + } + + def calculate_contribution_limit( + income: float, tax_year: int + ) -> Dict[str, float]: + """Calculate maximum ROTH IRA contribution based on income and tax year.""" + base_limit = 6000 if tax_year <= 2022 else 6500 + if income > 144000: + return {"limit": 0} + return {"limit": base_limit} + + def find_brokers(min_investment: float) -> List[Dict[str, Any]]: + """Find suitable brokers for ROTH IRA based on minimum investment.""" + return [ + {"name": "Broker A", "min_investment": min_investment}, + { + "name": "Broker B", + "min_investment": min_investment * 1.5, + }, + ] + + # Initialize agent with tools + agent = ToolUsingAgent( + tools=[ + research_ira_requirements, + calculate_contribution_limit, + find_brokers, + ], + openai_api_key="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", + ) + + # Run a task + result = agent.run( + "How can I establish a ROTH IRA to buy stocks and get a tax break? " + "What are the criteria?" + ) + + print(json.dumps(result, indent=2)) diff --git a/ollama_wrapper.py b/ollama_wrapper.py new file mode 100644 index 000000000..61add21ab --- /dev/null +++ b/ollama_wrapper.py @@ -0,0 +1,198 @@ +from typing import Dict, Union, Optional, Any, AsyncGenerator +from dataclasses import dataclass +from loguru import logger +import ollama +import asyncio +import functools +import time +from datetime import datetime + + +@dataclass +class ModelConfig: + """Configuration for model parameters""" + + temperature: float = 0.7 + top_p: float = 0.9 + top_k: int = 40 + repeat_penalty: float = 1.1 + max_tokens: int = 2048 + presence_penalty: float = 0.0 + frequency_penalty: float = 0.0 + + +class OllamaWrapper: + """ + A comprehensive wrapper for the Ollama API with enhanced functionality. + + Features: + - Automatic model loading and verification + - Configurable retry mechanism + - Detailed logging with loguru + - Async and sync support + - Request rate limiting + - Automatic error handling and recovery + + Args: + model_name (str): Name of the Ollama model to use + host (str, optional): Ollama API host. Defaults to "http://localhost:11434" + max_retries (int, optional): Maximum number of retry attempts. Defaults to 3 + timeout (int, optional): Request timeout in seconds. 
Defaults to 30 + config (ModelConfig, optional): Model configuration parameters + """ + + def __init__( + self, + model_name: str, + host: str = "http://localhost:11434", + max_retries: int = 3, + timeout: int = 30, + config: Optional[ModelConfig] = None, + ): + self.model_name = model_name + self.host = host + self.max_retries = max_retries + self.timeout = timeout + self.config = config or ModelConfig() + + # Initialize clients + self.sync_client = ollama.Client(host=host) + self.async_client = ollama.AsyncClient(host=host) + + # Setup logging + logger.add( + f"ollama_wrapper_{datetime.now().strftime('%Y%m%d')}.log", + rotation="500 MB", + level="INFO", + ) + + # Verify model availability + # self._verify_model() + + def _verify_model(self) -> None: + """Verify if the specified model is available and pull if needed.""" + try: + models = self.sync_client.list() + if not any( + model["name"] == self.model_name + for model in models["models"] + ): + logger.info( + f"Model {self.model_name} not found. Pulling..." + ) + self.sync_client.pull(self.model_name) + except Exception as e: + logger.error(f"Error verifying model: {str(e)}") + raise RuntimeError( + f"Failed to verify/pull model {self.model_name}" + ) + + def _retry_decorator(func): + """Decorator for implementing retry logic with exponential backoff.""" + + @functools.wraps(func) + async def async_wrapper(self, *args, **kwargs): + for attempt in range(self.max_retries): + try: + return await func(self, *args, **kwargs) + except Exception as e: + if attempt == self.max_retries - 1: + logger.error( + f"Final retry attempt failed: {str(e)}" + ) + raise + wait_time = (2**attempt) + (time.time() % 1) + logger.warning( + f"Attempt {attempt + 1} failed. Retrying in {wait_time:.2f}s" + ) + await asyncio.sleep(wait_time) + + @functools.wraps(func) + def sync_wrapper(self, *args, **kwargs): + for attempt in range(self.max_retries): + try: + return func(self, *args, **kwargs) + except Exception as e: + if attempt == self.max_retries - 1: + logger.error( + f"Final retry attempt failed: {str(e)}" + ) + raise + wait_time = (2**attempt) + (time.time() % 1) + logger.warning( + f"Attempt {attempt + 1} failed. Retrying in {wait_time:.2f}s" + ) + time.sleep(wait_time) + + return ( + async_wrapper + if asyncio.iscoroutinefunction(func) + else sync_wrapper + ) + + @_retry_decorator + async def _run( + self, task: str, stream: bool = False + ) -> Union[Dict[str, Any], AsyncGenerator]: + """ + Execute a task using the configured model. + + Args: + task (str): The task/prompt to process + stream (bool, optional): Whether to stream the response. Defaults to False + + Returns: + Union[Dict[str, Any], AsyncGenerator]: Response from the model + """ + logger.info(f"Processing task with {self.model_name}") + try: + messages = [{"role": "user", "content": task}] + response = await self.async_client.chat( + model=self.model_name, + messages=messages, + stream=stream, + options={ + "temperature": self.config.temperature, + "top_p": self.config.top_p, + "top_k": self.config.top_k, + "repeat_penalty": self.config.repeat_penalty, + "max_tokens": self.config.max_tokens, + "presence_penalty": self.config.presence_penalty, + "frequency_penalty": self.config.frequency_penalty, + }, + ) + logger.debug("Response received for task") + return response + except Exception as e: + logger.error(f"Error in run: {str(e)}") + raise + + def run( + self, task: str, stream: bool = False + ) -> Union[Dict[str, Any], AsyncGenerator]: + """ + Synchronous version of run() method. 
+ + Args: + task (str): The task/prompt to process + stream (bool, optional): Whether to stream the response. Defaults to False + + Returns: + Union[Dict[str, Any], AsyncGenerator]: Response from the model + """ + return asyncio.run(self._run(task, stream)) + + def __call__( + self, task: str, stream: bool = False + ) -> Union[Dict[str, Any], AsyncGenerator]: + """ + Callable wrapper around run() method. + + Args: + task (str): The task/prompt to process + stream (bool, optional): Whether to stream the response. Defaults to False + + Returns: + Union[Dict[str, Any], AsyncGenerator]: Response from the model + """ + return self.run(task, stream) diff --git a/pyproject.toml b/pyproject.toml index d8d06c615..e27f7fed9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.2.9" +version = "6.3.0" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -52,23 +52,18 @@ python = ">=3.10,<4.0" torch = ">=2.1.1,<3.0" transformers = ">= 4.39.0, <5.0.0" asyncio = ">=3.4.3,<4.0" -langchain-community = "0.0.29" -langchain-experimental = "0.0.55" backoff = "2.2.1" toml = "*" pypdf = "4.3.1" loguru = "0.7.2" pydantic = "2.8.2" tenacity = "8.5.0" -Pillow = "10.4.0" psutil = "*" sentry-sdk = {version = "*", extras = ["http"]} # Updated here python-dotenv = "*" PyYAML = "*" docstring_parser = "0.16" -fastapi = "*" -openai = ">=1.30.1,<2.0" -termcolor = "*" +# termcolor = "*" tiktoken = "*" networkx = "*" swarms-memory = "*" @@ -92,13 +87,13 @@ types-toml = "^0.10.8.1" types-pytz = ">=2023.3,<2025.0" types-chardet = "^5.0.4.6" mypy-protobuf = "^3.0.0" +pytest = "*" [tool.poetry.group.test.dependencies] pytest = "^8.1.1" -termcolor = "^2.4.0" -pandas = "^2.2.2" -fastapi = ">=0.110.1,<0.116.0" +black = "*" +ruff = "*" [tool.ruff] line-length = 70 diff --git a/requirements.txt b/requirements.txt index d422222b9..ff11557f4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,8 +2,6 @@ torch>=2.1.1,<3.0 transformers>=4.39.0,<5.0.0 asyncio>=3.4.3,<4.0 -langchain-community==0.0.28 -langchain-experimental==0.0.55 backoff==2.2.1 toml pypdf==4.3.1 @@ -11,12 +9,10 @@ ratelimit==2.2.1 loguru==0.7.2 pydantic==2.8.2 tenacity==8.5.0 -Pillow==10.4.0 rich psutil sentry-sdk python-dotenv -opencv-python-headless PyYAML docstring_parser==0.16 black>=23.1,<25.0 @@ -26,14 +22,12 @@ types-pytz>=2023.3,<2025.0 types-chardet>=5.0.4.6 mypy-protobuf>=3.0.0 pytest>=8.1.1 -termcolor>=2.4.0 pandas>=2.2.2 fastapi>=0.110.1 networkx swarms-memory -pre-commit aiofiles swarm-models clusterops reportlab -doc-master +doc-master \ No newline at end of file diff --git a/swarms/__init__.py b/swarms/__init__.py index 22fa61301..345ffb842 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -3,10 +3,7 @@ from dotenv import load_dotenv from loguru import logger -try: - load_dotenv() -except Exception as e: - logger.error(f"Failed to load environment variables: {e}") +load_dotenv() # More reliable string comparison if os.getenv("SWARMS_VERBOSE_GLOBAL", "True").lower() == "false": @@ -18,19 +15,23 @@ activate_sentry, ) # noqa: E402 + # Run telemetry functions concurrently with error handling def run_telemetry(): try: - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=2 + ) as executor: future_bootup = executor.submit(bootup) future_sentry = executor.submit(activate_sentry) - + # Wait for completion and check for exceptions 
future_bootup.result() future_sentry.result() except Exception as e: logger.error(f"Error running telemetry functions: {e}") + run_telemetry() from swarms.agents import * # noqa: E402, F403 diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index f80701ef3..e91d565f9 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -86,9 +86,7 @@ def __init__( self.input_schema = MixtureOfAgentsInput( name=name, description=description, - agents=[ - agent.to_dict() for agent in self.agents - ], + agents=[agent.to_dict() for agent in self.agents], aggregator_agent=aggregator_agent.to_dict(), aggregator_system_prompt=self.aggregator_system_prompt, layers=self.layers, diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index d733f49f0..b66af8a5e 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -414,7 +414,7 @@ def run_agents_with_tasks_concurrently( List[Any]: A list of outputs from each agent execution. """ # Make the first agent not use the ifrs - + if no_clusterops: return _run_agents_with_tasks_concurrently( agents, tasks, batch_size, max_workers
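The patch adds the `OllamaWrapper` and `ToolUsingAgent` modules but does not ship a usage example for the new Ollama wrapper. Below is a minimal sketch of how `ollama_wrapper.py` might be exercised once the patch is applied; it is illustrative only and not part of the diff. It assumes the new file is importable from the working directory, that an Ollama server is reachable on the default host, and that the placeholder model name has already been pulled.

```python
# Illustrative sketch (not part of the patch): assumes ollama_wrapper.py from
# this patch is on the Python path and an Ollama server is running locally.
from ollama_wrapper import ModelConfig, OllamaWrapper

# Sampling parameters are passed through the ModelConfig dataclass added above.
config = ModelConfig(temperature=0.2, max_tokens=512)

llm = OllamaWrapper(
    model_name="llama3.2",  # placeholder; use any model already pulled locally
    host="http://localhost:11434",
    max_retries=3,
    config=config,
)

# The wrapper is callable and also exposes run(); both drive the async chat
# client synchronously and return the chat response payload.
response = llm("Summarize the benefits of a ROTH IRA in two sentences.")
print(response)
```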