Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Commit

Permalink
Use SubscriptionId to dispatch messages to handlers internally, fixes
Browse files Browse the repository at this point in the history
…#47 (#48)

* Add test demonstrating receiving messages meant for other subs #47

* Only dispatch messages intended for our subscription #47

Use SubscriptionId to dispatch messages to handlers internally,
rather than matching on the subject.

If multiple subscriptions existed that had overlapping subjects,
either through wildcards or explicit matches, then each message
received for each subscription would get dispatched to every
subscription (N^2).

Given:
    - sidA = Sub("a.b", OnX)
    - sidB = Sub("a.*", OnY)
    - Pub("a.b", ...)

MyNatsClient would receive:
    - sidA a.b <payload>
    - sidB a.b <payload>

And dispatch:
    - OnX(sidA, a.b, payload)
    - OnY(sidA, a.b, payload)
    - OnX(sidB, a.b, payload)
    - OnY(sidB, a.b, payload)
  • Loading branch information
watfordgnf authored Aug 16, 2020
1 parent 36df86d commit 0fc0815
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
30 changes: 29 additions & 1 deletion src/IntegrationTests/SubTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using MyNatsClient;
Expand Down Expand Up @@ -425,5 +426,32 @@ public async Task Given_subscribed_with_wildcard_async_using_handler_It_should_g
_sync.InterceptedCount.Should().Be(3);
_sync.Intercepted.Select(i => i.Subject).Should().OnlyContain(i => i.StartsWith(subjectNs));
}

[Fact]
public async Task Given_subscribed_with_overlapping_wildcards_async_using_handler_It_should_get_only_subcription_messages()
{
const string subjectNs = "foo.tests.";

_sync = Sync.MaxOne();
_client = await Context.ConnectClientAsync();

// First subscription
await _client.SubAsync(subjectNs + ">");

// Second, overlapping subscription
var subscription = await _client.SubAsync(subjectNs + "*", stream => stream.Subscribe(msg => _sync.Release(msg)));

await Context.DelayAsync();

await _client.PubAsync(subjectNs + "type1", "Test1");
_sync.WaitForAny();
await _client.PubAsync(subjectNs + "type2", "Test2");
_sync.WaitForAny();
await _client.PubAsync(subjectNs + "type3", "Test3");
_sync.WaitForAny();

_sync.InterceptedCount.Should().Be(3);
_sync.Intercepted.Select(i => i.SubscriptionId).Should().OnlyContain(i => i == subscription.SubscriptionInfo.Id);
}
}
}
}
2 changes: 1 addition & 1 deletion src/MyNatsClient/NatsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ private Subscription CreateSubscription(SubscriptionInfo subscriptionInfo, Func<
{
var subscription = Subscription.Create(
subscriptionInfo,
subscriptionFactory(MsgOpStream.Where(msg => subscriptionInfo.Matches(msg.Subject))),
subscriptionFactory(MsgOpStream.Where(msg => msg.SubscriptionId == subscriptionInfo.Id)),
DisposeSubscription);

if (!_subscriptions.TryAdd(subscription.SubscriptionInfo.Id, subscription))
Expand Down

0 comments on commit 0fc0815

Please sign in to comment.