From 32c2db8ae7e6b6159b2db8dcd35ec0ec9507b852 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 07:51:30 -0500 Subject: [PATCH 01/16] Building out Dapr.Messaging and test project for streaming pubsub subscriptions Signed-off-by: Whit Waldo --- Directory.Packages.props | 1 + all.sln | 14 + src/Dapr.Messaging/Dapr.Messaging.csproj | 22 ++ .../DaprPublishSubscribeClient.cs | 18 ++ .../DaprPublishSubscribeClientBuilder.cs | 24 ++ .../DaprPublishSubscribeGrpcClient.cs | 36 +++ .../DaprSubscriptionOptions.cs | 31 ++ ...ishSubscribeServiceCollectionExtensions.cs | 55 ++++ .../PublishSubscribe/MessageHandlingPolicy.cs | 9 + .../PublishSubscribeReceiver.cs | 293 ++++++++++++++++++ .../PublishSubscribe/TopicMessage.cs | 33 ++ .../PublishSubscribe/TopicMessageHandler.cs | 10 + .../PublishSubscribe/TopicResponseAction.cs | 21 ++ .../Dapr.Messaging.Test.csproj | 47 +++ test/Dapr.Messaging.Test/protos/test.proto | 32 ++ 15 files changed, 646 insertions(+) create mode 100644 src/Dapr.Messaging/Dapr.Messaging.csproj create mode 100644 src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs create mode 100644 src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs create mode 100644 test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj create mode 100644 test/Dapr.Messaging.Test/protos/test.proto diff --git a/Directory.Packages.props b/Directory.Packages.props index 3c1459b5d..99ea0f2cc 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -27,6 +27,7 @@ + diff --git a/all.sln b/all.sln index 1dd0ab3c5..d8fc37b4b 100644 --- a/all.sln +++ b/all.sln @@ -119,6 +119,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -303,6 +307,14 @@ Global {CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU {CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU {CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -359,6 +371,8 @@ Global {DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B} + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B} + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40} diff --git a/src/Dapr.Messaging/Dapr.Messaging.csproj b/src/Dapr.Messaging/Dapr.Messaging.csproj new file mode 100644 index 000000000..0c7f159a4 --- /dev/null +++ b/src/Dapr.Messaging/Dapr.Messaging.csproj @@ -0,0 +1,22 @@ + + + + This package contains the reference assemblies for developing messaging services using Dapr. + enable + enable + Dapr.Messaging + Dapr Messaging SDK + Dapr Messaging SDK for building applications that utilize messaging components. + alpha + + + + + + + + + + + + diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs new file mode 100644 index 000000000..61ee621db --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -0,0 +1,18 @@ +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// The base implementation of a Dapr pub/sub client. +/// +public abstract class DaprPublishSubscribeClient +{ + /// + /// Dynamically subscribes to a Publish/Subscribe component and topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Configuration options. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// Cancellation token. + /// + public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken); +} diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs new file mode 100644 index 000000000..4bc945ecc --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs @@ -0,0 +1,24 @@ +// using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +// +// namespace Dapr.Messaging.PublishSubscribe; +// +// /// +// /// Builds a . +// /// +// public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder +// { +// /// +// /// Builds the client instance from the properties of the builder. +// /// +// /// The Dapr client instance. +// /// +// /// Builds the client instance from the properties of the builder. +// /// +// public override DaprPublishSubscribeClient Build() +// { +// var daprClientDependencies = BuildDaprClientDependencies(); +// var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel); +// +// return new DaprPublishSubscribeGrpcClient(client); +// } +// } diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs new file mode 100644 index 000000000..4990dc891 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -0,0 +1,36 @@ +using P = Dapr.Client.Autogen.Grpc.v1.Dapr; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A client for interacting with the Dapr endpoints. +/// +internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient +{ + private readonly P.DaprClient daprClient; + + /// + /// Creates a new instance of a + /// + public DaprPublishSubscribeGrpcClient(P.DaprClient client) + { + daprClient = client; + } + + /// + /// Dynamically subscribes to a Publish/Subscribe component and topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Configuration options. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// Cancellation token. + /// + public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken) + { + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient); + await receiver.SubscribeAsync(cancellationToken); + return receiver; + } +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs new file mode 100644 index 000000000..dd124e8f6 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs @@ -0,0 +1,31 @@ +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Options used to configure the dynamic Dapr subscription. +/// +/// Describes the policy to take on messages that have not been acknowledged within the timeout period. +public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy) +{ + /// + /// Subscription metadata. + /// + public IReadOnlyDictionary Metadata { get; init; } = new Dictionary(); + + /// + /// The optional name of the dead-letter topic to send unprocessed messages to. + /// + public string? DeadLetterTopic { get; init; } + + /// + /// If populated, this reflects the maximum number of messages that can be queued for processing on the replica. By default, + /// no maximum boundary is enforced. + /// + public int? MaximumQueuedMessages { get; init; } + + /// + /// The maximum amount of time to take to dispose of acknowledgement messages after the cancellation token has + /// been signaled. + /// + public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30); +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs new file mode 100644 index 000000000..ae09ddb48 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs @@ -0,0 +1,55 @@ +// using Microsoft.Extensions.DependencyInjection; +// +// namespace System.Runtime.CompilerServices.PublishSubscribe.Extensions; +// +// /// +// /// Contains extension methods for using Dapr Publish/Subscribe with dependency injection. +// /// +// public static class PublishSubscribeServiceCollectionExtensions +// { +// /// +// /// Adds Dapr Publish/Subscribe support to the service collection. +// /// +// /// The . +// /// +// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services) => +// AddDaprPubSubClient(services, (_, _) => { }); +// +// /// +// /// Adds Dapr Publish/Subscribe support to the service collection. +// /// +// /// The . +// /// Optionally allows greater configuration of the . +// /// +// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, +// Action? configure) => +// services.AddDaprPubSubClient((_, builder) => configure?.Invoke(builder)); +// +// /// +// /// Adds Dapr Publish/Subscribe support to the service collection. +// /// +// /// The . +// /// Optionally allows greater configuration of the using injected services. +// /// +// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action? configure) +// { +// ArgumentNullException.ThrowIfNull(services, nameof(services)); +// +// //Register the IHttpClientFactory implementation +// services.AddHttpClient(); +// +// services.TryAddSingleton(serviceProvider => +// { +// var httpClientFactory = serviceProvider.GetRequiredService(); +// +// var builder = new DaprPublishSubscribeClientBuilder(); +// builder.UseHttpClientFactory(httpClientFactory); +// +// configure?.Invoke(serviceProvider, builder); +// +// return builder.Build(); +// }); +// +// return services; +// } +// } diff --git a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs new file mode 100644 index 000000000..1cfbc3fbb --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs @@ -0,0 +1,9 @@ +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings. +/// +/// The duration to wait before timing out a message handling operation. +/// The default action to take when a message handling operation times out. +public sealed record MessageHandlingPolicy(TimeSpan TimeoutDuration, TopicResponseAction DefaultResponseAction); + diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs new file mode 100644 index 000000000..2f7109521 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -0,0 +1,293 @@ +using System.Threading.Channels; +using Dapr.AppCallback.Autogen.Grpc.v1; +using Grpc.Core; +using P = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A thread-safe implementation of a receiver for messages from a specified Dapr publish/subscribe component and +/// topic. +/// +public sealed class PublishSubscribeReceiver : IAsyncDisposable +{ + private readonly static UnboundedChannelOptions UnboundedChannelOptions = new UnboundedChannelOptions + { + SingleWriter = true, SingleReader = true + }; + + /// + /// The name of the Dapr pubsub component. + /// + private readonly string pubSubName; + /// + /// The name of the topic to subscribe to. + /// + private readonly string topicName; + /// + /// Options allowing the behavior of the receiver to be configured. + /// + private readonly DaprSubscriptionOptions options; + /// + /// A channel used to decouple the messages received from the sidecar to their consumption. + /// + private readonly Channel topicMessagesChannel; + /// + /// Maintains the various acknowledgements for each message. + /// + private readonly Channel acknowledgementsChannel = Channel.CreateUnbounded(UnboundedChannelOptions); + /// + /// The stream connection between this instance and the Dapr sidecar. + /// + private AsyncDuplexStreamingCall? clientStream; + /// + /// Used to ensure thread-safe operations against the stream. + /// + private readonly SemaphoreSlim semaphore = new(1, 1); + /// + /// The handler delegate responsible for processing the topic messages. + /// + private readonly TopicMessageHandler messageHandler; + /// + /// A reference to the DaprClient instance. + /// + private readonly P.Dapr.DaprClient client; + /// + /// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver. + /// + private bool hasInitialized; + /// + /// Flag that ensures the instance is only disposed a single time. + /// + private bool isDisposed; + + /// + /// Constructs a new instance of a instance. + /// + /// The name of the Dapr Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Options allowing the behavior of the receiver to be configured. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// A reference to the DaprClient instance. + internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler handler, P.Dapr.DaprClient client) + { + this.client = client; + this.pubSubName = pubSubName; + this.topicName = topicName; + this.options = options; + this.messageHandler = handler; + topicMessagesChannel = options.MaximumQueuedMessages is > 0 + ? Channel.CreateBounded(new BoundedChannelOptions((int)options.MaximumQueuedMessages) + { + SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait + }) + : Channel.CreateUnbounded(UnboundedChannelOptions); + } + + /// + /// Dynamically subscribes to messages on a PubSub topic provided by the Dapr sidecar. + /// + /// Cancellation token. + /// An containing messages provided by the sidecar. + internal async Task SubscribeAsync(CancellationToken cancellationToken = default) + { + //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). + if (hasInitialized) + return; + hasInitialized = true; + + var stream = await GetStreamAsync(cancellationToken); + + //Retrieve the messages from the sidecar and write to the messages channel + var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken); + + //Process the messages as they're written to either channel + var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(cancellationToken); + var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken); + + try + { + await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask); + } + catch (OperationCanceledException) + { + // Will be cleaned up during DisposeAsync + } + } + + /// + /// Retrieves or creates the bidirectional stream to the DaprClient for transacting pub/sub subscriptions. + /// + /// Cancellation token. + /// The stream connection. + private async Task> GetStreamAsync(CancellationToken cancellationToken) + { + await semaphore.WaitAsync(cancellationToken); + + try + { + return clientStream ??= client.SubscribeTopicEventsAlpha1(cancellationToken: cancellationToken); + } + finally + { + semaphore.Release(); + } + } + + /// + /// Acknowledges the indicated message back to the Dapr sidecar with an indicated behavior to take on the message. + /// + /// The identifier of the message the behavior is in reference to. + /// The behavior to take on the message as indicated by either the message handler or timeout message handling configuration. + /// Cancellation token. + /// + private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior, CancellationToken cancellationToken) + { + var action = behavior switch + { + TopicResponseAction.Success => TopicEventResponse.Types.TopicEventResponseStatus.Success, + TopicResponseAction.Retry => TopicEventResponse.Types.TopicEventResponseStatus.Retry, + TopicResponseAction.Drop => TopicEventResponse.Types.TopicEventResponseStatus.Drop, + _ => throw new InvalidOperationException( + $"Unrecognized topic acknowledgement action: {behavior}") + }; + + var acknowledgement = new TopicAcknowledgement(messageId, action); + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement, cancellationToken); + } + + /// + /// Processes each acknowledgement from the acknowledgement channel reader as it's populated. + /// + /// Cancellation token. + private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken cancellationToken) + { + var messageStream = await GetStreamAsync(cancellationToken); + await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken)) + { + await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 + { + EventProcessed = new() + { + Id = acknowledgement.MessageId, + Status = new() { Status = acknowledgement.Action } + } + }, cancellationToken); + } + } + + /// + /// Processes each topic messages from the channel as it's populated. + /// + /// Cancellation token. + private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellationToken) + { + await foreach (var message in topicMessagesChannel.Reader.ReadAllAsync(cancellationToken)) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration); + + //Evaluate the message and return an acknowledgement result + var messageAction = await messageHandler(message, cts.Token); + + try + { + //Share the result with the sidecar + await AcknowledgeMessageAsync(message.Id, messageAction, cancellationToken); + } + catch (OperationCanceledException) + { + //Acknowledge the message using the configured default response action + await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction, cancellationToken); + } + } + } + + /// + /// Retrieves the subscription stream data from the Dapr sidecar. + /// + /// The stream connection to and from the Dapr sidecar instance. + /// The channel writer instance. + /// Cancellation token. + private async Task FetchDataFromSidecarAsync( + AsyncDuplexStreamingCall stream, + ChannelWriter channelWriter, CancellationToken cancellationToken) + { + //Build out the initial topic events request + var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1() + { + PubsubName = pubSubName, DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, Topic = topicName + }; + + if (options.Metadata.Count > 0) + { + foreach (var (key, value) in options.Metadata) + { + initialRequest.Metadata.Add(key, value); + } + } + + //Send this request to the Dapr sidecar + await stream.RequestStream.WriteAsync( + new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken); + + //Each time a message is received from the stream, push it into the topic messages channel + await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) + { + var message = + new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, + response.EventMessage.SpecVersion, response.EventMessage.DataContentType, + response.EventMessage.Topic, response.EventMessage.PubsubName) + { + Path = response.EventMessage.Path, + Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value) + }; + + try + { + await channelWriter.WaitToWriteAsync(cancellationToken); + await channelWriter.WriteAsync(message, cancellationToken); + } + catch (Exception) + { + // Handle being unable to write because the writer is completed due to an active DisposeAsync operation + } + } + } + + /// + /// Disposes the various resources associated with the instance. + /// + /// + public async ValueTask DisposeAsync() + { + if (isDisposed) + return; + isDisposed = true; + + //Stop processing new events - we'll leave any messages yet unseen as unprocessed and + //Dapr will handle as necessary when they're not acknowledged + topicMessagesChannel.Writer.Complete(); + + //Flush the remaining acknowledgements, but start by marking the writer as complete so it doesn't receive any other messages either + acknowledgementsChannel.Writer.Complete(); + + try + { + //Process any remaining acknowledgements on the channel without exceeding the configured maximum clean up time + await acknowledgementsChannel.Reader.Completion.WaitAsync(options.MaximumCleanupTimeout); + } + catch (OperationCanceledException) + { + //Handled + } + } + + /// + /// Reflects the action to take on a given message identifier. + /// + /// The identifier of the message. + /// The action to take on the message in the acknowledgement request. + private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action); +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs new file mode 100644 index 000000000..f65b38069 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs @@ -0,0 +1,33 @@ +using Google.Protobuf.WellKnownTypes; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A message retrieved from a Dapr publish/subscribe topic. +/// +/// The unique identifier of the topic message. +/// Identifies the context in which an event happened, such as the organization publishing the +/// event or the process that produced the event. The exact syntax and semantics behind the data +/// encoded in the URI is defined by the event producer. +/// The type of event related to the originating occurrence. +/// The spec version of the CloudEvents specification. +/// The content type of the data. +/// The name of the topic. +/// The name of the Dapr publish/subscribe component. +public sealed record TopicMessage(string Id, string Source, string Type, string SpecVersion, string DataContentType, string Topic, string PubSubName) +{ + /// + /// The content of the event. + /// + public ReadOnlyMemory Data { get; init; } + + /// + /// The matching path from the topic subscription/routes (if specified) for this event. + /// + public string? Path { get; init; } + + /// + /// A map of additional custom properties sent to the app. These are considered to be CloudEvent extensions. + /// + public Dictionary Extensions { get; init; } = new(); +} diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs new file mode 100644 index 000000000..9c89c4a0d --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs @@ -0,0 +1,10 @@ +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// The handler delegate responsible for processing the topic message. +/// +/// The message request to process. +/// Cancellation token. +/// The acknowledgement behavior to report back to the pub/sub endpoint about the message. +public delegate Task TopicMessageHandler(TopicMessage request, + CancellationToken cancellationToken = default); diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs new file mode 100644 index 000000000..a36ed9831 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs @@ -0,0 +1,21 @@ +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Describes the various actions that can be taken on a topic message. +/// +public enum TopicResponseAction +{ + /// + /// Indicates the message was processed successfully and should be deleted from the pub/sub topic. + /// + Success, + /// + /// Indicates a failure while processing the message and that the message should be resent for a retry. + /// + Retry, + /// + /// Indicates a failure while processing the message and that the message should be dropped or sent to the + /// dead-letter topic if specified. + /// + Drop +} diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj new file mode 100644 index 000000000..83dc7b13e --- /dev/null +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -0,0 +1,47 @@ + + + + enable + enable + + false + true + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/Dapr.Messaging.Test/protos/test.proto b/test/Dapr.Messaging.Test/protos/test.proto new file mode 100644 index 000000000..9763fb596 --- /dev/null +++ b/test/Dapr.Messaging.Test/protos/test.proto @@ -0,0 +1,32 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +syntax = "proto3"; + +option csharp_namespace = "Dapr.Client.Autogen.Test.Grpc.v1"; + +message TestRun { + repeated TestCase tests = 1; +} + +message TestCase { + string name = 1; +} + +message Request { + string RequestParameter = 1; +} + +message Response { + string Name = 1; +} \ No newline at end of file From 9cfdf764627ea442567b80ef784ff5541ec1f3bf Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 07:55:26 -0500 Subject: [PATCH 02/16] Added copyright notices Signed-off-by: Whit Waldo --- .../DaprPublishSubscribeClient.cs | 15 ++++++++++++++- .../DaprPublishSubscribeClientBuilder.cs | 15 ++++++++++++++- .../DaprPublishSubscribeGrpcClient.cs | 15 ++++++++++++++- .../PublishSubscribe/DaprSubscriptionOptions.cs | 15 ++++++++++++++- .../PublishSubscribe/MessageHandlingPolicy.cs | 16 +++++++++++++++- .../PublishSubscribe/PublishSubscribeReceiver.cs | 15 ++++++++++++++- .../PublishSubscribe/TopicMessage.cs | 15 ++++++++++++++- .../PublishSubscribe/TopicMessageHandler.cs | 15 ++++++++++++++- .../PublishSubscribe/TopicResponseAction.cs | 15 ++++++++++++++- 9 files changed, 127 insertions(+), 9 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs index 61ee621db..5d933a860 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -1,4 +1,17 @@ -namespace Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; /// /// The base implementation of a Dapr pub/sub client. diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs index 4bc945ecc..1948dcc6c 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs @@ -1,4 +1,17 @@ -// using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +// using Autogenerated = Dapr.Client.Autogen.Grpc.v1; // // namespace Dapr.Messaging.PublishSubscribe; // diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs index 4990dc891..d935d250b 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -1,4 +1,17 @@ -using P = Dapr.Client.Autogen.Grpc.v1.Dapr; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using P = Dapr.Client.Autogen.Grpc.v1.Dapr; namespace Dapr.Messaging.PublishSubscribe; diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs index dd124e8f6..73838b605 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs @@ -1,4 +1,17 @@ -namespace Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; /// /// Options used to configure the dynamic Dapr subscription. diff --git a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs index 1cfbc3fbb..de6882095 100644 --- a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs +++ b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs @@ -1,4 +1,18 @@ -namespace Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + + +namespace Dapr.Messaging.PublishSubscribe; /// /// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings. diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 2f7109521..69f212bd1 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -1,4 +1,17 @@ -using System.Threading.Channels; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Threading.Channels; using Dapr.AppCallback.Autogen.Grpc.v1; using Grpc.Core; using P = Dapr.Client.Autogen.Grpc.v1; diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs index f65b38069..820a3abe5 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs @@ -1,4 +1,17 @@ -using Google.Protobuf.WellKnownTypes; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Google.Protobuf.WellKnownTypes; namespace Dapr.Messaging.PublishSubscribe; diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs index 9c89c4a0d..65b7abf01 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs @@ -1,4 +1,17 @@ -namespace Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; /// /// The handler delegate responsible for processing the topic message. diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs index a36ed9831..5a34f4cc2 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs +++ b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs @@ -1,4 +1,17 @@ -namespace Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; /// /// Describes the various actions that can be taken on a topic message. From 704849eec7b628058c6c51973b8598f61e471a3a Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 07:57:42 -0500 Subject: [PATCH 03/16] Minor stylistic updates Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 69f212bd1..7a269b856 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -24,13 +24,13 @@ namespace Dapr.Messaging.PublishSubscribe; /// public sealed class PublishSubscribeReceiver : IAsyncDisposable { - private readonly static UnboundedChannelOptions UnboundedChannelOptions = new UnboundedChannelOptions + private readonly static UnboundedChannelOptions UnboundedChannelOptions = new() { SingleWriter = true, SingleReader = true }; /// - /// The name of the Dapr pubsub component. + /// The name of the Dapr pub/sub component. /// private readonly string pubSubName; /// @@ -106,7 +106,10 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default { //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). if (hasInitialized) + { return; + } + hasInitialized = true; var stream = await GetStreamAsync(cancellationToken); @@ -180,10 +183,10 @@ private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken { await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 { - EventProcessed = new() + EventProcessed = new P.SubscribeTopicEventsRequestProcessedAlpha1 { Id = acknowledgement.MessageId, - Status = new() { Status = acknowledgement.Action } + Status = new TopicEventResponse { Status = acknowledgement.Action } } }, cancellationToken); } @@ -275,7 +278,10 @@ await stream.RequestStream.WriteAsync( public async ValueTask DisposeAsync() { if (isDisposed) + { return; + } + isDisposed = true; //Stop processing new events - we'll leave any messages yet unseen as unprocessed and From 2fa0a98923a504476af58ddef22476b280f6cb98 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 08:06:07 -0500 Subject: [PATCH 04/16] Added generic client builder to support publish/subscribe client builder Signed-off-by: Whit Waldo --- src/Dapr.Common/DaprGenericClientBuilder.cs | 214 ++++++++++++++++++ .../DaprPublishSubscribeClientBuilder.cs | 58 +++-- 2 files changed, 248 insertions(+), 24 deletions(-) create mode 100644 src/Dapr.Common/DaprGenericClientBuilder.cs diff --git a/src/Dapr.Common/DaprGenericClientBuilder.cs b/src/Dapr.Common/DaprGenericClientBuilder.cs new file mode 100644 index 000000000..f88b810bc --- /dev/null +++ b/src/Dapr.Common/DaprGenericClientBuilder.cs @@ -0,0 +1,214 @@ +using System.Text.Json; +using Grpc.Net.Client; +using Microsoft.Extensions.Configuration; + +namespace Dapr.Common; + +/// +/// Builder for building a generic Dapr client. +/// +public abstract class DaprGenericClientBuilder where TClientBuilder : class +{ + /// + /// Initializes a new instance of the class. + /// + protected DaprGenericClientBuilder(IConfiguration? configuration = null) + { + this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint(); + this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint(); + + this.GrpcChannelOptions = new GrpcChannelOptions() + { + // The gRPC client doesn't throw the right exception for cancellation + // by default, this switches that behavior on. + ThrowOperationCanceledOnCancellation = true, + }; + + this.JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web); + this.DaprApiToken = DaprDefaults.GetDefaultDaprApiToken(configuration); + } + + /// + /// Property exposed for testing purposes. + /// + internal string GrpcEndpoint { get; private set; } + + /// + /// Property exposed for testing purposes. + /// + internal string HttpEndpoint { get; private set; } + + /// + /// Property exposed for testing purposes. + /// + internal Func? HttpClientFactory { get; set; } + + /// + /// Property exposed for testing purposes. + /// + public JsonSerializerOptions JsonSerializerOptions { get; private set; } + + /// + /// Property exposed for testing purposes. + /// + internal GrpcChannelOptions GrpcChannelOptions { get; private set; } + + /// + /// Property exposed for testing purposes. + /// + public string DaprApiToken { get; private set; } + + /// + /// Property exposed for testing purposes. + /// + internal TimeSpan Timeout { get; private set; } + + /// + /// Overrides the HTTP endpoint used by the Dapr client for communicating with the Dapr runtime. + /// + /// + /// The URI endpoint to use for HTTP calls to the Dapr runtime. The default value will be + /// DAPR_HTTP_ENDPOINT first, or http://127.0.0.1:DAPR_HTTP_PORT as fallback + /// where DAPR_HTTP_ENDPOINT and DAPR_HTTP_PORT represents the value of the + /// corresponding environment variables. + /// + /// The instance. + public DaprGenericClientBuilder UseHttpEndpoint(string httpEndpoint) + { + ArgumentVerifier.ThrowIfNullOrEmpty(httpEndpoint, nameof(httpEndpoint)); + this.HttpEndpoint = httpEndpoint; + return this; + } + + /// + /// Exposed internally for testing purposes. + /// + internal DaprGenericClientBuilder UseHttpClientFactory(Func factory) + { + this.HttpClientFactory = factory; + return this; + } + + /// + /// Overrides the legacy mechanism for building an HttpClient and uses the new + /// introduced in .NET Core 2.1. + /// + /// The factory used to create instances. + /// + public DaprGenericClientBuilder UseHttpClientFactory(IHttpClientFactory httpClientFactory) + { + this.HttpClientFactory = httpClientFactory.CreateClient; + return this; + } + + /// + /// Overrides the gRPC endpoint used by the Dapr client for communicating with the Dapr runtime. + /// + /// + /// The URI endpoint to use for gRPC calls to the Dapr runtime. The default value will be + /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the + /// DAPR_GRPC_PORT environment variable. + /// + /// The instance. + public DaprGenericClientBuilder UseGrpcEndpoint(string grpcEndpoint) + { + ArgumentVerifier.ThrowIfNullOrEmpty(grpcEndpoint, nameof(grpcEndpoint)); + this.GrpcEndpoint = grpcEndpoint; + return this; + } + + /// + /// + /// Uses the specified when serializing or deserializing using . + /// + /// + /// The default value is created using . + /// + /// + /// Json serialization options. + /// The instance. + public DaprGenericClientBuilder UseJsonSerializationOptions(JsonSerializerOptions options) + { + this.JsonSerializerOptions = options; + return this; + } + + /// + /// Uses the provided for creating the . + /// + /// The to use for creating the . + /// The instance. + public DaprGenericClientBuilder UseGrpcChannelOptions(GrpcChannelOptions grpcChannelOptions) + { + this.GrpcChannelOptions = grpcChannelOptions; + return this; + } + + /// + /// Adds the provided on every request to the Dapr runtime. + /// + /// The token to be added to the request headers/>. + /// The instance. + public DaprGenericClientBuilder UseDaprApiToken(string apiToken) + { + this.DaprApiToken = apiToken; + return this; + } + + /// + /// Sets the timeout for the HTTP client used by the Dapr client. + /// + /// + /// + public DaprGenericClientBuilder UseTimeout(TimeSpan timeout) + { + this.Timeout = timeout; + return this; + } + + /// + /// Builds out the inner DaprClient that provides the core shape of the + /// runtime gRPC client used by the consuming package. + /// + /// + protected (GrpcChannel channel, HttpClient httpClient, Uri httpEndpoint) BuildDaprClientDependencies() + { + var grpcEndpoint = new Uri(this.GrpcEndpoint); + if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The gRPC endpoint must use http or https."); + } + + if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) + { + // Set correct switch to make secure gRPC service calls. This switch must be set before creating the GrpcChannel. + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + + var httpEndpoint = new Uri(this.HttpEndpoint); + if (httpEndpoint.Scheme != "http" && httpEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The HTTP endpoint must use http or https."); + } + + var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions); + + var httpClient = HttpClientFactory is not null ? HttpClientFactory() : new HttpClient(); + if (this.Timeout > TimeSpan.Zero) + { + httpClient.Timeout = this.Timeout; + } + + return (channel, httpClient, httpEndpoint); + } + + /// + /// Builds the client instance from the properties of the builder. + /// + /// The Dapr client instance. + /// + /// Builds the client instance from the properties of the builder. + /// + public abstract TClientBuilder Build(); +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs index 1948dcc6c..b94bc5cdf 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs @@ -11,27 +11,37 @@ // limitations under the License. // ------------------------------------------------------------------------ -// using Autogenerated = Dapr.Client.Autogen.Grpc.v1; -// -// namespace Dapr.Messaging.PublishSubscribe; -// -// /// -// /// Builds a . -// /// -// public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder -// { -// /// -// /// Builds the client instance from the properties of the builder. -// /// -// /// The Dapr client instance. -// /// -// /// Builds the client instance from the properties of the builder. -// /// -// public override DaprPublishSubscribeClient Build() -// { -// var daprClientDependencies = BuildDaprClientDependencies(); -// var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel); -// -// return new DaprPublishSubscribeGrpcClient(client); -// } -// } +using Dapr.Common; +using Microsoft.Extensions.Configuration; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Builds a . +/// +public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder +{ + /// + /// Used to initialize a new instance of the . + /// + /// An optional instance of . + public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration) + { + } + + /// + /// Builds the client instance from the properties of the builder. + /// + /// The Dapr client instance. + /// + /// Builds the client instance from the properties of the builder. + /// + public override DaprPublishSubscribeClient Build() + { + var daprClientDependencies = BuildDaprClientDependencies(); + var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel); + + return new DaprPublishSubscribeGrpcClient(client); + } +} From f37ec295cc79a7ad0573c4782789ccade5533c6f Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 08:33:19 -0500 Subject: [PATCH 05/16] Tweaked XML comment Signed-off-by: Whit Waldo --- src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 7a269b856..25f715cf5 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -30,7 +30,7 @@ public sealed class PublishSubscribeReceiver : IAsyncDisposable }; /// - /// The name of the Dapr pub/sub component. + /// The name of the Dapr publish/subscribe component. /// private readonly string pubSubName; /// From f82e03d47a8c6a0ba76611df36781531a708d855 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 11:30:23 -0500 Subject: [PATCH 06/16] Added several unit tests for the generic client builder Signed-off-by: Whit Waldo --- .../DaprGenericClientBuilderTest.cs | 94 +++++++++++++++++++ .../MessageHandlingPolicyTest.cs | 57 +++++++++++ 2 files changed, 151 insertions(+) create mode 100644 test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs create mode 100644 test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs diff --git a/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs new file mode 100644 index 000000000..d28b40058 --- /dev/null +++ b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs @@ -0,0 +1,94 @@ +using System; +using System.Text.Json; +using Xunit; + +namespace Dapr.Common.Test; + +public class DaprGenericClientBuilderTest +{ + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenHttpEndpointIsProvided() + { + // Arrange + var builder = new SampleDaprGenericClientBuilder(); + const string endpointValue = "http://sample-endpoint"; + + // Act + builder.UseHttpEndpoint(endpointValue); + + // Assert + Assert.Equal(endpointValue, builder.HttpEndpoint); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenGrpcEndpointIsProvided() + { + // Arrange + var builder = new SampleDaprGenericClientBuilder(); + const string endpointValue = "http://sample-endpoint"; + + // Act + builder.UseGrpcEndpoint(endpointValue); + + // Assert + Assert.Equal(endpointValue, builder.GrpcEndpoint); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateJsonSerializerOptions() + { + // Arrange + const int maxDepth = 8; + const bool writeIndented = true; + var builder = new SampleDaprGenericClientBuilder(); + var options = new JsonSerializerOptions + { + WriteIndented = writeIndented, + MaxDepth = maxDepth + }; + + // Act + builder.UseJsonSerializationOptions(options); + + // Assert + Assert.Equal(writeIndented, builder.JsonSerializerOptions.WriteIndented); + Assert.Equal(maxDepth, builder.JsonSerializerOptions.MaxDepth); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateDaprApiToken() + { + // Arrange + const string apiToken = "abc123"; + var builder = new SampleDaprGenericClientBuilder(); + + // Act + builder.UseDaprApiToken(apiToken); + + // Assert + Assert.Equal(apiToken, builder.DaprApiToken); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateTimeout() + { + // Arrange + var timeout = new TimeSpan(4, 2, 1, 2); + var builder = new SampleDaprGenericClientBuilder(); + + // Act + builder.UseTimeout(timeout); + + // Assert + Assert.Equal(timeout, builder.Timeout); + } + + private class SampleDaprGenericClientBuilder : DaprGenericClientBuilder + { + public override SampleDaprGenericClientBuilder Build() + { + // Implementation + throw new NotImplementedException(); + } + } +} diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs new file mode 100644 index 000000000..450f91ddb --- /dev/null +++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs @@ -0,0 +1,57 @@ +using System; +using Xunit; +using Dapr.Messaging.PublishSubscribe; + +namespace Dapr.Messaging.Test.PublishSubscribe +{ + public class MessageHandlingPolicyTest + { + [Fact] + public void Test_MessageHandlingPolicy_Constructor() + { + var timeoutDuration = TimeSpan.FromMilliseconds(2000); + var defaultResponseAction = TopicResponseAction.Drop; + + var policy = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); + + Assert.Equal(timeoutDuration, policy.TimeoutDuration); + Assert.Equal(defaultResponseAction, policy.DefaultResponseAction); + } + + [Fact] + public void Test_MessageHandlingPolicy_Equality() + { + var timeSpan1 = TimeSpan.FromMilliseconds(1000); + var timeSpan2 = TimeSpan.FromMilliseconds(2000); + + var policy1 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success); + var policy2 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success); + var policy3 = new MessageHandlingPolicy(timeSpan2, TopicResponseAction.Retry); + + Assert.Equal(policy1, policy2); // Value Equality + Assert.NotEqual(policy1, policy3); // Different values + } + + [Fact] + public void Test_MessageHandlingPolicy_Immutability() + { + var timeoutDuration = TimeSpan.FromMilliseconds(2000); + var defaultResponseAction = TopicResponseAction.Drop; + + var policy1 = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); + + var newTimeoutDuration = TimeSpan.FromMilliseconds(3000); + var newDefaultResponseAction = TopicResponseAction.Retry; + + // Creating a new policy with different values. + var policy2 = policy1 with + { + TimeoutDuration = newTimeoutDuration, DefaultResponseAction = newDefaultResponseAction + }; + + // Asserting that original policy is unaffected by changes made to new policy. + Assert.Equal(timeoutDuration, policy1.TimeoutDuration); + Assert.Equal(defaultResponseAction, policy1.DefaultResponseAction); + } + } +} From 86a17c1cf1bce0439bddc949a4c1c6cbb32544d1 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 11:38:25 -0500 Subject: [PATCH 07/16] Updated to include latest review changes: - Added lock so that while we guarantee the method is called only once, it should be thread-safe now - Marked PublishSubscribeReceiver as internal so its members aren't part of the public API - Updated TopicMessage to use IReadOnlyDictionary Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiver.cs | 18 +++++++++++++----- .../PublishSubscribe/TopicMessage.cs | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 25f715cf5..a360e2cb2 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -22,12 +22,17 @@ namespace Dapr.Messaging.PublishSubscribe; /// A thread-safe implementation of a receiver for messages from a specified Dapr publish/subscribe component and /// topic. /// -public sealed class PublishSubscribeReceiver : IAsyncDisposable +internal sealed class PublishSubscribeReceiver : IAsyncDisposable { + /// + /// Provides options for the unbounded channel. + /// private readonly static UnboundedChannelOptions UnboundedChannelOptions = new() { SingleWriter = true, SingleReader = true }; + + private static readonly object threadLock = new object(); /// /// The name of the Dapr publish/subscribe component. @@ -105,12 +110,15 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc internal async Task SubscribeAsync(CancellationToken cancellationToken = default) { //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). - if (hasInitialized) + lock (threadLock) { - return; - } + if (hasInitialized) + { + return; + } - hasInitialized = true; + hasInitialized = true; + } var stream = await GetStreamAsync(cancellationToken); diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs index 820a3abe5..402a89e9f 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs @@ -42,5 +42,5 @@ public sealed record TopicMessage(string Id, string Source, string Type, string /// /// A map of additional custom properties sent to the app. These are considered to be CloudEvent extensions. /// - public Dictionary Extensions { get; init; } = new(); + public IReadOnlyDictionary Extensions { get; init; } = new Dictionary(); } From db9b475c9fbc53788624b77a9a81f4e2fcf89676 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 11:41:42 -0500 Subject: [PATCH 08/16] Switched to interlock exchange instead of lock to slightly simplify code Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index a360e2cb2..be80809f8 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -32,8 +32,6 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable SingleWriter = true, SingleReader = true }; - private static readonly object threadLock = new object(); - /// /// The name of the Dapr publish/subscribe component. /// @@ -73,7 +71,7 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable /// /// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver. /// - private bool hasInitialized; + private int hasInitialized; /// /// Flag that ensures the instance is only disposed a single time. /// @@ -110,14 +108,9 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc internal async Task SubscribeAsync(CancellationToken cancellationToken = default) { //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). - lock (threadLock) + if (Interlocked.Exchange(ref hasInitialized, 1) == 1) { - if (hasInitialized) - { - return; - } - - hasInitialized = true; + return; } var stream = await GetStreamAsync(cancellationToken); From 5c2bafd4e24301af3e5cb2acec57da7832f88775 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 16:31:54 -0500 Subject: [PATCH 09/16] Added sample project Signed-off-by: Whit Waldo --- all.sln | 7 +++++ .../StreamingSubscriptionExample/Program.cs | 31 +++++++++++++++++++ .../StreamingSubscriptionExample.csproj | 14 +++++++++ 3 files changed, 52 insertions(+) create mode 100644 examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs create mode 100644 examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj diff --git a/all.sln b/all.sln index d8fc37b4b..02d9afcc4 100644 --- a/all.sln +++ b/all.sln @@ -123,6 +123,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -315,6 +317,10 @@ Global {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -373,6 +379,7 @@ Global {CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B} {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B} {0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF} + {290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40} diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs new file mode 100644 index 000000000..2eac0c919 --- /dev/null +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs @@ -0,0 +1,31 @@ +using System.Text; +using Dapr.Messaging.PublishSubscribe; + +var daprMessagingClientBuilder = new DaprPublishSubscribeClientBuilder(null); +var daprMessagingClient = daprMessagingClientBuilder.Build(); + +//Process each message returned from the subscription +Task HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default) +{ + try + { + //Do something with the message + Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span)); + return Task.FromResult(TopicResponseAction.Success); + } + catch + { + return Task.FromResult(TopicResponseAction.Retry); + } +} + +//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds and 10 seconds for message handling +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20)); +var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic", + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), + HandleMessageAsync, cancellationTokenSource.Token); + +await Task.Delay(TimeSpan.FromMinutes(1)); + +//When you're done with the subscription, simply dispose of it +await subscription.DisposeAsync(); diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj new file mode 100644 index 000000000..c73345b40 --- /dev/null +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj @@ -0,0 +1,14 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + From 5f1a5f82efa93d238d2c53539a7cdbf28e678abc Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 16:56:25 -0500 Subject: [PATCH 10/16] Minor changes to unit test Signed-off-by: Whit Waldo --- test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj | 5 ----- .../PublishSubscribe/MessageHandlingPolicyTest.cs | 10 ++++------ 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj index 83dc7b13e..f3ce7891d 100644 --- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -3,7 +3,6 @@ enable enable - false true @@ -40,8 +39,4 @@ - - - - diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs index 450f91ddb..0efb5e879 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs @@ -1,6 +1,4 @@ -using System; -using Xunit; -using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.PublishSubscribe; namespace Dapr.Messaging.Test.PublishSubscribe { @@ -10,7 +8,7 @@ public class MessageHandlingPolicyTest public void Test_MessageHandlingPolicy_Constructor() { var timeoutDuration = TimeSpan.FromMilliseconds(2000); - var defaultResponseAction = TopicResponseAction.Drop; + const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop; var policy = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); @@ -36,12 +34,12 @@ public void Test_MessageHandlingPolicy_Equality() public void Test_MessageHandlingPolicy_Immutability() { var timeoutDuration = TimeSpan.FromMilliseconds(2000); - var defaultResponseAction = TopicResponseAction.Drop; + const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop; var policy1 = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); var newTimeoutDuration = TimeSpan.FromMilliseconds(3000); - var newDefaultResponseAction = TopicResponseAction.Retry; + const TopicResponseAction newDefaultResponseAction = TopicResponseAction.Retry; // Creating a new policy with different values. var policy2 = policy1 with From 55fc274c243570d2def6eea3badd38958cef69c9 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 17:15:52 -0500 Subject: [PATCH 11/16] Deleted protos folder Signed-off-by: Whit Waldo --- test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj index f3ce7891d..f3769200d 100644 --- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -31,10 +31,6 @@ - - - - From 0741dde714205e4fe156ab965ca6f3d10970f5f7 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 24 Oct 2024 17:18:56 -0500 Subject: [PATCH 12/16] Using lowercase protos dir name Signed-off-by: Whit Waldo --- test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj | 4 ++++ test/Dapr.Messaging.Test/protos/test.proto | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj index f3769200d..8f39e1713 100644 --- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -31,6 +31,10 @@ + + + + diff --git a/test/Dapr.Messaging.Test/protos/test.proto b/test/Dapr.Messaging.Test/protos/test.proto index 9763fb596..b1c1ad8a9 100644 --- a/test/Dapr.Messaging.Test/protos/test.proto +++ b/test/Dapr.Messaging.Test/protos/test.proto @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 2a51b6ed7c3fc9389cd167014135c4472a6b9649 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 30 Oct 2024 06:21:18 -0500 Subject: [PATCH 13/16] Added registration extension methods Signed-off-by: Whit Waldo --- ...ishSubscribeServiceCollectionExtensions.cs | 93 ++++++++----------- 1 file changed, 38 insertions(+), 55 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs index ae09ddb48..bc60c5880 100644 --- a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs +++ b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs @@ -1,55 +1,38 @@ -// using Microsoft.Extensions.DependencyInjection; -// -// namespace System.Runtime.CompilerServices.PublishSubscribe.Extensions; -// -// /// -// /// Contains extension methods for using Dapr Publish/Subscribe with dependency injection. -// /// -// public static class PublishSubscribeServiceCollectionExtensions -// { -// /// -// /// Adds Dapr Publish/Subscribe support to the service collection. -// /// -// /// The . -// /// -// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services) => -// AddDaprPubSubClient(services, (_, _) => { }); -// -// /// -// /// Adds Dapr Publish/Subscribe support to the service collection. -// /// -// /// The . -// /// Optionally allows greater configuration of the . -// /// -// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, -// Action? configure) => -// services.AddDaprPubSubClient((_, builder) => configure?.Invoke(builder)); -// -// /// -// /// Adds Dapr Publish/Subscribe support to the service collection. -// /// -// /// The . -// /// Optionally allows greater configuration of the using injected services. -// /// -// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action? configure) -// { -// ArgumentNullException.ThrowIfNull(services, nameof(services)); -// -// //Register the IHttpClientFactory implementation -// services.AddHttpClient(); -// -// services.TryAddSingleton(serviceProvider => -// { -// var httpClientFactory = serviceProvider.GetRequiredService(); -// -// var builder = new DaprPublishSubscribeClientBuilder(); -// builder.UseHttpClientFactory(httpClientFactory); -// -// configure?.Invoke(serviceProvider, builder); -// -// return builder.Build(); -// }); -// -// return services; -// } -// } +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Dapr.Messaging.PublishSubscribe.Extensions; + +/// +/// Contains extension methods for using Dapr Publish/Subscribe with dependency injection. +/// +public static class PublishSubscribeServiceCollectionExtensions +{ + /// + /// Adds Dapr Publish/Subscribe support to the service collection. + /// + /// The . + /// Optionally allows greater configuration of the using injected services. + /// + public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action? configure = null) + { + ArgumentNullException.ThrowIfNull(services, nameof(services)); + + //Register the IHttpClientFactory implementation + services.AddHttpClient(); + + services.TryAddSingleton(serviceProvider => + { + var httpClientFactory = serviceProvider.GetRequiredService(); + + var builder = new DaprPublishSubscribeClientBuilder(); + builder.UseHttpClientFactory(httpClientFactory); + + configure?.Invoke(serviceProvider, builder); + + return builder.Build(); + }); + + return services; + } +} From 8febaa50db5447cedf4f661b657d6dd48162ebff Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 30 Oct 2024 06:27:39 -0500 Subject: [PATCH 14/16] Updated example to use DI registration Signed-off-by: Whit Waldo --- .../StreamingSubscriptionExample/Program.cs | 14 +++++++++----- .../StreamingSubscriptionExample.csproj | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs index 2eac0c919..ac00d8798 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs @@ -1,8 +1,10 @@ using System.Text; using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.PublishSubscribe.Extensions; -var daprMessagingClientBuilder = new DaprPublishSubscribeClientBuilder(null); -var daprMessagingClient = daprMessagingClientBuilder.Build(); +var builder = WebApplication.CreateBuilder(args); +builder.Services.AddDaprPubSubClient(); +var app = builder.Build(); //Process each message returned from the subscription Task HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default) @@ -19,9 +21,11 @@ Task HandleMessageAsync(TopicMessage message, CancellationT } } -//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds and 10 seconds for message handling -var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20)); -var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic", +var messagingClient = app.Services.GetRequiredService(); + +//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); +var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic", new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), HandleMessageAsync, cancellationTokenSource.Token); diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj index c73345b40..4ad620d00 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj @@ -1,4 +1,4 @@ - + Exe From 6ebb6dafec5ac2a968932e10afdadf29a11c5663 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 30 Oct 2024 06:29:51 -0500 Subject: [PATCH 15/16] Added default cancellation token Signed-off-by: Whit Waldo --- .../PublishSubscribe/DaprPublishSubscribeClient.cs | 2 +- .../PublishSubscribe/DaprPublishSubscribeGrpcClient.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs index 5d933a860..8fbec2dfe 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -27,5 +27,5 @@ public abstract class DaprPublishSubscribeClient /// The delegate reflecting the action to take upon messages received by the subscription. /// Cancellation token. /// - public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken); + public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default); } diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs index d935d250b..df6ccdcfe 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -39,7 +39,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client) /// The delegate reflecting the action to take upon messages received by the subscription. /// Cancellation token. /// - public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken) + public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default) { var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient); await receiver.SubscribeAsync(cancellationToken); From 68f09ac08cd3e876c3e0ef7eac48043bcc5d9165 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Tue, 5 Nov 2024 01:18:33 -0600 Subject: [PATCH 16/16] Passing stream into method instead of creating it twice Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index be80809f8..886d57006 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -119,7 +119,7 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken); //Process the messages as they're written to either channel - var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(cancellationToken); + var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken); var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken); try @@ -176,10 +176,10 @@ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction /// /// Processes each acknowledgement from the acknowledgement channel reader as it's populated. /// + /// The stream used to interact with the Dapr sidecar. /// Cancellation token. - private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken cancellationToken) + private async Task ProcessAcknowledgementChannelMessagesAsync(AsyncDuplexStreamingCall messageStream, CancellationToken cancellationToken) { - var messageStream = await GetStreamAsync(cancellationToken); await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken)) { await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1