diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index b589da35fc6..330b37aa4d0 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -276,7 +276,8 @@ public void addTopic(String topic, int qos) { try { super.addTopic(topic, qos); if (this.mqttClient != null && this.mqttClient.isConnected()) { - this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived) + this.mqttClient.subscribe(new MqttSubscription[] { new MqttSubscription(topic, qos) }, + null, null, new IMqttMessageListener[] { this::messageArrived }, new MqttProperties()) .waitForCompletion(getCompletionTimeout()); } } @@ -408,7 +409,7 @@ private void subscribe() { IMqttMessageListener[] listeners = IntStream.range(0, topics.length) .mapToObj(t -> listener) .toArray(IMqttMessageListener[]::new); - this.mqttClient.subscribe(subscriptions, null, null, listeners, null) + this.mqttClient.subscribe(subscriptions, null, null, listeners, new MqttProperties()) .waitForCompletion(getCompletionTimeout()); String message = "Connected and subscribed to " + Arrays.toString(topics); logger.debug(message); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 427ec5efbe5..82f696d3690 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -74,6 +74,9 @@ public class Mqttv5BackToBackTests implements MosquittoContainerTest { @Autowired private Config config; + @Autowired + private Mqttv5PahoMessageDrivenChannelAdapter mqttv5MessageDrivenChannelAdapter; + @Test //GH-3732 public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() { Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123"); @@ -115,6 +118,20 @@ public void testSimpleMqttv5Interaction() { .hasAtLeastOneElementOfType(MqttMessageSentEvent.class) .hasAtLeastOneElementOfType(MqttMessageDeliveredEvent.class) .hasAtLeastOneElementOfType(MqttSubscribedEvent.class); + + this.mqttv5MessageDrivenChannelAdapter.addTopic("anotherTopic"); + + testPayload = "another payload"; + + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "anotherTopic") + .build()); + + receive = this.fromMqttChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isEqualTo(testPayload); }