Skip to content

Commit

Permalink
GH-8873: Fix on-demand subscription for MQTT v5
Browse files Browse the repository at this point in the history
Fixes: #8873

The `mqttClient.subscribe()` API does not check if properties are provided and fails on the
`subscriptionProperties.getSubscriptionIdentifiers().get(0)` call with an `IndexOutOfBoundsException`

* Use another `mqttClient.subscribe()` API in the `Mqttv5PahoMessageDrivenChannelAdapter` where there is not such a check
* Ensure that `addTopic(NAME)` works as expected in the `Mqttv5BackToBackTests`

**Cherry-pick to `6.2.x` & `6.1.x`**

# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
  • Loading branch information
artembilan committed Jan 30, 2024
1 parent a73fecf commit c4b8bc7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ public void addTopic(String topic, int qos) {
try {
super.addTopic(topic, qos);
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived)
this.mqttClient.subscribe(new MqttSubscription[] { new MqttSubscription(topic, qos) },
null, null, new IMqttMessageListener[] { this::messageArrived }, new MqttProperties())
.waitForCompletion(getCompletionTimeout());
}
}
Expand Down Expand Up @@ -408,7 +409,7 @@ private void subscribe() {
IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
.mapToObj(t -> listener)
.toArray(IMqttMessageListener[]::new);
this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
this.mqttClient.subscribe(subscriptions, null, null, listeners, new MqttProperties())
.waitForCompletion(getCompletionTimeout());
String message = "Connected and subscribed to " + Arrays.toString(topics);
logger.debug(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class Mqttv5BackToBackTests implements MosquittoContainerTest {
@Autowired
private Config config;

@Autowired
private Mqttv5PahoMessageDrivenChannelAdapter mqttv5MessageDrivenChannelAdapter;

@Test //GH-3732
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
Expand Down Expand Up @@ -115,6 +118,20 @@ public void testSimpleMqttv5Interaction() {
.hasAtLeastOneElementOfType(MqttMessageSentEvent.class)
.hasAtLeastOneElementOfType(MqttMessageDeliveredEvent.class)
.hasAtLeastOneElementOfType(MqttSubscribedEvent.class);

this.mqttv5MessageDrivenChannelAdapter.addTopic("anotherTopic");

testPayload = "another payload";

this.mqttOutFlowInput.send(
MessageBuilder.withPayload(testPayload)
.setHeader(MqttHeaders.TOPIC, "anotherTopic")
.build());

receive = this.fromMqttChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(testPayload);
}


Expand Down

0 comments on commit c4b8bc7

Please sign in to comment.