Merge branch 'main' into kostapetan/hello-distributed
rysweet authored Nov 27, 2024
2 parents 2e3b9a6 + 52790a8 commit f985f7d
Showing 91 changed files with 4,093 additions and 4,674 deletions.
13 changes: 13 additions & 0 deletions docs/design/02 - Topics.md
@@ -51,3 +51,16 @@ Agents are able to handle certain types of messages. This is an internal detail

> [!NOTE]
> This might be revisited based on scaling and performance considerations.
## Well-known topic types

Agents should subscribe via a prefix subscription to the `{AgentType}:` topic, which serves as a direct message channel for the agent type.

For this subscription, the topic source should map directly to the agent key.

This subscription will therefore receive all events for the following well-known topics:

- `{AgentType}:` - General-purpose direct messages. These should be routed to the appropriate message handler.
- `{AgentType}:rpc_request` - RPC request messages. These should be routed to the appropriate RPC handler.
- `{AgentType}:rpc_response={RequestId}` - RPC response messages. These should be routed back to the response future of the caller.
- `{AgentType}:error={RequestId}` - Error messages that correspond to the given request.
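To make the routing concrete, here is a minimal sketch of how a runtime might classify an incoming topic string against these well-known forms. The function name and the returned labels are illustrative only; they are not part of the documented API.

```python
def classify_topic(agent_type: str, topic: str) -> tuple[str, str | None]:
    """Classify a well-known topic string for the given agent type.

    Returns a (kind, request_id) pair, where kind is one of
    "direct", "rpc_request", "rpc_response", or "error".
    """
    prefix = f"{agent_type}:"
    if not topic.startswith(prefix):
        raise ValueError(f"Topic {topic!r} is not addressed to agent type {agent_type!r}")
    suffix = topic[len(prefix) :]
    if suffix == "rpc_request":
        return ("rpc_request", None)  # Route to the appropriate RPC handler.
    if suffix.startswith("rpc_response="):
        return ("rpc_response", suffix.removeprefix("rpc_response="))  # Resolve the caller's response future.
    if suffix.startswith("error="):
        return ("error", suffix.removeprefix("error="))  # Fail the caller's response future.
    return ("direct", None)  # General-purpose direct message.


# Example: an RPC response is routed back to the future waiting on request "42".
assert classify_topic("my_agent", "my_agent:rpc_response=42") == ("rpc_response", "42")
```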
3 changes: 1 addition & 2 deletions protos/agent_worker.proto
@@ -117,12 +117,11 @@ message Message {
  oneof message {
    RpcRequest request = 1;
    RpcResponse response = 2;
-   Event event = 3;
+   cloudevent.CloudEvent cloudEvent = 3;
    RegisterAgentTypeRequest registerAgentTypeRequest = 4;
    RegisterAgentTypeResponse registerAgentTypeResponse = 5;
    AddSubscriptionRequest addSubscriptionRequest = 6;
    AddSubscriptionResponse addSubscriptionResponse = 7;
-   cloudevent.CloudEvent cloudEvent = 8;
  }
}
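With this change, `cloudevent.CloudEvent` takes over field 3 of the `message` oneof as the single event envelope, rather than sitting alongside a separate `Event` type. As a rough sketch of populating it from Python, assuming generated stubs named `agent_worker_pb2` and `cloudevent_pb2` (the module names and example values are assumptions; the field layout follows the standard CNCF CloudEvents protobuf definition):

```python
# Hypothetical generated modules; actual names depend on how the protos are compiled.
from agent_worker_pb2 import Message
from cloudevent_pb2 import CloudEvent

event = CloudEvent(
    id="evt-123",                      # Unique event id.
    source="my_agent/default",         # Producer of the event.
    spec_version="1.0",                # CloudEvents spec version.
    type="example.topic_type",         # Event type, e.g. mapped from the topic.
    text_data='{"content": "hello"}',  # One member of the data oneof.
)

# The envelope now carries the event in oneof field 3.
msg = Message(cloudEvent=event)
assert msg.WhichOneof("message") == "cloudEvent"
```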

1 change: 1 addition & 0 deletions python/packages/autogen-agentchat/pyproject.toml
@@ -29,6 +29,7 @@ include = ["src/**", "tests/*.py"]
[tool.pyright]
extends = "../../pyproject.toml"
include = ["src", "tests"]
+reportDeprecated = true

[tool.pytest.ini_options]
minversion = "6.0"
@@ -23,6 +23,7 @@
AgentMessage,
ChatMessage,
HandoffMessage,
+MultiModalMessage,
TextMessage,
ToolCallMessage,
ToolCallResultMessage,
@@ -113,7 +114,10 @@ class AssistantAgent(BaseChatAgent):
    async def main() -> None:
-       model_client = OpenAIChatCompletionClient(model="gpt-4o")
+       model_client = OpenAIChatCompletionClient(
+           model="gpt-4o",
+           # api_key = "your_openai_api_key"
+       )
        agent = AssistantAgent(name="assistant", model_client=model_client)
        response = await agent.on_messages(
@@ -144,7 +148,10 @@ async def get_current_time() -> str:
    async def main() -> None:
-       model_client = OpenAIChatCompletionClient(model="gpt-4o")
+       model_client = OpenAIChatCompletionClient(
+           model="gpt-4o",
+           # api_key = "your_openai_api_key"
+       )
        agent = AssistantAgent(name="assistant", model_client=model_client, tools=[get_current_time])
        await Console(
@@ -156,6 +163,39 @@ async def main() -> None:
asyncio.run(main())
The following example shows how to use the `o1-mini` model with the assistant agent.

.. code-block:: python

    import asyncio
    from autogen_core.base import CancellationToken
    from autogen_ext.models import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.messages import TextMessage


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(
            model="o1-mini",
            # api_key = "your_openai_api_key"
        )
        # The system message is not supported by the o1 series model.
        agent = AssistantAgent(name="assistant", model_client=model_client, system_message=None)
        response = await agent.on_messages(
            [TextMessage(content="What is the capital of France?", source="user")], CancellationToken()
        )
        print(response)


    asyncio.run(main())
.. note::
    The `o1-preview` and `o1-mini` models do not support system messages or function calling.
    Therefore, `system_message` should be set to `None`, and `tools` and `handoffs` should not be set.
    See `o1 beta limitations <https://platform.openai.com/docs/guides/reasoning#beta-limitations>`_ for more details.
"""

def __init__(
@@ -166,13 +206,19 @@ def __init__(
        tools: List[Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]] | None = None,
        handoffs: List[Handoff | str] | None = None,
        description: str = "An agent that provides assistance with ability to use tools.",
-       system_message: str = "You are a helpful AI assistant. Solve tasks using your tools. Reply with TERMINATE when the task has been completed.",
+       system_message: str
+       | None = "You are a helpful AI assistant. Solve tasks using your tools. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_client = model_client
-       self._system_messages = [SystemMessage(content=system_message)]
+       if system_message is None:
+           self._system_messages = []
+       else:
+           self._system_messages = [SystemMessage(content=system_message)]
        self._tools: List[Tool] = []
        if tools is not None:
+           if model_client.capabilities["function_calling"] is False:
+               raise ValueError("The model does not support function calling.")
            for tool in tools:
                if isinstance(tool, Tool):
                    self._tools.append(tool)
@@ -192,6 +238,8 @@ def __init__(
        self._handoff_tools: List[Tool] = []
        self._handoffs: Dict[str, Handoff] = {}
        if handoffs is not None:
+           if model_client.capabilities["function_calling"] is False:
+               raise ValueError("The model does not support function calling, which is needed for handoffs.")
            for handoff in handoffs:
                if isinstance(handoff, str):
                    handoff = Handoff(target=handoff)
@@ -229,6 +277,8 @@ async def on_messages_stream(
    ) -> AsyncGenerator[AgentMessage | Response, None]:
        # Add messages to the model context.
        for msg in messages:
+           if isinstance(msg, MultiModalMessage) and self._model_client.capabilities["vision"] is False:
+               raise ValueError("The model does not support vision.")
            self._model_context.append(UserMessage(content=msg.content, source=msg.source))

        # Inner messages.
@@ -14,6 +14,7 @@
MessageContext,
)
from autogen_core.components import ClosureAgent, TypeSubscription
+from autogen_core.components._closure_agent import ClosureContext

from ... import EVENT_LOGGER_NAME
from ...base import ChatAgent, TaskResult, Team, TerminationCondition
@@ -139,8 +140,7 @@ async def _init(self, runtime: AgentRuntime) -> None:
)

        async def collect_output_messages(
-           _runtime: AgentRuntime,
-           id: AgentId,
+           _runtime: ClosureContext,
            message: GroupChatStart | GroupChatMessage | GroupChatTermination,
            ctx: MessageContext,
        ) -> None:
@@ -150,7 +150,7 @@ async def collect_output_messages(
                return
            await self._output_message_queue.put(message.message)

-       await ClosureAgent.register(
+       await ClosureAgent.register_closure(
            runtime,
            type=self._collector_agent_type,
            closure=collect_output_messages,
@@ -170,6 +170,13 @@ async def run(
:meth:`run_stream` to run the team and then returns the final result.
Once the team is stopped, the termination condition is reset.
Args:
    task (str | ChatMessage | None): The task to run the team with.
    cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
        Setting the cancellation token may put the team in an inconsistent state,
        and it may not reset the termination condition.
        To gracefully stop the team, use :class:`~autogen_agentchat.task.ExternalTermination` instead.
Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:
@@ -198,6 +205,47 @@ async def main() -> None:
print(result)
asyncio.run(main())
Example using the :class:`~autogen_core.base.CancellationToken` to cancel the task:

.. code-block:: python

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.task import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_core.base import CancellationToken
    from autogen_ext.models import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        cancellation_token = CancellationToken()

        # Create a task to run the team in the background.
        run_task = asyncio.create_task(
            team.run(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )

        # Wait for 1 second and then cancel the task.
        await asyncio.sleep(1)
        cancellation_token.cancel()

        # This will raise a cancellation error.
        await run_task


    asyncio.run(main())
"""
result: TaskResult | None = None
@@ -221,6 +269,13 @@ async def run_stream(
of the type :class:`TaskResult` as the last item in the stream. Once the
team is stopped, the termination condition is reset.
Args:
    task (str | ChatMessage | None): The task to run the team with.
    cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
        Setting the cancellation token may put the team in an inconsistent state,
        and it may not reset the termination condition.
        To gracefully stop the team, use :class:`~autogen_agentchat.task.ExternalTermination` instead.
Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:
.. code-block:: python
@@ -251,7 +306,52 @@ async def main() -> None:
asyncio.run(main())
Example using the :class:`~autogen_core.base.CancellationToken` to cancel the task:

.. code-block:: python

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.task import MaxMessageTermination, Console
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_core.base import CancellationToken
    from autogen_ext.models import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        cancellation_token = CancellationToken()

        # Create a task to run the team in the background.
        run_task = asyncio.create_task(
            Console(
                team.run_stream(
                    task="Count from 1 to 10, respond one at a time.",
                    cancellation_token=cancellation_token,
                )
            )
        )

        # Wait for 1 second and then cancel the task.
        await asyncio.sleep(1)
        cancellation_token.cancel()

        # This will raise a cancellation error.
        await run_task


    asyncio.run(main())
"""

# Create the first chat message if the task is a string or a chat message.
first_chat_message: ChatMessage | None = None
if task is None:
@@ -288,12 +388,17 @@ async def stop_runtime() -> None:
        await self._runtime.send_message(
            GroupChatStart(message=first_chat_message),
            recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
+           cancellation_token=cancellation_token,
        )
        # Collect the output messages in order.
        output_messages: List[AgentMessage] = []
        # Yield the messages until the queue is empty.
        while True:
-           message = await self._output_message_queue.get()
+           message_future = asyncio.ensure_future(self._output_message_queue.get())
+           if cancellation_token is not None:
+               cancellation_token.link_future(message_future)
+           # Wait for the next message; this will raise an exception if the task is cancelled.
+           message = await message_future
            if message is None:
                break
            yield message
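The queue change above is an instance of a general pattern: wrap a blocking `get()` in a future and link it to the `CancellationToken`, so that cancelling the token interrupts the wait instead of leaving the consumer blocked forever. A condensed, standalone sketch of the pattern (the helper name `get_next_message` is illustrative, not from this commit):

```python
import asyncio

from autogen_core.base import CancellationToken


async def get_next_message(
    queue: "asyncio.Queue[str | None]",
    cancellation_token: CancellationToken | None = None,
) -> str | None:
    # Wrap the blocking get() in a future so the token can interrupt it.
    message_future = asyncio.ensure_future(queue.get())
    if cancellation_token is not None:
        cancellation_token.link_future(message_future)
    # Raises asyncio.CancelledError if the token is cancelled first.
    return await message_future
```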
@@ -1,3 +1,4 @@
+import asyncio
from abc import ABC, abstractmethod
from typing import Any, List

@@ -78,7 +79,9 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
        await self.publish_message(message, topic_id=DefaultTopicId(type=self._output_topic_type))

        # Relay the start message to the participants.
-       await self.publish_message(message, topic_id=DefaultTopicId(type=self._group_topic_type))
+       await self.publish_message(
+           message, topic_id=DefaultTopicId(type=self._group_topic_type), cancellation_token=ctx.cancellation_token
+       )

        # Append the user message to the message thread.
        self._message_thread.append(message.message)
@@ -95,8 +98,16 @@ async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
            await self._termination_condition.reset()
            return

-       speaker_topic_type = await self.select_speaker(self._message_thread)
-       await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type))
+       # Select a speaker to start the conversation.
+       speaker_topic_type_future = asyncio.ensure_future(self.select_speaker(self._message_thread))
+       # Link the select speaker future to the cancellation token.
+       ctx.cancellation_token.link_future(speaker_topic_type_future)
+       speaker_topic_type = await speaker_topic_type_future
+       await self.publish_message(
+           GroupChatRequestPublish(),
+           topic_id=DefaultTopicId(type=speaker_topic_type),
+           cancellation_token=ctx.cancellation_token,
+       )

    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
@@ -140,8 +151,15 @@ async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
            return

        # Select a speaker to continue the conversation.
-       speaker_topic_type = await self.select_speaker(self._message_thread)
-       await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type))
+       speaker_topic_type_future = asyncio.ensure_future(self.select_speaker(self._message_thread))
+       # Link the select speaker future to the cancellation token.
+       ctx.cancellation_token.link_future(speaker_topic_type_future)
+       speaker_topic_type = await speaker_topic_type_future
+       await self.publish_message(
+           GroupChatRequestPublish(),
+           topic_id=DefaultTopicId(type=speaker_topic_type),
+           cancellation_token=ctx.cancellation_token,
+       )

    @rpc
    async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> None:
@@ -71,6 +71,7 @@ async def handle_request(self, message: GroupChatRequestPublish, ctx: MessageContext) -> None:
        await self.publish_message(
            GroupChatAgentResponse(agent_response=response),
            topic_id=DefaultTopicId(type=self._parent_topic_type),
+           cancellation_token=ctx.cancellation_token,
        )

    async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:
