diff --git a/Directory.Packages.props b/Directory.Packages.props index 4a9c47ad4..e5e60cd7b 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,51 +1,51 @@ - - true - true - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + true + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/all.sln b/all.sln index 6e55f247b..3b9959902 100644 --- a/all.sln +++ b/all.sln @@ -119,6 +119,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs", "src\Dapr.Jobs\Dapr.Jobs.csproj", "{C8BB6A85-A7EA-40C0-893D-F36F317829B3}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs.Test", "test\Dapr.Jobs.Test\Dapr.Jobs.Test.csproj", "{BF9828E9-5597-4D42-AA6E-6E6C12214204}" @@ -311,6 +316,18 @@ Global {CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU {CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU {CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU + {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU {C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.Build.0 = Debug|Any CPU {C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -379,6 +396,9 @@ Global {DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B} + {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B} + {0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF} + {290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} {C8BB6A85-A7EA-40C0-893D-F36F317829B3} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {BF9828E9-5597-4D42-AA6E-6E6C12214204} = {DD020B34-460F-455F-8D17-CF4A949F100B} {D9697361-232F-465D-A136-4561E0E88488} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78} diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs new file mode 100644 index 000000000..ac00d8798 --- /dev/null +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs @@ -0,0 +1,35 @@ +using System.Text; +using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.PublishSubscribe.Extensions; + +var builder = WebApplication.CreateBuilder(args); +builder.Services.AddDaprPubSubClient(); +var app = builder.Build(); + +//Process each message returned from the subscription +Task HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default) +{ + try + { + //Do something with the message + Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span)); + return Task.FromResult(TopicResponseAction.Success); + } + catch + { + return Task.FromResult(TopicResponseAction.Retry); + } +} + +var messagingClient = app.Services.GetRequiredService(); + +//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); +var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic", + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), + HandleMessageAsync, cancellationTokenSource.Token); + +await Task.Delay(TimeSpan.FromMinutes(1)); + +//When you're done with the subscription, simply dispose of it +await subscription.DisposeAsync(); diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj new file mode 100644 index 000000000..4ad620d00 --- /dev/null +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj @@ -0,0 +1,14 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + diff --git a/src/Dapr.Messaging/Dapr.Messaging.csproj b/src/Dapr.Messaging/Dapr.Messaging.csproj new file mode 100644 index 000000000..0c7f159a4 --- /dev/null +++ b/src/Dapr.Messaging/Dapr.Messaging.csproj @@ -0,0 +1,22 @@ + + + + This package contains the reference assemblies for developing messaging services using Dapr. + enable + enable + Dapr.Messaging + Dapr Messaging SDK + Dapr Messaging SDK for building applications that utilize messaging components. + alpha + + + + + + + + + + + + diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs new file mode 100644 index 000000000..8fbec2dfe --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -0,0 +1,31 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// The base implementation of a Dapr pub/sub client. +/// +public abstract class DaprPublishSubscribeClient +{ + /// + /// Dynamically subscribes to a Publish/Subscribe component and topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Configuration options. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// Cancellation token. + /// + public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default); +} diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs new file mode 100644 index 000000000..b94bc5cdf --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs @@ -0,0 +1,47 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common; +using Microsoft.Extensions.Configuration; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Builds a . +/// +public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder +{ + /// + /// Used to initialize a new instance of the . + /// + /// An optional instance of . + public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration) + { + } + + /// + /// Builds the client instance from the properties of the builder. + /// + /// The Dapr client instance. + /// + /// Builds the client instance from the properties of the builder. + /// + public override DaprPublishSubscribeClient Build() + { + var daprClientDependencies = BuildDaprClientDependencies(); + var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel); + + return new DaprPublishSubscribeGrpcClient(client); + } +} diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs new file mode 100644 index 000000000..df6ccdcfe --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -0,0 +1,49 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using P = Dapr.Client.Autogen.Grpc.v1.Dapr; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A client for interacting with the Dapr endpoints. +/// +internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient +{ + private readonly P.DaprClient daprClient; + + /// + /// Creates a new instance of a + /// + public DaprPublishSubscribeGrpcClient(P.DaprClient client) + { + daprClient = client; + } + + /// + /// Dynamically subscribes to a Publish/Subscribe component and topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Configuration options. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// Cancellation token. + /// + public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default) + { + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient); + await receiver.SubscribeAsync(cancellationToken); + return receiver; + } +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs new file mode 100644 index 000000000..73838b605 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs @@ -0,0 +1,44 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Options used to configure the dynamic Dapr subscription. +/// +/// Describes the policy to take on messages that have not been acknowledged within the timeout period. +public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy) +{ + /// + /// Subscription metadata. + /// + public IReadOnlyDictionary Metadata { get; init; } = new Dictionary(); + + /// + /// The optional name of the dead-letter topic to send unprocessed messages to. + /// + public string? DeadLetterTopic { get; init; } + + /// + /// If populated, this reflects the maximum number of messages that can be queued for processing on the replica. By default, + /// no maximum boundary is enforced. + /// + public int? MaximumQueuedMessages { get; init; } + + /// + /// The maximum amount of time to take to dispose of acknowledgement messages after the cancellation token has + /// been signaled. + /// + public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30); +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs new file mode 100644 index 000000000..bc60c5880 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs @@ -0,0 +1,38 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Dapr.Messaging.PublishSubscribe.Extensions; + +/// +/// Contains extension methods for using Dapr Publish/Subscribe with dependency injection. +/// +public static class PublishSubscribeServiceCollectionExtensions +{ + /// + /// Adds Dapr Publish/Subscribe support to the service collection. + /// + /// The . + /// Optionally allows greater configuration of the using injected services. + /// + public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action? configure = null) + { + ArgumentNullException.ThrowIfNull(services, nameof(services)); + + //Register the IHttpClientFactory implementation + services.AddHttpClient(); + + services.TryAddSingleton(serviceProvider => + { + var httpClientFactory = serviceProvider.GetRequiredService(); + + var builder = new DaprPublishSubscribeClientBuilder(); + builder.UseHttpClientFactory(httpClientFactory); + + configure?.Invoke(serviceProvider, builder); + + return builder.Build(); + }); + + return services; + } +} diff --git a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs new file mode 100644 index 000000000..de6882095 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs @@ -0,0 +1,23 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings. +/// +/// The duration to wait before timing out a message handling operation. +/// The default action to take when a message handling operation times out. +public sealed record MessageHandlingPolicy(TimeSpan TimeoutDuration, TopicResponseAction DefaultResponseAction); + diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs new file mode 100644 index 000000000..886d57006 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -0,0 +1,313 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Threading.Channels; +using Dapr.AppCallback.Autogen.Grpc.v1; +using Grpc.Core; +using P = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A thread-safe implementation of a receiver for messages from a specified Dapr publish/subscribe component and +/// topic. +/// +internal sealed class PublishSubscribeReceiver : IAsyncDisposable +{ + /// + /// Provides options for the unbounded channel. + /// + private readonly static UnboundedChannelOptions UnboundedChannelOptions = new() + { + SingleWriter = true, SingleReader = true + }; + + /// + /// The name of the Dapr publish/subscribe component. + /// + private readonly string pubSubName; + /// + /// The name of the topic to subscribe to. + /// + private readonly string topicName; + /// + /// Options allowing the behavior of the receiver to be configured. + /// + private readonly DaprSubscriptionOptions options; + /// + /// A channel used to decouple the messages received from the sidecar to their consumption. + /// + private readonly Channel topicMessagesChannel; + /// + /// Maintains the various acknowledgements for each message. + /// + private readonly Channel acknowledgementsChannel = Channel.CreateUnbounded(UnboundedChannelOptions); + /// + /// The stream connection between this instance and the Dapr sidecar. + /// + private AsyncDuplexStreamingCall? clientStream; + /// + /// Used to ensure thread-safe operations against the stream. + /// + private readonly SemaphoreSlim semaphore = new(1, 1); + /// + /// The handler delegate responsible for processing the topic messages. + /// + private readonly TopicMessageHandler messageHandler; + /// + /// A reference to the DaprClient instance. + /// + private readonly P.Dapr.DaprClient client; + /// + /// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver. + /// + private int hasInitialized; + /// + /// Flag that ensures the instance is only disposed a single time. + /// + private bool isDisposed; + + /// + /// Constructs a new instance of a instance. + /// + /// The name of the Dapr Publish/Subscribe component. + /// The name of the topic to subscribe to. + /// Options allowing the behavior of the receiver to be configured. + /// The delegate reflecting the action to take upon messages received by the subscription. + /// A reference to the DaprClient instance. + internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler handler, P.Dapr.DaprClient client) + { + this.client = client; + this.pubSubName = pubSubName; + this.topicName = topicName; + this.options = options; + this.messageHandler = handler; + topicMessagesChannel = options.MaximumQueuedMessages is > 0 + ? Channel.CreateBounded(new BoundedChannelOptions((int)options.MaximumQueuedMessages) + { + SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait + }) + : Channel.CreateUnbounded(UnboundedChannelOptions); + } + + /// + /// Dynamically subscribes to messages on a PubSub topic provided by the Dapr sidecar. + /// + /// Cancellation token. + /// An containing messages provided by the sidecar. + internal async Task SubscribeAsync(CancellationToken cancellationToken = default) + { + //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). + if (Interlocked.Exchange(ref hasInitialized, 1) == 1) + { + return; + } + + var stream = await GetStreamAsync(cancellationToken); + + //Retrieve the messages from the sidecar and write to the messages channel + var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken); + + //Process the messages as they're written to either channel + var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken); + var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken); + + try + { + await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask); + } + catch (OperationCanceledException) + { + // Will be cleaned up during DisposeAsync + } + } + + /// + /// Retrieves or creates the bidirectional stream to the DaprClient for transacting pub/sub subscriptions. + /// + /// Cancellation token. + /// The stream connection. + private async Task> GetStreamAsync(CancellationToken cancellationToken) + { + await semaphore.WaitAsync(cancellationToken); + + try + { + return clientStream ??= client.SubscribeTopicEventsAlpha1(cancellationToken: cancellationToken); + } + finally + { + semaphore.Release(); + } + } + + /// + /// Acknowledges the indicated message back to the Dapr sidecar with an indicated behavior to take on the message. + /// + /// The identifier of the message the behavior is in reference to. + /// The behavior to take on the message as indicated by either the message handler or timeout message handling configuration. + /// Cancellation token. + /// + private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior, CancellationToken cancellationToken) + { + var action = behavior switch + { + TopicResponseAction.Success => TopicEventResponse.Types.TopicEventResponseStatus.Success, + TopicResponseAction.Retry => TopicEventResponse.Types.TopicEventResponseStatus.Retry, + TopicResponseAction.Drop => TopicEventResponse.Types.TopicEventResponseStatus.Drop, + _ => throw new InvalidOperationException( + $"Unrecognized topic acknowledgement action: {behavior}") + }; + + var acknowledgement = new TopicAcknowledgement(messageId, action); + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement, cancellationToken); + } + + /// + /// Processes each acknowledgement from the acknowledgement channel reader as it's populated. + /// + /// The stream used to interact with the Dapr sidecar. + /// Cancellation token. + private async Task ProcessAcknowledgementChannelMessagesAsync(AsyncDuplexStreamingCall messageStream, CancellationToken cancellationToken) + { + await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken)) + { + await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 + { + EventProcessed = new P.SubscribeTopicEventsRequestProcessedAlpha1 + { + Id = acknowledgement.MessageId, + Status = new TopicEventResponse { Status = acknowledgement.Action } + } + }, cancellationToken); + } + } + + /// + /// Processes each topic messages from the channel as it's populated. + /// + /// Cancellation token. + private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellationToken) + { + await foreach (var message in topicMessagesChannel.Reader.ReadAllAsync(cancellationToken)) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration); + + //Evaluate the message and return an acknowledgement result + var messageAction = await messageHandler(message, cts.Token); + + try + { + //Share the result with the sidecar + await AcknowledgeMessageAsync(message.Id, messageAction, cancellationToken); + } + catch (OperationCanceledException) + { + //Acknowledge the message using the configured default response action + await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction, cancellationToken); + } + } + } + + /// + /// Retrieves the subscription stream data from the Dapr sidecar. + /// + /// The stream connection to and from the Dapr sidecar instance. + /// The channel writer instance. + /// Cancellation token. + private async Task FetchDataFromSidecarAsync( + AsyncDuplexStreamingCall stream, + ChannelWriter channelWriter, CancellationToken cancellationToken) + { + //Build out the initial topic events request + var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1() + { + PubsubName = pubSubName, DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, Topic = topicName + }; + + if (options.Metadata.Count > 0) + { + foreach (var (key, value) in options.Metadata) + { + initialRequest.Metadata.Add(key, value); + } + } + + //Send this request to the Dapr sidecar + await stream.RequestStream.WriteAsync( + new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken); + + //Each time a message is received from the stream, push it into the topic messages channel + await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) + { + var message = + new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, + response.EventMessage.SpecVersion, response.EventMessage.DataContentType, + response.EventMessage.Topic, response.EventMessage.PubsubName) + { + Path = response.EventMessage.Path, + Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value) + }; + + try + { + await channelWriter.WaitToWriteAsync(cancellationToken); + await channelWriter.WriteAsync(message, cancellationToken); + } + catch (Exception) + { + // Handle being unable to write because the writer is completed due to an active DisposeAsync operation + } + } + } + + /// + /// Disposes the various resources associated with the instance. + /// + /// + public async ValueTask DisposeAsync() + { + if (isDisposed) + { + return; + } + + isDisposed = true; + + //Stop processing new events - we'll leave any messages yet unseen as unprocessed and + //Dapr will handle as necessary when they're not acknowledged + topicMessagesChannel.Writer.Complete(); + + //Flush the remaining acknowledgements, but start by marking the writer as complete so it doesn't receive any other messages either + acknowledgementsChannel.Writer.Complete(); + + try + { + //Process any remaining acknowledgements on the channel without exceeding the configured maximum clean up time + await acknowledgementsChannel.Reader.Completion.WaitAsync(options.MaximumCleanupTimeout); + } + catch (OperationCanceledException) + { + //Handled + } + } + + /// + /// Reflects the action to take on a given message identifier. + /// + /// The identifier of the message. + /// The action to take on the message in the acknowledgement request. + private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action); +} + diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs new file mode 100644 index 000000000..402a89e9f --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs @@ -0,0 +1,46 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Google.Protobuf.WellKnownTypes; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A message retrieved from a Dapr publish/subscribe topic. +/// +/// The unique identifier of the topic message. +/// Identifies the context in which an event happened, such as the organization publishing the +/// event or the process that produced the event. The exact syntax and semantics behind the data +/// encoded in the URI is defined by the event producer. +/// The type of event related to the originating occurrence. +/// The spec version of the CloudEvents specification. +/// The content type of the data. +/// The name of the topic. +/// The name of the Dapr publish/subscribe component. +public sealed record TopicMessage(string Id, string Source, string Type, string SpecVersion, string DataContentType, string Topic, string PubSubName) +{ + /// + /// The content of the event. + /// + public ReadOnlyMemory Data { get; init; } + + /// + /// The matching path from the topic subscription/routes (if specified) for this event. + /// + public string? Path { get; init; } + + /// + /// A map of additional custom properties sent to the app. These are considered to be CloudEvent extensions. + /// + public IReadOnlyDictionary Extensions { get; init; } = new Dictionary(); +} diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs new file mode 100644 index 000000000..65b7abf01 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs @@ -0,0 +1,23 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// The handler delegate responsible for processing the topic message. +/// +/// The message request to process. +/// Cancellation token. +/// The acknowledgement behavior to report back to the pub/sub endpoint about the message. +public delegate Task TopicMessageHandler(TopicMessage request, + CancellationToken cancellationToken = default); diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs new file mode 100644 index 000000000..5a34f4cc2 --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs @@ -0,0 +1,34 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Describes the various actions that can be taken on a topic message. +/// +public enum TopicResponseAction +{ + /// + /// Indicates the message was processed successfully and should be deleted from the pub/sub topic. + /// + Success, + /// + /// Indicates a failure while processing the message and that the message should be resent for a retry. + /// + Retry, + /// + /// Indicates a failure while processing the message and that the message should be dropped or sent to the + /// dead-letter topic if specified. + /// + Drop +} diff --git a/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs new file mode 100644 index 000000000..d28b40058 --- /dev/null +++ b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs @@ -0,0 +1,94 @@ +using System; +using System.Text.Json; +using Xunit; + +namespace Dapr.Common.Test; + +public class DaprGenericClientBuilderTest +{ + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenHttpEndpointIsProvided() + { + // Arrange + var builder = new SampleDaprGenericClientBuilder(); + const string endpointValue = "http://sample-endpoint"; + + // Act + builder.UseHttpEndpoint(endpointValue); + + // Assert + Assert.Equal(endpointValue, builder.HttpEndpoint); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenGrpcEndpointIsProvided() + { + // Arrange + var builder = new SampleDaprGenericClientBuilder(); + const string endpointValue = "http://sample-endpoint"; + + // Act + builder.UseGrpcEndpoint(endpointValue); + + // Assert + Assert.Equal(endpointValue, builder.GrpcEndpoint); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateJsonSerializerOptions() + { + // Arrange + const int maxDepth = 8; + const bool writeIndented = true; + var builder = new SampleDaprGenericClientBuilder(); + var options = new JsonSerializerOptions + { + WriteIndented = writeIndented, + MaxDepth = maxDepth + }; + + // Act + builder.UseJsonSerializationOptions(options); + + // Assert + Assert.Equal(writeIndented, builder.JsonSerializerOptions.WriteIndented); + Assert.Equal(maxDepth, builder.JsonSerializerOptions.MaxDepth); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateDaprApiToken() + { + // Arrange + const string apiToken = "abc123"; + var builder = new SampleDaprGenericClientBuilder(); + + // Act + builder.UseDaprApiToken(apiToken); + + // Assert + Assert.Equal(apiToken, builder.DaprApiToken); + } + + [Fact] + public void DaprGenericClientBuilder_ShouldUpdateTimeout() + { + // Arrange + var timeout = new TimeSpan(4, 2, 1, 2); + var builder = new SampleDaprGenericClientBuilder(); + + // Act + builder.UseTimeout(timeout); + + // Assert + Assert.Equal(timeout, builder.Timeout); + } + + private class SampleDaprGenericClientBuilder : DaprGenericClientBuilder + { + public override SampleDaprGenericClientBuilder Build() + { + // Implementation + throw new NotImplementedException(); + } + } +} diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj new file mode 100644 index 000000000..8f39e1713 --- /dev/null +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -0,0 +1,42 @@ + + + + enable + enable + false + true + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + + + + + + diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs new file mode 100644 index 000000000..0efb5e879 --- /dev/null +++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs @@ -0,0 +1,55 @@ +using Dapr.Messaging.PublishSubscribe; + +namespace Dapr.Messaging.Test.PublishSubscribe +{ + public class MessageHandlingPolicyTest + { + [Fact] + public void Test_MessageHandlingPolicy_Constructor() + { + var timeoutDuration = TimeSpan.FromMilliseconds(2000); + const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop; + + var policy = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); + + Assert.Equal(timeoutDuration, policy.TimeoutDuration); + Assert.Equal(defaultResponseAction, policy.DefaultResponseAction); + } + + [Fact] + public void Test_MessageHandlingPolicy_Equality() + { + var timeSpan1 = TimeSpan.FromMilliseconds(1000); + var timeSpan2 = TimeSpan.FromMilliseconds(2000); + + var policy1 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success); + var policy2 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success); + var policy3 = new MessageHandlingPolicy(timeSpan2, TopicResponseAction.Retry); + + Assert.Equal(policy1, policy2); // Value Equality + Assert.NotEqual(policy1, policy3); // Different values + } + + [Fact] + public void Test_MessageHandlingPolicy_Immutability() + { + var timeoutDuration = TimeSpan.FromMilliseconds(2000); + const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop; + + var policy1 = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction); + + var newTimeoutDuration = TimeSpan.FromMilliseconds(3000); + const TopicResponseAction newDefaultResponseAction = TopicResponseAction.Retry; + + // Creating a new policy with different values. + var policy2 = policy1 with + { + TimeoutDuration = newTimeoutDuration, DefaultResponseAction = newDefaultResponseAction + }; + + // Asserting that original policy is unaffected by changes made to new policy. + Assert.Equal(timeoutDuration, policy1.TimeoutDuration); + Assert.Equal(defaultResponseAction, policy1.DefaultResponseAction); + } + } +} diff --git a/test/Dapr.Messaging.Test/protos/test.proto b/test/Dapr.Messaging.Test/protos/test.proto new file mode 100644 index 000000000..b1c1ad8a9 --- /dev/null +++ b/test/Dapr.Messaging.Test/protos/test.proto @@ -0,0 +1,32 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +syntax = "proto3"; + +option csharp_namespace = "Dapr.Client.Autogen.Test.Grpc.v1"; + +message TestRun { + repeated TestCase tests = 1; +} + +message TestCase { + string name = 1; +} + +message Request { + string RequestParameter = 1; +} + +message Response { + string Name = 1; +} \ No newline at end of file