Skip to content

Commit

Permalink
Add sample distributed group chat notebook (microsoft#3759)
Browse files Browse the repository at this point in the history
* first notebook for distributed rock, paper and scissors

* add distributed group chat notebook

* fix formatting

* fix pipeline issues

* fix formatting issue

* promote distributed group chat notebook into a multiple files

* fix docs

* fix docs

* fix pyright

* Apply suggestions from code review

Add PR review suggestions

Co-authored-by: Eric Zhu <[email protected]>

* improving group chat manager from round robin to LLM based

Signed-off-by: Mohammad Mazraeh <[email protected]>

* remove lfs file to fix

Signed-off-by: Mohammad Mazraeh <[email protected]>

* add gut back using lfs

Signed-off-by: Mohammad Mazraeh <[email protected]>

* re-add gif using lfs

Signed-off-by: Mohammad Mazraeh <[email protected]>

* remove gitattributes

Signed-off-by: Mohammad Mazraeh <[email protected]>

* redo git lfs add

---------

Signed-off-by: Mohammad Mazraeh <[email protected]>
Co-authored-by: Ryan Sweet <[email protected]>
Co-authored-by: Eric Zhu <[email protected]>
  • Loading branch information
3 people authored Oct 28, 2024
1 parent c06f8d3 commit 0052e81
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,18 @@
"# To keep the host service running until a termination signal (e.g., SIGTERM)\n",
"# await host.stop_when_signal()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Next Steps\n",
"To see complete examples of using distributed runtime, please take a look at the following samples:\n",
"\n",
"- [Distributed Workers](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/worker) \n",
"- [Distributed Semantic Router](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/semantic_router) \n",
"- [Distributed Group Chat](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/distributed_group_chat) \n"
]
}
],
"metadata": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
distributed_group_chat.gif filter=lfs diff=lfs merge=lfs -text
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Distributed Group Chat

from autogen_core.application import WorkerAgentRuntimeHost

