diff --git a/swarms/schemas/swarm_output_schemas.py b/swarms/schemas/swarm_output_schemas.py new file mode 100644 index 000000000..fe4fd1ee5 --- /dev/null +++ b/swarms/schemas/swarm_output_schemas.py @@ -0,0 +1,79 @@ +from typing import List, Dict, Any, Optional +from pydantic import BaseModel, Field +from datetime import datetime + +class AgentStep(BaseModel): + """Schema for individual agent execution steps""" + role: str = Field(description="Role of the agent in this step") + content: str = Field(description="Content/response from the agent") + +class AgentOutput(BaseModel): + """Schema for individual agent outputs""" + agent_name: str = Field(description="Name of the agent") + steps: List[AgentStep] = Field(description="List of execution steps by this agent") + metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata about the agent's execution") + +class SwarmInput(BaseModel): + """Schema for swarm input configuration""" + swarm_id: str = Field(description="Unique identifier for the swarm execution") + name: str = Field(description="Name of the swarm type") + flow: str = Field(description="Agent execution flow configuration") + description: Optional[str] = Field(default=None, description="Description of the swarm execution") + +class SwarmOutput(BaseModel): + """Unified schema for all swarm type outputs""" + input: SwarmInput = Field(description="Input configuration for the swarm") + outputs: List[AgentOutput] = Field(description="List of outputs from all agents") + time: float = Field(description="Timestamp of execution") + execution_time: Optional[float] = Field(default=None, description="Total execution time in seconds") + metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional swarm execution metadata") + + def format_output(self) -> str: + """Format the swarm output into a readable string""" + output = f"Workflow Execution Details\n\n" + output += f"Swarm ID: `{self.input.swarm_id}`\n" + output += f"Swarm Name: `{self.input.name}`\n" + output += f"Agent Flow: `{self.input.flow}`\n\n---\n" + output += f"Agent Task Execution\n\n" + + for i, agent_output in enumerate(self.outputs, start=1): + output += f"Run {i} (Agent: `{agent_output.agent_name}`)\n\n" + + for j, step in enumerate(agent_output.steps, start=1): + if step.role.strip() != "System:": + output += f"Step {j}:\n" + output += f"Response: {step.content}\n\n" + + if self.execution_time: + output += f"Overall Execution Time: `{self.execution_time:.2f}s`" + + return output + +class MixtureOfAgentsOutput(SwarmOutput): + """Schema specific to MixtureOfAgents output""" + aggregator_summary: Optional[str] = Field(default=None, description="Aggregated summary from all agents") + + def format_output(self) -> str: + """Format MixtureOfAgents output""" + output = super().format_output() + if self.aggregator_summary: + output += f"\nAggregated Summary:\n{self.aggregator_summary}\n{'=' * 50}\n" + return output + +class SpreadsheetSwarmOutput(SwarmOutput): + """Schema specific to SpreadsheetSwarm output""" + csv_data: List[List[str]] = Field(description="CSV data in list format") + + def format_output(self) -> str: + """Format SpreadsheetSwarm output""" + output = "### Spreadsheet Swarm Output ###\n\n" + if self.csv_data: + # Create markdown table + header = self.csv_data[0] + output += "| " + " | ".join(header) + " |\n" + output += "| " + " | ".join(["---"] * len(header)) + " |\n" + + for row in self.csv_data[1:]: + output += "| " + " | ".join(row) + " |\n" + + return output \ No newline at end of file diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 97fe3f3d8..dcef5633a 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -38,6 +38,14 @@ ChatCompletionResponseChoice, ChatMessageResponse, ) +from swarms.schemas.swarm_output_schemas import ( + SwarmOutput, + SwarmInput, + AgentOutput, + AgentStep, + MixtureOfAgentsOutput, + SpreadsheetSwarmOutput +) from swarms.structs.concat import concat_strings from swarms.structs.conversation import Conversation from swarms.structs.safe_loading import ( @@ -210,6 +218,7 @@ class Agent: run_async_concurrent: Run the agent asynchronously and concurrently construct_dynamic_prompt: Construct the dynamic prompt handle_artifacts: Handle artifacts + create_swarm_output: Create standardized swarm output Examples: @@ -2596,3 +2605,64 @@ def showcase_config(self): return formatter.print_table( f"Agent: {self.agent_name} Configuration", config_dict ) + + def create_swarm_output( + self, + swarm_type: str, + swarm_id: str, + agent_outputs: List[Dict[str, Any]], + flow: Optional[str] = None, + execution_time: Optional[float] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> SwarmOutput: + """Create standardized swarm output""" + + # Create input config + input_config = SwarmInput( + swarm_id=swarm_id, + name=swarm_type, + flow=flow or "->".join([out["agent_name"] for out in agent_outputs]), + ) + + # Create agent outputs + formatted_outputs = [] + for agent_out in agent_outputs: + steps = [ + AgentStep(role=step["role"], content=step["content"]) + for step in agent_out.get("steps", []) + ] + formatted_outputs.append( + AgentOutput( + agent_name=agent_out["agent_name"], + steps=steps, + metadata=agent_out.get("metadata") + ) + ) + + # Create appropriate output type based on swarm_type + if swarm_type == "MixtureOfAgents": + return MixtureOfAgentsOutput( + input=input_config, + outputs=formatted_outputs, + time=time.time(), + execution_time=execution_time, + metadata=metadata, + aggregator_summary=metadata.get("aggregator_summary") if metadata else None + ) + elif swarm_type == "SpreadSheetSwarm": + return SpreadsheetSwarmOutput( + input=input_config, + outputs=formatted_outputs, + time=time.time(), + execution_time=execution_time, + metadata=metadata, + csv_data=metadata.get("csv_data", []) if metadata else [] + ) + else: + return SwarmOutput( + input=input_config, + outputs=formatted_outputs, + time=time.time(), + execution_time=execution_time, + metadata=metadata + )