Skip to content

Commit

Permalink
GH-8879: Add MQTT subscription identifier
Browse files Browse the repository at this point in the history
Fixes: #8879

To work around the problem with `$share/` subscriptions the `Mqttv5PahoMessageDrivenChannelAdapter` must provide a `subscriptionIdentifier` into `MqttProperties` on `subscribe()`

* Introduce a `Mqttv5PahoMessageDrivenChannelAdapter.subscriptionIdentifierCounter` according to the MQTT specification:
> 3.8.2.1.2 Subscription Identifier: [..]The Subscription Identifier is associated with any subscription created or modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription.

This one is associated with the MQTT session for the current subscriber and does not interfere into other sessions even if identifier is same from the counter.
It works because the Subscription identifier is per session and because you cannot have multiple connection with the same client ID.

**Cherry-pick to `6.2.x` & `6.1.x`**
# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
  • Loading branch information
wilx authored and artembilan committed Feb 2, 2024
1 parent 5996db5 commit 9ad3991
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class Mqttv5PahoMessageDrivenChannelAdapter

private volatile boolean readyToSubscribeOnStart;

private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0);

public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
super(url, clientId, topic);
Assert.hasText(url, "'url' cannot be null or empty");
Expand Down Expand Up @@ -266,8 +269,10 @@ public void addTopic(String topic, int qos) {
try {
super.addTopic(topic, qos);
if (this.mqttClient != null && this.mqttClient.isConnected()) {
MqttProperties subscriptionProperties = new MqttProperties();
subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet());
this.mqttClient.subscribe(new MqttSubscription[] { new MqttSubscription(topic, qos) },
null, null, new IMqttMessageListener[] { this::messageArrived }, new MqttProperties())
null, null, new IMqttMessageListener[] { this::messageArrived }, subscriptionProperties)
.waitForCompletion(getCompletionTimeout());
}
}
Expand Down Expand Up @@ -399,6 +404,10 @@ private void subscribe() {
IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
.mapToObj(t -> listener)
.toArray(IMqttMessageListener[]::new);
MqttProperties subscriptionProperties = new MqttProperties();
subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, topics.length)
.mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet())
.toList());
this.mqttClient.subscribe(subscriptions, null, null, listeners, new MqttProperties())
.waitForCompletion(getCompletionTimeout());
String message = "Connected and subscribed to " + Arrays.toString(topics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ public void testSimpleMqttv5Interaction() {
assertThat(receive.getPayload()).isEqualTo(testPayload);
}

@Test
public void testSharedTopicMqttv5Interaction() {
this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopic");

String testPayload = "shared topic payload";
this.mqttOutFlowInput.send(
MessageBuilder.withPayload(testPayload)
.setHeader(MqttHeaders.TOPIC, "testTopic")
.build());

Message<?> receive = this.fromMqttChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(testPayload);
}


@Configuration
@EnableIntegration
Expand Down

0 comments on commit 9ad3991

Please sign in to comment.