From 7dead9cc9490818a1c8794007d63cb4b3f550646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Haisman?= Date: Fri, 2 Feb 2024 21:00:11 +0100 Subject: [PATCH] GH-8879: Add MQTT subscription identifier Fixes: #8879 To workaround the problem with `$share/` subscriptions the `Mqttv5PahoMessageDrivenChannelAdapter` must provide a `subscriptionIdentifier` into `MqttProperties` on `subscribe()` * Introduce a `Mqttv5PahoMessageDrivenChannelAdapter.subscriptionIdentifierCounter` according to the MQTT specification: > 3.8.2.1.2 Subscription Identifier: [..]The Subscription Identifier is associated with any subscription created or modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription. This one is associated with the MQTT session for the current subscriber and does not interfere into other sessions even if identifier is same from the counter. It works because the Subscription identifier is per session and because you cannot have multiple connection with the same client ID. **Cherry-pick to `6.2.x` & `6.1.x`** --- .../Mqttv5PahoMessageDrivenChannelAdapter.java | 13 +++++++++++-- .../integration/mqtt/Mqttv5BackToBackTests.java | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index cdb59906e27..acc3e04e042 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; @@ -107,6 +108,8 @@ public class Mqttv5PahoMessageDrivenChannelAdapter private volatile boolean readyToSubscribeOnStart; + private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0); + /** * Create an instance based on the MQTT url, client id and subscriptions. * @param url the MQTT url to connect. @@ -336,8 +339,10 @@ public void addTopic(String topic, int qos) { this.subscriptions.add(subscription); } if (this.mqttClient != null && this.mqttClient.isConnected()) { + MqttProperties subscriptionProperties = new MqttProperties(); + subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet()); this.mqttClient.subscribe(new MqttSubscription[] { subscription }, - null, null, new IMqttMessageListener[] { this::messageArrived }, new MqttProperties()) + null, null, new IMqttMessageListener[] { this::messageArrived }, subscriptionProperties) .waitForCompletion(getCompletionTimeout()); } } @@ -467,7 +472,11 @@ private void subscribe() { IMqttMessageListener[] listeners = IntStream.range(0, mqttSubscriptions.length) .mapToObj(t -> listener) .toArray(IMqttMessageListener[]::new); - this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, new MqttProperties()) + MqttProperties subscriptionProperties = new MqttProperties(); + subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, mqttSubscriptions.length) + .mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet()) + .toList()); + this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, subscriptionProperties) .waitForCompletion(getCompletionTimeout()); String message = "Connected and subscribed to " + Arrays.toString(mqttSubscriptions); logger.debug(message); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 80e596d0cf9..588d62d8e54 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -137,6 +137,22 @@ public void testSimpleMqttv5Interaction() { assertThat(receive.getPayload()).isEqualTo(testPayload); } + @Test + public void testSharedTopicMqttv5Interaction() { + this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopic"); + + String testPayload = "shared topic payload"; + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "testTopic") + .build()); + + Message receive = this.fromMqttChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isEqualTo(testPayload); + } + @Configuration @EnableIntegration