Skip to content

Commit

Permalink
GH-8879: Add MQTT subscription identifier
Browse files Browse the repository at this point in the history
Fixes: #8879

To workaround the problem with `$share/` subscriptions the `Mqttv5PahoMessageDrivenChannelAdapter` must provide a `subscriptionIdentifier` into `MqttProperties` on `subscribe()`

* Introduce a `Mqttv5PahoMessageDrivenChannelAdapter.subscriptionIdentifierCounter` according to the MQTT specification:
> 3.8.2.1.2 Subscription Identifier: [..]The Subscription Identifier is associated with any subscription created or modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription. 

This one is associated with the MQTT session for the current subscriber and does not interfere into other sessions even if identifier is same from the counter.
It works because the Subscription identifier is per session and because you cannot have multiple connection with the same client ID.

**Cherry-pick to `6.2.x` & `6.1.x`**
  • Loading branch information
wilx authored Feb 2, 2024
1 parent 7b2d8a4 commit 7dead9c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -107,6 +108,8 @@ public class Mqttv5PahoMessageDrivenChannelAdapter

private volatile boolean readyToSubscribeOnStart;

private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0);

/**
* Create an instance based on the MQTT url, client id and subscriptions.
* @param url the MQTT url to connect.
Expand Down Expand Up @@ -336,8 +339,10 @@ public void addTopic(String topic, int qos) {
this.subscriptions.add(subscription);
}
if (this.mqttClient != null && this.mqttClient.isConnected()) {
MqttProperties subscriptionProperties = new MqttProperties();
subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet());
this.mqttClient.subscribe(new MqttSubscription[] { subscription },
null, null, new IMqttMessageListener[] { this::messageArrived }, new MqttProperties())
null, null, new IMqttMessageListener[] { this::messageArrived }, subscriptionProperties)
.waitForCompletion(getCompletionTimeout());
}
}
Expand Down Expand Up @@ -467,7 +472,11 @@ private void subscribe() {
IMqttMessageListener[] listeners = IntStream.range(0, mqttSubscriptions.length)
.mapToObj(t -> listener)
.toArray(IMqttMessageListener[]::new);
this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, new MqttProperties())
MqttProperties subscriptionProperties = new MqttProperties();
subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, mqttSubscriptions.length)
.mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet())
.toList());
this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, subscriptionProperties)
.waitForCompletion(getCompletionTimeout());
String message = "Connected and subscribed to " + Arrays.toString(mqttSubscriptions);
logger.debug(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ public void testSimpleMqttv5Interaction() {
assertThat(receive.getPayload()).isEqualTo(testPayload);
}

@Test
public void testSharedTopicMqttv5Interaction() {
this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopic");

String testPayload = "shared topic payload";
this.mqttOutFlowInput.send(
MessageBuilder.withPayload(testPayload)
.setHeader(MqttHeaders.TOPIC, "testTopic")
.build());

Message<?> receive = this.fromMqttChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(testPayload);
}


@Configuration
@EnableIntegration
Expand Down

0 comments on commit 7dead9c

Please sign in to comment.