Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom subscriptions response. #961

Closed
wants to merge 9 commits into from
8 changes: 7 additions & 1 deletion src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute

return subscription;
})
.OrderBy(e => (e.PubsubName, e.Topic));
.OrderBy(e => (e.PubsubName, e.Topic))
.ToList();

if (options != null && options.SubscriptionsCallback != null)
{
await options.SubscriptionsCallback(subscriptions);
}

await context.Response.WriteAsync(JsonSerializer.Serialize(subscriptions,
new JsonSerializerOptions
Expand Down
9 changes: 9 additions & 0 deletions src/Dapr.AspNetCore/SubscribeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dapr
{
/// <summary>
Expand All @@ -22,5 +26,10 @@ public class SubscribeOptions
/// Gets or Sets a value which indicates whether to enable or disable processing raw messages.
/// </summary>
public bool EnableRawPayload { get; set; }

/// <summary>
/// An optional delegate used to configure the subscriptions.
/// </summary>
public Func<List<Subscription>, Task> SubscriptionsCallback { get; set; }
}
}
28 changes: 22 additions & 6 deletions src/Dapr.AspNetCore/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Dapr
/// <summary>
/// This class defines subscribe endpoint response
/// </summary>
internal class Subscription
public class Subscription
{
/// <summary>
/// Gets or sets the topic name.
Expand All @@ -44,7 +44,7 @@ internal class Subscription
/// Gets or sets the metadata.
/// </summary>
public Metadata Metadata { get; set; }

/// <summary>
/// Gets or sets the deadletter topic.
/// </summary>
Expand All @@ -59,10 +59,17 @@ internal class Subscription
/// <summary>
/// This class defines the metadata for subscribe endpoint.
/// </summary>
internal class Metadata : Dictionary<string, string>
public class Metadata : Dictionary<string, string>
{
/// <summary>
/// Initializes a new instance of the Metadata class.
/// </summary>
public Metadata() { }

/// <summary>
/// Initializes a new instance of the Metadata class.
/// </summary>
/// <param name="dictionary"></param>
public Metadata(IDictionary<string, string> dictionary) : base(dictionary) { }

/// <summary>
Expand All @@ -71,7 +78,10 @@ public Metadata(IDictionary<string, string> dictionary) : base(dictionary) { }
internal const string RawPayload = "rawPayload";
}

internal class Routes
/// <summary>
/// This class defines the routes for subscribe endpoint.
/// </summary>
public class Routes
{
/// <summary>
/// Gets or sets the default route
Expand All @@ -84,7 +94,10 @@ internal class Routes
public List<Rule> Rules { get; set; }
}

internal class Rule
/// <summary>
/// This class defines the rule for subscribe endpoint.
/// </summary>
public class Rule
{
/// <summary>
/// Gets or sets the CEL expression to match this route.
Expand All @@ -97,7 +110,10 @@ internal class Rule
public string Path { get; set; }
}

internal class DaprTopicBulkSubscribe
/// <summary>
/// This class defines the bulk subscribe options for subscribe endpoint.
/// </summary>
public class DaprTopicBulkSubscribe
{
/// <summary>
/// Gets or sets whether bulk subscribe option is enabled for a topic.
Expand Down
14 changes: 13 additions & 1 deletion test/Dapr.AspNetCore.IntegrationTest.App/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,19 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
endpoints.MapSubscribeHandler(new SubscribeOptions()
{
SubscriptionsCallback = subscriptions =>
{
subscriptions.Add(new Subscription()
{
PubsubName = "dynamic-pubsub",
Topic = "dynamic-topic",
Route = "/dynamic-route"
});
return Task.CompletedTask;
}
});
endpoints.MapControllers();

endpoints.MapPost("/topic-a", context => Task.CompletedTask).WithTopic("testpubsub", "A").WithTopic("testpubsub", "A.1");
Expand Down
19 changes: 11 additions & 8 deletions test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System;

namespace Dapr.AspNetCore.IntegrationTest
{
using System;
Expand Down Expand Up @@ -40,9 +42,9 @@ public async Task SubscribeEndpoint_ReportsTopics()
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);

json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(18);
json.GetArrayLength().Should().Be(19);

var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload,
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload,
string match, string metadata, string DeadLetterTopic, string bulkSubscribeMetadata)>();

foreach (var element in json.EnumerateArray())
Expand Down Expand Up @@ -86,7 +88,7 @@ public async Task SubscribeEndpoint_ReportsTopics()

if (element.TryGetProperty("route", out JsonElement route))
{
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty,
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty,
originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
else if (element.TryGetProperty("routes", out JsonElement routes))
Expand All @@ -97,13 +99,13 @@ public async Task SubscribeEndpoint_ReportsTopics()
{
var match = rule.GetProperty("match").GetString();
var path = rule.GetProperty("path").GetString();
subscriptions.Add((pubsubName, topic, path, rawPayload, match,
subscriptions.Add((pubsubName, topic, path, rawPayload, match,
originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
}
if (routes.TryGetProperty("default", out JsonElement defaultProperty))
{
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload,
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload,
string.Empty, originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
}
Expand All @@ -119,18 +121,19 @@ public async Task SubscribeEndpoint_ReportsTopics()
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty,
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty,
"{\"enabled\":true,\"maxMessagesCount\":100,\"maxAwaitDurationMs\":1000}"));
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName",
subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName",
"{\"enabled\":true,\"maxMessagesCount\":300,\"maxAwaitDurationMs\":1000}"));
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty,
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty,
"{\"enabled\":true,\"maxMessagesCount\":500,\"maxAwaitDurationMs\":2000}"));
subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty, String.Empty));
subscriptions.Should().Contain(("dynamic-pubsub", "dynamic-topic", "/dynamic-route", string.Empty, string.Empty, string.Empty, string.Empty, string.Empty));
// Test priority route sorting
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
eTopic.Count.Should().Be(3);
Expand Down