diff --git a/README.md b/README.md index 8a4b8ba507b..a876fa19ea2 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived await App.RuntimeApp!.WaitForShutdownAsync(); await app.WaitForShutdownAsync(); -[TopicSubscription("HelloAgents")] +[TopicSubscription("agents")] public class HelloAgent( IAgentContext context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index 517eaf735c3..008c2864737 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -14,6 +14,7 @@ + diff --git a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj index 5ce0d0531fa..370d13fb32e 100644 --- a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj +++ b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj @@ -14,6 +14,7 @@ + diff --git a/dotnet/samples/Hello/Hello.AppHost/Program.cs b/dotnet/samples/Hello/Hello.AppHost/Program.cs index 326eddbcc9e..62af2b249a9 100644 --- a/dotnet/samples/Hello/Hello.AppHost/Program.cs +++ b/dotnet/samples/Hello/Hello.AppHost/Program.cs @@ -5,15 +5,24 @@ var builder = DistributedApplication.CreateBuilder(args); var backend = builder.AddProject("backend").WithExternalHttpEndpoints(); -builder.AddProject("client") +var client = builder.AddProject("HelloAgentsDotNET") .WithReference(backend) - .WithEnvironment("AGENT_HOST", $"{backend.GetEndpoint("https").Property(EndpointProperty.Url)}") + .WithEnvironment("AGENT_HOST", backend.GetEndpoint("https")) + .WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true") .WaitFor(backend); - +#pragma warning disable ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +// xlang is over http for now - in prod use TLS between containers +builder.AddPythonApp("HelloAgentsPython", "../../../../python/packages/autogen-core/samples/xlang/hello_python_agent", "hello_python_agent.py", "../../../../../.venv") + .WithReference(backend) + .WithEnvironment("AGENT_HOST", backend.GetEndpoint("http")) + .WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true") + .WithEnvironment("GRPC_DNS_RESOLVER", "native") + .WithOtlpExporter() + .WaitFor(backend) + .WaitFor(client); +#pragma warning restore ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. using var app = builder.Build(); - await app.StartAsync(); var url = backend.GetEndpoint("http").Url; Console.WriteLine("Backend URL: " + url); - await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs index d2ba81e659a..4b8d663de19 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.AI; namespace Hello; -[TopicSubscription("HelloAgents")] +[TopicSubscription("agents")] public class HelloAIAgent( IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, diff --git a/dotnet/samples/Hello/HelloAIAgents/Program.cs b/dotnet/samples/Hello/HelloAIAgents/Program.cs index 9612a0a0795..891c026f943 100644 --- a/dotnet/samples/Hello/HelloAIAgents/Program.cs +++ b/dotnet/samples/Hello/HelloAIAgents/Program.cs @@ -30,7 +30,7 @@ namespace Hello { - [TopicSubscription("HelloAgents")] + [TopicSubscription("agents")] public class HelloAgent( IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index 4f74520a71e..ce3fed2f61d 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -6,25 +6,17 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -// step 1: create in-memory agent runtime - -// step 2: register HelloAgent to that agent runtime - -// step 3: start the agent runtime - -// step 4: send a message to the agent - -// step 5: wait for the agent runtime to shutdown +var local = true; +if (Environment.GetEnvironmentVariable("AGENT_HOST") != null) { local = false; } var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: true); -//var app = await AgentsApp.StartAsync(); +}, local: local).ConfigureAwait(false); await app.WaitForShutdownAsync(); namespace Hello { - [TopicSubscription("HelloAgents")] + [TopicSubscription("agents")] public class HelloAgent( IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( @@ -53,7 +45,10 @@ public async Task Handle(ConversationClosed item) var goodbye = $"********************* {item.UserId} said {item.UserMessage} ************************"; var evt = new Output { Message = goodbye }; await PublishMessageAsync(evt).ConfigureAwait(true); - await PublishMessageAsync(new Shutdown()).ConfigureAwait(false); + if (Environment.GetEnvironmentVariable("STAY_ALIVE_ON_GOODBYE") != "true") + { + await PublishMessageAsync(new Shutdown()).ConfigureAwait(false); + } } public async Task Handle(Shutdown item) diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs index 664689de824..e3c9dd2121c 100644 --- a/dotnet/samples/Hello/HelloAgentState/Program.cs +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -15,7 +15,7 @@ namespace Hello { - [TopicSubscription("HelloAgents")] + [TopicSubscription("agents")] public class HelloAgent( IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime, diff --git a/dotnet/samples/Hello/protos/agent_events.proto b/dotnet/samples/Hello/protos/agent_events.proto new file mode 100644 index 00000000000..64ef2d69d60 --- /dev/null +++ b/dotnet/samples/Hello/protos/agent_events.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package HelloAgents; + +option csharp_namespace = "Microsoft.AutoGen.Abstractions"; +message TextMessage { + string textMessage = 1; + string source = 2; +} +message Input { + string message = 1; +} +message InputProcessed { + string route = 1; +} +message Output { + string message = 1; +} +message OutputWritten { + string route = 1; +} +message IOError { + string message = 1; +} +message NewMessageReceived { + string message = 1; +} +message ResponseGenerated { + string response = 1; +} +message GoodBye { + string message = 1; +} +message MessageStored { + string message = 1; +} +message ConversationClosed { + string user_id = 1; + string user_message = 2; +} +message Shutdown { + string message = 1; +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 01ad856a2d4..5ff964070ff 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -36,23 +36,50 @@ protected AgentBase( runtime.AgentInstance = this; this.EventTypes = eventTypes; _logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger(); - var subscriptionRequest = new AddSubscriptionRequest + AddImplicitSubscriptionsAsync().AsTask().Wait(); + Completion = Start(); + } + internal Task Completion { get; } + + private async ValueTask AddImplicitSubscriptionsAsync() + { + var topicTypes = new List + { + this.AgentId.Type + ":" + this.AgentId.Key, + this.AgentId.Type + }; + + foreach (var topicType in topicTypes) { - RequestId = Guid.NewGuid().ToString(), - Subscription = new Subscription + var subscriptionRequest = new AddSubscriptionRequest { - TypeSubscription = new TypeSubscription + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription { - AgentType = this.AgentId.Type, - TopicType = this.AgentId.Type + "/" + this.AgentId.Key + TypeSubscription = new TypeSubscription + { + AgentType = this.AgentId.Type, + TopicType = topicType + } } + }; + // explicitly wait for this to complete + await _runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).ConfigureAwait(true); + } + + // using reflection, find all methods that Handle and subscribe to the topic T + var handleMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle").ToList(); + foreach (var method in handleMethods) + { + var eventType = method.GetParameters()[0].ParameterType; + var topic = EventTypes.EventsMap.FirstOrDefault(x => x.Value.Contains(eventType.Name)).Key; + if (topic != null) + { + Subscribe(nameof(topic)); } - }; - _runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).AsTask().Wait(); - Completion = Start(); - } - internal Task Completion { get; } + } + } internal Task Start() { var didSuppress = false; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs index 48f07573430..636bca487fc 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs @@ -216,6 +216,7 @@ private async ValueTask RegisterAgentTypeAsync(string type, Type agentType, Canc //var state = agentType.BaseType?.GetGenericArguments().First(); var topicTypes = agentType.GetCustomAttributes().Select(t => t.Topic); + //TODO: do something with the response (like retry on error) await WriteChannelAsync(new Message { RegisterAgentTypeRequest = new RegisterAgentTypeRequest @@ -227,9 +228,47 @@ await WriteChannelAsync(new Message //Events = { events } } }, cancellationToken).ConfigureAwait(false); + + foreach (var topic in topicTypes) + { + var subscriptionRequest = new Message + { + AddSubscriptionRequest = new AddSubscriptionRequest + { + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = type, + TopicType = topic + } + } + } + }; + await WriteChannelAsync(subscriptionRequest, cancellationToken).ConfigureAwait(true); + foreach (var e in events) + { + subscriptionRequest = new Message + { + AddSubscriptionRequest = new AddSubscriptionRequest + { + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = type, + TopicType = topic + "." + e + } + } + } + }; + await WriteChannelAsync(subscriptionRequest, cancellationToken).ConfigureAwait(true); + } + } } } - // new is intentional public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index ab24a0e15fe..ea488568e4e 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -40,7 +40,6 @@ public GrpcGateway(IClusterClient clusterClient, ILogger logger) } public async ValueTask BroadcastEvent(CloudEvent evt) { - // TODO: filter the workers that receive the event var tasks = new List(_workers.Count); foreach (var (_, connection) in _supportedAgentTypes) { @@ -119,10 +118,23 @@ private async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, { throw new RpcException(new Status(StatusCode.InvalidArgument, error)); } + + // agentype:rpc_request={requesting_agent_id} + // {genttype}:rpc_response={request_id} private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) { - var topic = request.Subscription.TypeSubscription.TopicType; - var agentType = request.Subscription.TypeSubscription.AgentType; + var topic = ""; + var agentType = ""; + if (request.Subscription.TypePrefixSubscription is not null) + { + topic = request.Subscription.TypePrefixSubscription.TopicTypePrefix; + agentType = request.Subscription.TypePrefixSubscription.AgentType; + } + else if (request.Subscription.TypeSubscription is not null) + { + topic = request.Subscription.TypeSubscription.TopicType; + agentType = request.Subscription.TypeSubscription.AgentType; + } _subscriptionsByAgentType[agentType] = request.Subscription; _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); await _subscriptions.Subscribe(topic, agentType); @@ -153,31 +165,50 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, Success = true } }; - // add a default subscription for the agent type - //TODO: we should consider having constraints on the namespace or at least migrate all our examples to use well typed namesspaces like com.microsoft.autogen/hello/HelloAgents etc - var subscriptionRequest = new AddSubscriptionRequest + await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); + } + private async ValueTask DispatchEventAsync(CloudEvent evt) + { + // get the event type and then send to all agents that are subscribed to that event type + var eventType = evt.Type; + // ensure that we get agentTypes as an async enumerable list - try to get the value of agentTypes by topic and then cast it to an async enumerable list + if (_subscriptionsByTopic.TryGetValue(eventType, out var agentTypes)) + { + await DispatchEventToAgentsAsync(agentTypes, evt); + } + // instead of an exact match, we can also check for a prefix match where key starts with the eventType + else if (_subscriptionsByTopic.Keys.Any(key => key.StartsWith(eventType))) + { + _subscriptionsByTopic.Where( + kvp => kvp.Key.StartsWith(eventType)) + .SelectMany(kvp => kvp.Value) + .Distinct() + .ToList() + .ForEach(async agentType => + { + await DispatchEventToAgentsAsync(new List { agentType }, evt).ConfigureAwait(false); + }); + } + else { - RequestId = Guid.NewGuid().ToString(), - Subscription = new Subscription + // log that no agent types were found + _logger.LogWarning("No agent types found for event type {EventType}.", eventType); + } + } + private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentTypes, CloudEvent evt) + { + var tasks = new List(agentTypes.Count()); + foreach (var agentType in agentTypes) + { + if (_supportedAgentTypes.TryGetValue(agentType, out var connections)) { - TypeSubscription = new TypeSubscription + foreach (var connection in connections) { - AgentType = msg.Type, - TopicType = msg.Type + tasks.Add(this.SendMessageAsync(connection, evt)); } } - }; - await AddSubscriptionAsync(connection, subscriptionRequest).ConfigureAwait(true); - - await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); - } - private async ValueTask DispatchEventAsync(CloudEvent evt) - { - await BroadcastEvent(evt).ConfigureAwait(false); - /* - var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(evt.Namespace, evt.Type)); - await topic.OnNextAsync(evt.ToEvent()); - */ + } + await Task.WhenAll(tasks).ConfigureAwait(false); } private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request) { diff --git a/python/packages/autogen-core/samples/protos/__init__.py b/python/packages/autogen-core/samples/protos/__init__.py new file mode 100644 index 00000000000..b3ea671c3b9 --- /dev/null +++ b/python/packages/autogen-core/samples/protos/__init__.py @@ -0,0 +1,8 @@ +""" +The :mod:`autogen_core.worker.protos` module provides Google Protobuf classes for agent-worker communication +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) diff --git a/python/packages/autogen-core/samples/protos/agent_events_pb2.py b/python/packages/autogen-core/samples/protos/agent_events_pb2.py new file mode 100644 index 00000000000..dfa96941b24 --- /dev/null +++ b/python/packages/autogen-core/samples/protos/agent_events_pb2.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: agent_events.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" + +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x12\x61gent_events.proto\x12\x06\x61gents"2\n\x0bTextMessage\x12\x13\n\x0btextMessage\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t"\x18\n\x05Input\x12\x0f\n\x07message\x18\x01 \x01(\t"\x1f\n\x0eInputProcessed\x12\r\n\x05route\x18\x01 \x01(\t"\x19\n\x06Output\x12\x0f\n\x07message\x18\x01 \x01(\t"\x1e\n\rOutputWritten\x12\r\n\x05route\x18\x01 \x01(\t"\x1a\n\x07IOError\x12\x0f\n\x07message\x18\x01 \x01(\t"%\n\x12NewMessageReceived\x12\x0f\n\x07message\x18\x01 \x01(\t"%\n\x11ResponseGenerated\x12\x10\n\x08response\x18\x01 \x01(\t"\x1a\n\x07GoodBye\x12\x0f\n\x07message\x18\x01 \x01(\t" \n\rMessageStored\x12\x0f\n\x07message\x18\x01 \x01(\t";\n\x12\x43onversationClosed\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x14\n\x0cuser_message\x18\x02 \x01(\t"\x1b\n\x08Shutdown\x12\x0f\n\x07message\x18\x01 \x01(\tB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "agent_events_pb2", _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals["DESCRIPTOR"]._options = None + _globals["DESCRIPTOR"]._serialized_options = b"\252\002\036Microsoft.AutoGen.Abstractions" + _globals["_TEXTMESSAGE"]._serialized_start = 30 + _globals["_TEXTMESSAGE"]._serialized_end = 80 + _globals["_INPUT"]._serialized_start = 82 + _globals["_INPUT"]._serialized_end = 106 + _globals["_INPUTPROCESSED"]._serialized_start = 108 + _globals["_INPUTPROCESSED"]._serialized_end = 139 + _globals["_OUTPUT"]._serialized_start = 141 + _globals["_OUTPUT"]._serialized_end = 166 + _globals["_OUTPUTWRITTEN"]._serialized_start = 168 + _globals["_OUTPUTWRITTEN"]._serialized_end = 198 + _globals["_IOERROR"]._serialized_start = 200 + _globals["_IOERROR"]._serialized_end = 226 + _globals["_NEWMESSAGERECEIVED"]._serialized_start = 228 + _globals["_NEWMESSAGERECEIVED"]._serialized_end = 265 + _globals["_RESPONSEGENERATED"]._serialized_start = 267 + _globals["_RESPONSEGENERATED"]._serialized_end = 304 + _globals["_GOODBYE"]._serialized_start = 306 + _globals["_GOODBYE"]._serialized_end = 332 + _globals["_MESSAGESTORED"]._serialized_start = 334 + _globals["_MESSAGESTORED"]._serialized_end = 366 + _globals["_CONVERSATIONCLOSED"]._serialized_start = 368 + _globals["_CONVERSATIONCLOSED"]._serialized_end = 427 + _globals["_SHUTDOWN"]._serialized_start = 429 + _globals["_SHUTDOWN"]._serialized_end = 456 +# @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/samples/protos/agent_events_pb2.pyi b/python/packages/autogen-core/samples/protos/agent_events_pb2.pyi new file mode 100644 index 00000000000..01cfbafee51 --- /dev/null +++ b/python/packages/autogen-core/samples/protos/agent_events_pb2.pyi @@ -0,0 +1,197 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import builtins +import google.protobuf.descriptor +import google.protobuf.message +import typing + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing.final +class TextMessage(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TEXTMESSAGE_FIELD_NUMBER: builtins.int + SOURCE_FIELD_NUMBER: builtins.int + textMessage: builtins.str + source: builtins.str + def __init__( + self, + *, + textMessage: builtins.str = ..., + source: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["source", b"source", "textMessage", b"textMessage"]) -> None: ... + +global___TextMessage = TextMessage + +@typing.final +class Input(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Input = Input + +@typing.final +class InputProcessed(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROUTE_FIELD_NUMBER: builtins.int + route: builtins.str + def __init__( + self, + *, + route: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ... + +global___InputProcessed = InputProcessed + +@typing.final +class Output(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Output = Output + +@typing.final +class OutputWritten(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROUTE_FIELD_NUMBER: builtins.int + route: builtins.str + def __init__( + self, + *, + route: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ... + +global___OutputWritten = OutputWritten + +@typing.final +class IOError(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___IOError = IOError + +@typing.final +class NewMessageReceived(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___NewMessageReceived = NewMessageReceived + +@typing.final +class ResponseGenerated(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + RESPONSE_FIELD_NUMBER: builtins.int + response: builtins.str + def __init__( + self, + *, + response: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["response", b"response"]) -> None: ... + +global___ResponseGenerated = ResponseGenerated + +@typing.final +class GoodBye(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___GoodBye = GoodBye + +@typing.final +class MessageStored(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___MessageStored = MessageStored + +@typing.final +class ConversationClosed(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + USER_ID_FIELD_NUMBER: builtins.int + USER_MESSAGE_FIELD_NUMBER: builtins.int + user_id: builtins.str + user_message: builtins.str + def __init__( + self, + *, + user_id: builtins.str = ..., + user_message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["user_id", b"user_id", "user_message", b"user_message"]) -> None: ... + +global___ConversationClosed = ConversationClosed + +@typing.final +class Shutdown(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Shutdown = Shutdown diff --git a/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.py b/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.py new file mode 100644 index 00000000000..bf947056a2f --- /dev/null +++ b/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.py @@ -0,0 +1,4 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" + +import grpc diff --git a/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.pyi b/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.pyi new file mode 100644 index 00000000000..a6a9cff9dfd --- /dev/null +++ b/python/packages/autogen-core/samples/protos/agent_events_pb2_grpc.pyi @@ -0,0 +1,17 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import abc +import collections.abc +import grpc +import grpc.aio +import typing + +_T = typing.TypeVar("_T") + +class _MaybeAsyncIterator(collections.abc.AsyncIterator[_T], collections.abc.Iterator[_T], metaclass=abc.ABCMeta): ... + +class _ServicerContext(grpc.ServicerContext, grpc.aio.ServicerContext): # type: ignore[misc, type-arg] + ... diff --git a/python/packages/autogen-core/samples/xlang/hello_python_agent/hello_python_agent.py b/python/packages/autogen-core/samples/xlang/hello_python_agent/hello_python_agent.py new file mode 100644 index 00000000000..0b5796ccd1a --- /dev/null +++ b/python/packages/autogen-core/samples/xlang/hello_python_agent/hello_python_agent.py @@ -0,0 +1,61 @@ +import asyncio +import logging +import os +import sys + +thisdir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(thisdir, "..", "..")) + +from autogen_core.application import WorkerAgentRuntime +from protos.agent_events_pb2 import NewMessageReceived + +# from protos.agents_events_pb2 import NewMessageReceived +from autogen_core.base import AgentId, try_get_known_serializers_for_type, PROTOBUF_DATA_CONTENT_TYPE +from autogen_core.components import DefaultSubscription, DefaultTopicId + +# Add the local package directory to sys.path +# sys.path.append(os.path.abspath('../../../../python/packages/autogen-core')) +from dotenv import load_dotenv +from user_input import UserProxy + +agnext_logger = logging.getLogger("autogen_core") + + +async def main() -> None: + load_dotenv() + agentHost = os.getenv("AGENT_HOST") or "localhost:53072" + # stupid grpc python bug - can only use the hostname, not prefix - if hostname has a prefix we have to remove it: + if agentHost.startswith("http://"): + agentHost = agentHost[7:] + if agentHost.startswith("https://"): + agentHost = agentHost[8:] + agnext_logger.info("0") + agnext_logger.info(agentHost) + runtime = WorkerAgentRuntime(host_address=agentHost, payload_serialization_format=PROTOBUF_DATA_CONTENT_TYPE) + + agnext_logger.info("1") + runtime.start() + runtime.add_message_serializer(try_get_known_serializers_for_type(NewMessageReceived)) + + agnext_logger.info("2") + + await UserProxy.register(runtime, "HelloAgents", lambda: UserProxy()) + await runtime.add_subscription(DefaultSubscription(agent_type="HelloAgents")) + agnext_logger.info("3") + + message = NewMessageReceived(message="Hello from Python!") + + await runtime.publish_message( + message=message, + topic_id=DefaultTopicId("agents.NewMessageReceived","HelloAgents/python"), + sender=AgentId("HelloAgents", "python"), + ) + await runtime.stop_when_signal() + # await runtime.stop_when_idle() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + agnext_logger.setLevel(logging.DEBUG) + agnext_logger.log(logging.DEBUG, "Starting worker") + asyncio.run(main()) diff --git a/python/packages/autogen-core/samples/xlang/hello_python_agent/user_input.py b/python/packages/autogen-core/samples/xlang/hello_python_agent/user_input.py new file mode 100644 index 00000000000..0da6c5355fc --- /dev/null +++ b/python/packages/autogen-core/samples/xlang/hello_python_agent/user_input.py @@ -0,0 +1,39 @@ +import asyncio +import logging +from typing import Union + +from protos.agent_events_pb2 import ConversationClosed, Input, NewMessageReceived, Output +from autogen_core.base import MessageContext +from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler + +input_types = Union[ConversationClosed, Input, Output] + + +class UserProxy(RoutedAgent): + """An agent that allows the user to play the role of an agent in the conversation via input.""" + + DEFAULT_DESCRIPTION = "A human user." + + def __init__( + self, + description: str = DEFAULT_DESCRIPTION, + ) -> None: + super().__init__(description) + + @message_handler + async def handle_user_chat_input(self, message: input_types, ctx: MessageContext) -> None: + agnext_logger = logging.getLogger("autogen_core") + + if isinstance(message, Input): + response = await self.ainput("User input ('exit' to quit): ") + response = response.strip() + agnext_logger.info(response) + + await self.publish_message(NewMessageReceived(message=response), topic_id=DefaultTopicId()) + elif isinstance(message, Output): + agnext_logger.info(message.message) + else: + pass + + async def ainput(self, prompt: str) -> str: + return await asyncio.to_thread(input, f"{prompt} ") diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py index 24007fadfc7..aa7d6b84e19 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py @@ -733,7 +733,7 @@ async def factory_wrapper() -> T: async def _process_register_agent_type_response(self, response: agent_worker_pb2.RegisterAgentTypeResponse) -> None: future = self._pending_requests.pop(response.request_id) - if response.HasField("error"): + if response.HasField("error") and response.error: future.set_exception(RuntimeError(response.error)) else: future.set_result(None) @@ -835,7 +835,7 @@ async def add_subscription(self, subscription: Subscription) -> None: async def _process_add_subscription_response(self, response: agent_worker_pb2.AddSubscriptionResponse) -> None: future = self._pending_requests.pop(response.request_id) - if response.HasField("error"): + if response.HasField("error") and response.error: future.set_exception(RuntimeError(response.error)) else: future.set_result(None) diff --git a/python/pyproject.toml b/python/pyproject.toml index e9b9753cfca..9f4d3773575 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -80,4 +80,6 @@ check = ["fmt", "lint", "pyright", "mypy", "test"] gen-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/src/autogen_core/application/protos --grpc_python_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_grpc_out=./packages/autogen-core/src/autogen_core/application/protos --proto_path ../protos/ agent_worker.proto --proto_path ../protos/ cloudevent.proto" -gen-test-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/tests/protos --grpc_python_out=./packages/autogen-core/tests/protos --mypy_out=./packages/autogen-core/tests/protos --mypy_grpc_out=./packages/autogen-core/tests/protos --proto_path ./packages/autogen-core/tests/protos serialization_test.proto" \ No newline at end of file +gen-test-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/tests/protos --grpc_python_out=./packages/autogen-core/tests/protos --mypy_out=./packages/autogen-core/tests/protos --mypy_grpc_out=./packages/autogen-core/tests/protos --proto_path ./packages/autogen-core/tests/protos serialization_test.proto" + +gen-proto-samples = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/samples/protos --grpc_python_out=./packages/autogen-core/samples/protos --mypy_out=./packages/autogen-core/samples/protos --mypy_grpc_out=./packages/autogen-core/samples/protos --proto_path ../protos/ agent_events.proto" \ No newline at end of file