This example runs a gRPC server using [WorkerAgentRuntimeHost](../../src/autogen_core/application/_worker_runtime_host.py) and instantiates three distributed runtimes using [WorkerAgentRuntime](../../src/autogen_core/application/_worker_runtime.py). These runtimes connect to the gRPC server as hosts and facilitate a round-robin distributed group chat. This example leverages the [Azure OpenAI Service](https://azure.microsoft.com/en-us/products/ai-services/openai-service) to implement writer and editor LLM agents. Agents are instructed to provide concise answers, as the primary goal of this example is to showcase the distributed runtime rather than the quality of agent responses.

## Setup

### Setup Python Environment

You should run this project using the same virtual environment created for it. Instructions are provided in the [README](../../../../../../../../README.md).

### General Configuration

In the `config.yaml` file, you can configure the `client_config` section to connect the code to the Azure OpenAI Service.

### Authentication

The recommended method for authentication is through Azure Active Directory (AAD), as explained in [Model Clients - Azure AI](https://microsoft.github.io/autogen/dev/user-guide/core-user-guide/framework/model-clients.html#azure-openai). This example works with both the AAD approach (recommended) and by providing the `api_key` in the `config.yaml` file.

## Run

### Run Through Scripts

The [run.sh](./run.sh) file provides commands to run the host and agents using [tmux](https://github.com/tmux/tmux/wiki). The steps for this approach are:

1. Install tmux.
2. Activate the Python environment: `source .venv/bin/activate`.
3. Run the bash script: `./run.sh`.

Here is a screen recording of the execution:

![Distributed Group Chat Sample Run](./distributed_group_chat.gif)

**Note**: Some `asyncio.sleep` commands have been added to the example code to make the `./run.sh` execution look sequential and visually easy to follow. In practice, these lines are not necessary.

### Run Individual Files

If you prefer to run Python files individually, follow these steps. Note that each step must be run in a different terminal process, and the virtual environment should be activated using `source .venv/bin/activate`.

1. `python run_host.py`: Starts the host and listens for agent connections.
2. `python run_editor.py`: Starts the editor agent and connects it to the host.
3. `python run_writer.py`: Starts the writer agent and connects it to the host.
4. `python run_group_chat_manager.py`: Starts the group chat manager and sends a message to initiate the writer agent.

## What's Going On?

The general flow of this example is as follows:

1. The Group Chat Manager sends a `RequestToSpeak` request to the `writer_agent`.
2. The `writer_agent` writes a short sentence into the group chat topic.
3. The `editor_agent` receives the message in the group chat topic and updates its memory.
4. The Group Chat Manager receives the message sent by the writer into the group chat simultaneously and sends the next participant, the `editor_agent`, a `RequestToSpeak` message.
5. The `editor_agent` sends its feedback to the group chat topic.
6. The `writer_agent` receives the feedback and updates its memory.
7. The Group Chat Manager receives the message simultaneously and repeats the loop from step 1.

Here is an illustration of the system developed in this example:

```mermaid
graph TD;
subgraph Host
A1[GRPC Server]
wt[Writer Topic]
et[Editor Topic]
gct[Group Chat Topic]
end
subgraph Distributed Writer Runtime
writer_agent[Writer Agent] --> A1
wt -.->|2 - Subscription| writer_agent
gct -.->|4 - Subscription| writer_agent
writer_agent -.->|3 - Publish: Group Chat Message| gct
end
subgraph Distributed Editor Runtime
editor_agent[Editor Agent] --> A1
et -.->|6 - Subscription| editor_agent
gct -.->|4 - Subscription| editor_agent
editor_agent -.->|7 - Publish: Group Chat Message| gct
end
subgraph Distributed Group Chat Manager Runtime
group_chat_manager[Group Chat Manager Agent] --> A1
gct -.->|4 - Subscription| group_chat_manager
group_chat_manager -.->|1 - Request To Speak| wt
group_chat_manager -.->|5 - Request To Speak| et
end
style wt fill:#beb2c3,color:#000
style et fill:#beb2c3,color:#000
style gct fill:#beb2c3,color:#000
style writer_agent fill:#b7c4d7,color:#000
style editor_agent fill:#b7c4d7,color:#000
style group_chat_manager fill:#b7c4d7,color:#000
```
138 changes: 138 additions & 0 deletions python/packages/autogen-core/samples/distributed-group-chat/_agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from typing import List

from _types import GroupChatMessage, RequestToSpeak
from autogen_core.base import MessageContext
from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from rich.console import Console
from rich.markdown import Markdown


class BaseGroupChatAgent(RoutedAgent):
"""A group chat participant using an LLM."""

def __init__(
self,
description: str,
group_chat_topic_type: str,
model_client: ChatCompletionClient,
system_message: str,
) -> None:
super().__init__(description=description)
self._group_chat_topic_type = group_chat_topic_type
self._model_client = model_client
self._system_message = SystemMessage(system_message)
self._chat_history: List[LLMMessage] = []

@message_handler
async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
self._chat_history.extend(
[
UserMessage(content=f"Transferred to {message.body.source}", source="system"), # type: ignore[union-attr]
message.body,
]
)

@message_handler
async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:
self._chat_history.append(
UserMessage(content=f"Transferred to {self.id.type}, adopt the persona immediately.", source="system")
)
completion = await self._model_client.create([self._system_message] + self._chat_history)
assert isinstance(completion.content, str)
self._chat_history.append(AssistantMessage(content=completion.content, source=self.id.type))
Console().print(Markdown(f"**{self.id.type}**: {completion.content}\n"))

await self.publish_message(
GroupChatMessage(body=UserMessage(content=completion.content, source=self.id.type)),
topic_id=DefaultTopicId(type=self._group_chat_topic_type),
)


class GroupChatManager(RoutedAgent):
def __init__(
self,
model_client: ChatCompletionClient,
participant_topic_types: List[str],
participant_descriptions: List[str],
max_rounds: int = 3,
) -> None:
super().__init__("Group chat manager")
self._model_client = model_client
self._num_rounds = 0
self._participant_topic_types = participant_topic_types
self._chat_history: List[GroupChatMessage] = []
self._max_rounds = max_rounds
self.console = Console()
self._participant_descriptions = participant_descriptions
self._previous_participant_topic_type: str | None = None

@message_handler
async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
assert isinstance(message.body, UserMessage)

self._chat_history.append(message.body) # type: ignore[reportargumenttype]

# Format message history.
messages: List[str] = []
for msg in self._chat_history:
if isinstance(msg.content, str): # type: ignore[attr-defined]
messages.append(f"{msg.source}: {msg.content}") # type: ignore[attr-defined]
elif isinstance(msg.content, list): # type: ignore[attr-defined]
messages.append(f"{msg.source}: {', '.join(msg.content)}") # type: ignore[attr-defined,reportUnknownArgumentType]
history = "\n".join(messages)
# Format roles.
roles = "\n".join(
[
f"{topic_type}: {description}".strip()
for topic_type, description in zip(
self._participant_topic_types, self._participant_descriptions, strict=True
)
if topic_type != self._previous_participant_topic_type
]
)
participants = str(
[
topic_type
for topic_type in self._participant_topic_types
if topic_type != self._previous_participant_topic_type
]
)

selector_prompt = f"""You are in a role play game. The following roles are available:
{roles}.
Read the following conversation. Then select the next role from {participants} to play. Only return the role.
{history}
Read the above conversation. Then select the next role from {participants} to play. if you think it's enough talking (for example they have talked for {self._max_rounds} rounds), return 'FINISH'.
"""
system_message = SystemMessage(selector_prompt)
completion = await self._model_client.create([system_message], cancellation_token=ctx.cancellation_token)
assert isinstance(completion.content, str)

if completion.content.upper() == "FINISH":
self.console.print(
Markdown(
f"\n{'-'*80}\n Manager ({id(self)}): I think it's enough iterations on the story! Thanks for collaborating!"
)
)
return

selected_topic_type: str
for topic_type in self._participant_topic_types:
if topic_type.lower() in completion.content.lower():
selected_topic_type = topic_type
self._previous_participant_topic_type = selected_topic_type
self.console.print(
Markdown(f"\n{'-'*80}\n Manager ({id(self)}): Asking `{selected_topic_type}` to speak")
)
await self.publish_message(RequestToSpeak(), DefaultTopicId(type=selected_topic_type))
return
raise ValueError(f"Invalid role selected: {completion.content}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from autogen_core.components.models import (
LLMMessage,
)
from autogen_core.components.models.config import AzureOpenAIClientConfiguration
from pydantic import BaseModel


class GroupChatMessage(BaseModel):
"""Implements a sample message sent by an LLM agent"""

body: LLMMessage


class RequestToSpeak(BaseModel):
"""Message type for agents to speak"""

pass


# Define Host configuration model
class HostConfig(BaseModel):
hostname: str
port: int

@property
def address(self) -> str:
return f"{self.hostname}:{self.port}"


# Define GroupChatManager configuration model
class GroupChatManagerConfig(BaseModel):
topic_type: str
max_rounds: int


# Define WriterAgent configuration model
class ChatAgentConfig(BaseModel):
topic_type: str
description: str
system_message: str


# Define the overall AppConfig model
class AppConfig(BaseModel):
host: HostConfig
group_chat_manager: GroupChatManagerConfig
writer_agent: ChatAgentConfig
editor_agent: ChatAgentConfig
client_config: AzureOpenAIClientConfiguration = None # type: ignore[assignment] # This was required to do custom instantiation in `load_config``
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
from typing import Any, Iterable, Type

import yaml
from _types import AppConfig
from autogen_core.base import MessageSerializer, try_get_known_serializers_for_type
from autogen_core.components.models.config import AzureOpenAIClientConfiguration
from azure.identity import DefaultAzureCredential, get_bearer_token_provider


def load_config(file_path: str = os.path.join(os.path.dirname(__file__), "config.yaml")) -> AppConfig:
model_client = {}
with open(file_path, "r") as file:
config_data = yaml.safe_load(file)
model_client = config_data["client_config"]
del config_data["client_config"]
app_config = AppConfig(**config_data)
# This was required as it couldn't automatically instantiate AzureOpenAIClientConfiguration

aad_params = {}
if len(model_client.get("api_key", "")) == 0:
aad_params["azure_ad_token_provider"] = get_bearer_token_provider(
DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

app_config.client_config = AzureOpenAIClientConfiguration(**model_client, **aad_params) # type: ignore[typeddict-item]
return app_config


def get_serializers(types: Iterable[Type[Any]]) -> list[MessageSerializer[Any]]:
serializers = []
for type in types:
serializers.extend(try_get_known_serializers_for_type(type)) # type: ignore
return serializers # type: ignore [reportUnknownVariableType]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
host:
hostname: "localhost"
port: 50060

group_chat_manager:
topic_type: "group_chat"
max_rounds: 7

writer_agent:
topic_type: "Writer"
description: "Writer for creating any text content."
system_message: "You are a one sentence Writer and provide one line content each time"

editor_agent:
topic_type: "Editor"
description: "Editor for planning and reviewing the content."
system_message: "You are an Editor. You provide just max 10 words as feedback on writers content."

client_config:
model: "gpt-4o"
azure_endpoint: "https://{your-custom-endpoint}.openai.azure.com"
azure_deployment: "{your-azure-deployment}"
api_version: "2024-08-01-preview"
api_key: ""
model_capabilities:
vision: True
function_calling: True
json_output: True
26 changes: 26 additions & 0 deletions python/packages/autogen-core/samples/distributed-group-chat/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
# # Start a new tmux session named 'distributed_group_chat'
tmux new-session -d -s distributed_group_chat

# # Split the terminal into 2 vertical panes
tmux split-window -h

# # Split the left pane horizontally
tmux select-pane -t distributed_group_chat:0.0
tmux split-window -v

# # Split the right pane horizontally
tmux select-pane -t distributed_group_chat:0.2
tmux split-window -v

# Select the first pane to start
tmux select-pane -t distributed_group_chat:0.0

# Activate the virtual environment and run the scripts in each pane
tmux send-keys -t distributed_group_chat:0.0 "python run_host.py" C-m
tmux send-keys -t distributed_group_chat:0.2 "python run_writer_agent.py" C-m
tmux send-keys -t distributed_group_chat:0.3 "python run_editor_agent.py" C-m
tmux send-keys -t distributed_group_chat:0.1 "python run_group_chat_manager.py" C-m

# # Attach to the session
tmux attach-session -t distributed_group_chat
Loading

0 comments on commit 0052e81

Please sign in to comment.