Skip to content

Commit

Permalink
Use official cloud event proto schema (microsoft#4487)
Browse files Browse the repository at this point in the history
* Use official cloud event proto schema

* format

* fix bug in cloud event attribute creation

---------

Co-authored-by: Kosta Petan <[email protected]>
Co-authored-by: Ryan Sweet <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 48778e5 commit b32f1a0
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 535 deletions.
1 change: 0 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentWorker.cs

namespace Microsoft.AutoGen.Abstractions;

public interface IAgentWorker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static CloudEvent ToCloudEvent<T>(this T message, string source) where T
Type = message.Descriptor.FullName,
Source = source,
Id = Guid.NewGuid().ToString(),
SpecVersion = "1.0",
Attributes = { { "datacontenttype", new CloudEvent.Types.CloudEventAttributeValue { CeString = PROTO_DATA_CONTENT_TYPE } } }
};
}
Expand Down
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca
{
case Message.MessageOneofCase.CloudEvent:
{
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Attributes);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item),
(this, msg.CloudEvent),
Expand Down
7 changes: 7 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// AgentBaseExtensions.cs

using System.Diagnostics;
using Google.Protobuf.Collections;
using static Microsoft.AutoGen.Abstractions.CloudEvent.Types;

namespace Microsoft.AutoGen.Agents;

Expand Down Expand Up @@ -57,6 +59,11 @@ public static class AgentBaseExtensions
return activity;
}

public static Activity? ExtractActivity(this AgentBase agent, string activityName, MapField<string, CloudEventAttributeValue> metadata)
{
return ExtractActivity(agent, activityName, metadata.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.CeString));
}

/// <summary>
/// Invokes a function asynchronously within the context of an <see cref="Activity"/>.
/// </summary>
Expand Down
42 changes: 41 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// AgentRuntime.cs

using System.Diagnostics;
using Google.Protobuf.Collections;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
using static Microsoft.AutoGen.Abstractions.CloudEvent.Types;

namespace Microsoft.AutoGen.Agents;

Expand All @@ -28,13 +30,38 @@ internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger
out var traceState);
return (traceParent, traceState);
}
public (string?, string?) GetTraceIdAndState(MapField<string, CloudEventAttributeValue> metadata)
{
DistributedContextPropagator.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
},
out var traceParent,
out var traceState);
return (traceParent, traceState);
}
public void Update(RpcRequest request, Activity? activity = null)
{
DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
}
public void Update(CloudEvent cloudEvent, Activity? activity = null)
{
DistributedContextPropagator.Inject(activity, cloudEvent.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
DistributedContextPropagator.Inject(activity, cloudEvent.Attributes, static (carrier, key, value) =>
{
var mapField = (MapField<string, CloudEventAttributeValue>)carrier!;
if (mapField.TryGetValue(key, out var ceValue))
{
mapField[key] = new CloudEventAttributeValue { CeString = value };
}
else
{
mapField.Add(key, new CloudEventAttributeValue { CeString = value });
}
});
}
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -73,4 +100,17 @@ public IDictionary<string, string> ExtractMetadata(IDictionary<string, string> m

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}

public IDictionary<string, string> ExtractMetadata(MapField<string, CloudEventAttributeValue> metadata)
{
var baggage = DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
});

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
}
2 changes: 1 addition & 1 deletion protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ message Message {
oneof message {
RpcRequest request = 1;
RpcResponse response = 2;
cloudevent.CloudEvent cloudEvent = 3;
io.cloudevents.v1.CloudEvent cloudEvent = 3;
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
AddSubscriptionRequest addSubscriptionRequest = 6;
Expand Down
22 changes: 16 additions & 6 deletions protos/cloudevent.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
// https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.proto

/**
* CloudEvent Protobuf Format
*
* - Required context attributes are explicitly represented.
* - Optional and Extension context attributes are carried in a map structure.
* - Data may be represented as binary, text, or protobuf messages.
*/

syntax = "proto3";

package cloudevent;
package io.cloudevents.v1;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
Expand All @@ -20,12 +30,12 @@ message CloudEvent {

// Optional & Extension Attributes
map<string, CloudEventAttributeValue> attributes = 5;
map<string, string> metadata = 6;

// -- CloudEvent Data (Bytes, Text, or Proto)
oneof data {
bytes binary_data = 7;
string text_data = 8;
google.protobuf.Any proto_data = 9;
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
}

/**
Expand All @@ -45,4 +55,4 @@ message CloudEvent {
google.protobuf.Timestamp ce_timestamp = 7;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,38 @@ async def _process_event(self, event: cloudevent_pb2.CloudEvent) -> None:
agent = await self._get_agent(agent_id)
with MessageHandlerContext.populate_context(agent.id):

def stringify_attributes(
attributes: Mapping[str, cloudevent_pb2.CloudEvent.CloudEventAttributeValue],
) -> Mapping[str, str]:
result: Dict[str, str] = {}
for key, value in attributes.items():
item = None
match value.WhichOneof("attr"):
case "ce_boolean":
item = str(value.ce_boolean)
case "ce_integer":
item = str(value.ce_integer)
case "ce_string":
item = value.ce_string
case "ce_bytes":
item = str(value.ce_bytes)
case "ce_uri":
item = value.ce_uri
case "ce_uri_ref":
item = value.ce_uri_ref
case "ce_timestamp":
item = str(value.ce_timestamp)
case _:
raise ValueError("Unknown attribute kind")
result[key] = item

return result

async def send_message(agent: Agent, message_context: MessageContext) -> Any:
with self._trace_helper.trace_block(
"process",
agent.id,
parent=event.metadata,
parent=stringify_attributes(event.attributes),
extraAttributes={"message_type": message_type},
):
await agent.on_message(message, ctx=message_context)
Expand Down
Loading

0 comments on commit b32f1a0

Please sign in to comment.