Skip to content

Commit

Permalink
Improved subscription remove performance
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Aug 25, 2024
1 parent f38b771 commit 0642876
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 27 deletions.
37 changes: 12 additions & 25 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class CNode implements Comparable<CNode> {
public static final Random SECURE_RANDOM = new SecureRandom();
private final Token token;
private PMap<String, INode> children;
// Map of subscriptions. Not a Set, because Set doesn't have a Get method and we may need to update.
private PMap<Subscription, Subscription> subscriptions;
// Map of subscriptions per clientId.
private PMap<String, Subscription> subscriptions;
// the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan.
private Map<ShareName, List<SharedSubscription>> sharedSubscriptions;

Expand All @@ -55,7 +55,7 @@ class CNode implements Comparable<CNode> {
}

//Copy constructor
private CNode(Token token, PMap<String, INode> children, PMap<Subscription, Subscription> subscriptions, Map<ShareName, List<SharedSubscription>> sharedSubscriptions) {
private CNode(Token token, PMap<String, INode> children, PMap<String, Subscription> subscriptions, Map<ShareName, List<SharedSubscription>> sharedSubscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
this.subscriptions = subscriptions;
this.sharedSubscriptions = new HashMap<>(sharedSubscriptions);
Expand Down Expand Up @@ -113,8 +113,8 @@ private List<Subscription> sharedSubscriptions() {
return selectedSubscriptions;
}

Set<Subscription> subscriptions() {
return subscriptions.keySet();
Collection<Subscription> subscriptions() {
return subscriptions.values();
}

// Mutating operation
Expand All @@ -136,15 +136,15 @@ CNode addSubscription(SubscriptionRequest request) {
final Subscription newSubscription = request.subscription();

// if already contains one with same topic and same client, keep that with higher QoS
final Subscription existing = subscriptions.get(newSubscription);
final Subscription existing = subscriptions.get(newSubscription.clientId);
if (existing != null) {
// Subscription already exists
if (needsToUpdateExistingSubscription(newSubscription, existing)) {
subscriptions = subscriptions.plus(newSubscription, newSubscription);
subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
} else {
// insert into the expected index so that the sorting is maintained
subscriptions = subscriptions.plus(newSubscription, newSubscription);
subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
}
return this;
Expand All @@ -170,8 +170,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri
* AND at least one subscription is actually present for that clientId
* */
boolean containsOnly(String clientId) {
for (Subscription sub : this.subscriptions.values()) {
if (!sub.clientId.equals(clientId)) {
for (String sub : this.subscriptions.keySet()) {
if (!sub.equals(clientId)) {
return false;
}
}
Expand Down Expand Up @@ -200,12 +200,7 @@ private static SharedSubscription wrapKey(String clientId) {

//TODO this is equivalent to negate(containsOnly(clientId))
private boolean containsSubscriptionsForClient(String clientId) {
for (Subscription sub : this.subscriptions.values()) {
if (sub.clientId.equals(clientId)) {
return true;
}
}
return false;
return subscriptions.containsKey(clientId);
}

void removeSubscriptionsFor(UnsubscribeRequest request) {
Expand All @@ -224,15 +219,7 @@ void removeSubscriptionsFor(UnsubscribeRequest request) {
this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName);
}
} else {
// collect Subscription instances to remove
Set<Subscription> toRemove = new HashSet<>();
for (Subscription sub : this.subscriptions.values()) {
if (sub.clientId.equals(clientId)) {
toRemove.add(sub);
}
}
// effectively remove the instances
subscriptions = subscriptions.minusAll(toRemove);
subscriptions = subscriptions.minus(clientId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription;
import static io.moquette.broker.subscriptions.Topic.asTopic;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void testAddNewSubscriptionOnExistingNode() {
//Verify
final Optional<CNode> matchedNode = sut.lookup(asTopic("/temp"));
assertTrue(matchedNode.isPresent(), "Node on path /temp must be present");
final Set<Subscription> subscriptions = matchedNode.get().subscriptions();
final Collection<Subscription> subscriptions = matchedNode.get().subscriptions();
assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp")));
}

Expand All @@ -118,7 +119,7 @@ public void testAddNewDeepNodes() {
//Verify
final Optional<CNode> matchedNode = sut.lookup(asTopic("/italy/happiness"));
assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present");
final Set<Subscription> subscriptions = matchedNode.get().subscriptions();
final Collection<Subscription> subscriptions = matchedNode.get().subscriptions();
assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness")));
}

Expand Down

0 comments on commit 0642876

Please sign in to comment.