From 1c9ce1cc39e487a3f33729823986c4aa10d0b65d Mon Sep 17 00:00:00 2001 From: Deepanshu Date: Wed, 14 Jun 2023 11:35:33 +0530 Subject: [PATCH] Add functionality to get individual message sent callbacks via courier service interface (#68) * Add functionality to get individual message sent callbacks via courier service interface --- .../app/data/network/CourierService.kt | 4 ++- .../com/gojek/courier/app/ui/MainActivity.kt | 21 ++++++++++++- courier-core/api/courier-core.api | 15 ++++++++++ .../callback/NoOpSendMessageCallback.kt | 19 ++++++++++++ .../courier/callback/SendMessageCallback.kt | 8 +++++ courier/api/courier.api | 3 ++ .../com/gojek/courier/annotation/Callback.kt | 6 ++++ .../parser/MethodAnnotationsParser.kt | 30 +++++++++++++++++-- .../processor/SendArgumentProcessor.kt | 14 ++++++++- .../gojek/courier/coordinator/Coordinator.kt | 3 +- mqtt-client/api/mqtt-client.api | 3 +- .../java/com/gojek/mqtt/client/MqttClient.kt | 9 +++++- .../gojek/mqtt/client/MqttCourierClient.kt | 5 ++-- .../client/internal/MqttClientInternal.kt | 5 ++-- .../gojek/mqtt/client/model/MqttSendPacket.kt | 8 +++-- .../mqtt/client/v3/IAndroidMqttClient.kt | 3 +- .../mqtt/client/v3/impl/AndroidMqttClient.kt | 16 ++++++++-- .../mqtt/client/MqttCourierClientTest.kt | 7 +++-- 18 files changed, 159 insertions(+), 20 deletions(-) create mode 100644 courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt create mode 100644 courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt create mode 100644 courier/src/main/java/com/gojek/courier/annotation/Callback.kt diff --git a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt index ed741451..7e296dcc 100644 --- a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt +++ b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt @@ -1,17 +1,19 @@ package com.gojek.courier.app.data.network import com.gojek.courier.QoS +import com.gojek.courier.annotation.Callback import com.gojek.courier.annotation.Data import com.gojek.courier.annotation.Path import com.gojek.courier.annotation.Send import com.gojek.courier.annotation.Subscribe import com.gojek.courier.annotation.Unsubscribe import com.gojek.courier.app.data.network.model.Message +import com.gojek.courier.callback.SendMessageCallback import io.reactivex.Observable interface CourierService { @Send(topic = "{topic}", qos = QoS.ONE) - fun publish(@Path("topic") topic: String, @Data message: Message) + fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback) @Subscribe(topic = "{topic}") fun subscribe(@Path("topic") topic: String): Observable diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index 2244ce47..f93a38bd 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -1,6 +1,7 @@ package com.gojek.courier.app.ui import android.os.Bundle +import android.util.Log import androidx.appcompat.app.AppCompatActivity import com.gojek.chuckmqtt.external.MqttChuckConfig import com.gojek.chuckmqtt.external.MqttChuckInterceptor @@ -9,6 +10,7 @@ import com.gojek.courier.Courier import com.gojek.courier.app.R import com.gojek.courier.app.data.network.CourierService import com.gojek.courier.app.data.network.model.Message +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.logging.ILogger import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory @@ -78,7 +80,24 @@ class MainActivity : AppCompatActivity() { send.setOnClickListener { courierService.publish( topic = topic.text.toString(), - message = Message(123, message.text.toString()) + message = Message(123, message.text.toString()), + callback = object : SendMessageCallback { + override fun onMessageSendTrigger() { + Log.d("Courier", "onMessageSendTrigger") + } + + override fun onMessageWrittenOnSocket() { + Log.d("Courier", "onMessageWrittenOnSocket") + } + + override fun onMessageSendSuccess() { + Log.d("Courier", "onMessageSendSuccess") + } + + override fun onMessageSendFailure(error: Throwable) { + Log.d("Courier", "onMessageSendFailure") + } + } ) } diff --git a/courier-core/api/courier-core.api b/courier-core/api/courier-core.api index edccd6e2..11bf9339 100644 --- a/courier-core/api/courier-core.api +++ b/courier-core/api/courier-core.api @@ -52,6 +52,21 @@ public abstract interface class com/gojek/courier/StreamAdapter$Factory { public abstract fun create (Ljava/lang/reflect/Type;)Lcom/gojek/courier/StreamAdapter; } +public final class com/gojek/courier/callback/NoOpSendMessageCallback : com/gojek/courier/callback/SendMessageCallback { + public static final field INSTANCE Lcom/gojek/courier/callback/NoOpSendMessageCallback; + public fun onMessageSendFailure (Ljava/lang/Throwable;)V + public fun onMessageSendSuccess ()V + public fun onMessageSendTrigger ()V + public fun onMessageWrittenOnSocket ()V +} + +public abstract interface class com/gojek/courier/callback/SendMessageCallback { + public abstract fun onMessageSendFailure (Ljava/lang/Throwable;)V + public abstract fun onMessageSendSuccess ()V + public abstract fun onMessageSendTrigger ()V + public abstract fun onMessageWrittenOnSocket ()V +} + public final class com/gojek/courier/extensions/CollectionExtensionsKt { public static final fun toImmutableMap (Ljava/util/Map;)Ljava/util/Map; public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set; diff --git a/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt b/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt new file mode 100644 index 00000000..9ff86a4b --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt @@ -0,0 +1,19 @@ +package com.gojek.courier.callback + +object NoOpSendMessageCallback : SendMessageCallback { + override fun onMessageSendTrigger() { + // no-op + } + + override fun onMessageWrittenOnSocket() { + // no-op + } + + override fun onMessageSendSuccess() { + // no-op + } + + override fun onMessageSendFailure(error: Throwable) { + // no-op + } +} diff --git a/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt b/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt new file mode 100644 index 00000000..dee5a54e --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt @@ -0,0 +1,8 @@ +package com.gojek.courier.callback + +interface SendMessageCallback { + fun onMessageSendTrigger() + fun onMessageWrittenOnSocket() + fun onMessageSendSuccess() + fun onMessageSendFailure(error: Throwable) +} diff --git a/courier/api/courier.api b/courier/api/courier.api index 825a2dca..67bfedde 100644 --- a/courier/api/courier.api +++ b/courier/api/courier.api @@ -31,6 +31,9 @@ public final class com/gojek/courier/Courier$Configuration { public fun toString ()Ljava/lang/String; } +public abstract interface annotation class com/gojek/courier/annotation/Callback : java/lang/annotation/Annotation { +} + public abstract interface annotation class com/gojek/courier/annotation/Data : java/lang/annotation/Annotation { } diff --git a/courier/src/main/java/com/gojek/courier/annotation/Callback.kt b/courier/src/main/java/com/gojek/courier/annotation/Callback.kt new file mode 100644 index 00000000..0ea543f7 --- /dev/null +++ b/courier/src/main/java/com/gojek/courier/annotation/Callback.kt @@ -0,0 +1,6 @@ +package com.gojek.courier.annotation + +@MustBeDocumented +@Target(AnnotationTarget.VALUE_PARAMETER) +@kotlin.annotation.Retention(AnnotationRetention.RUNTIME) +annotation class Callback diff --git a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt index 9f5b3032..401135fd 100644 --- a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt +++ b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt @@ -1,6 +1,7 @@ package com.gojek.courier.annotation.parser import com.gojek.courier.QoS +import com.gojek.courier.annotation.Callback import com.gojek.courier.annotation.Data import com.gojek.courier.annotation.Path import com.gojek.courier.annotation.Receive @@ -13,6 +14,7 @@ import com.gojek.courier.argument.processor.ReceiveArgumentProcessor import com.gojek.courier.argument.processor.SendArgumentProcessor import com.gojek.courier.argument.processor.SubscriptionArgumentProcessor import com.gojek.courier.argument.processor.UnsubscriptionArgumentProcessor +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.stub.StubMethod import com.gojek.courier.utils.MessageAdapterResolver import com.gojek.courier.utils.StreamAdapterResolver @@ -87,7 +89,8 @@ internal class MethodAnnotationsParser( val messageType = method.getDataParameterType(dataParameterIndex) val annotations = method.getDataParameterAnnotations(dataParameterIndex) val adapter = messageAdapterResolver.resolve(messageType, annotations) - val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex) + val callbackIndex = method.getCallbackParameterIndex() + val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex, callbackIndex) stubMethod = StubMethod.Send(adapter, qos, argumentProcessor) } @@ -226,7 +229,7 @@ internal class MethodAnnotationsParser( private val PARAM_NAME_REGEX = Pattern.compile(PARAM) private fun Annotation.isParameterAnnotation(): Boolean { - return this is Path || this is Data || this is TopicMap + return this is Path || this is Data || this is TopicMap || this is Callback } private fun Annotation.isStubMethodAnnotation(): Boolean { @@ -268,6 +271,29 @@ internal class MethodAnnotationsParser( return index } + private fun Method.getCallbackParameterIndex(): Int { + var index = -1 + for (parameterIndex in parameterAnnotations.indices) { + val parameterAnnotations = parameterAnnotations[parameterIndex] + val annotations = parameterAnnotations.filter { it.isParameterAnnotation() } + require(annotations.size == 1) { + "A parameter must have one and only one parameter annotation: $parameterIndex" + } + if (annotations.first() is Callback) { + if (index == -1) { + index = parameterIndex + break + } else { + throw IllegalArgumentException("Multiple parameters found with @Callback annotation") + } + } + } + if (index != -1 && parameterTypes[index] != SendMessageCallback::class.java) { + throw IllegalArgumentException("Parameter annotated with @Callback should be of type SendMessageCallback: ${parameterTypes[index]}") + } + return index + } + private fun Method.getDataParameterType(index: Int): Type { return parameterTypes[index] } diff --git a/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt b/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt index f7bc0b3a..b5a8a43f 100644 --- a/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt +++ b/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt @@ -1,9 +1,13 @@ package com.gojek.courier.argument.processor +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback + internal class SendArgumentProcessor( private val pathMap: Map, private val topic: String, - private val dataParameterIndex: Int + private val dataParameterIndex: Int, + private val callbackIndex: Int ) : ArgumentProcessor() { private var parsedTopic = topic @@ -21,4 +25,12 @@ internal class SendArgumentProcessor( fun getDataArgument(args: Array): Any { return args[dataParameterIndex] } + + fun getCallbackArgument(args: Array): SendMessageCallback { + return if (callbackIndex == -1) { + NoOpSendMessageCallback + } else { + args[callbackIndex] as SendMessageCallback + } + } } diff --git a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt index c221db2a..453f980b 100644 --- a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt +++ b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt @@ -34,9 +34,10 @@ internal class Coordinator( val data = stubMethod.argumentProcessor.getDataArgument(args) stubMethod.argumentProcessor.inject(args) val topic = stubMethod.argumentProcessor.getTopic() + val callback = stubMethod.argumentProcessor.getCallbackArgument(args) val message = stubMethod.messageAdapter.toMessage(topic, data) val qos = stubMethod.qos - val sent = client.send(message, topic, qos) + val sent = client.send(message, topic, qos, callback) logger.d("Coordinator", "Sending message on topic: $topic, qos: $qos, message: $data") return sent } diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index b9874972..56a598c6 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -12,13 +12,14 @@ public abstract interface class com/gojek/mqtt/client/MqttClient { public abstract fun reconnect ()V public abstract fun removeEventHandler (Lcom/gojek/mqtt/event/EventHandler;)V public abstract fun removeMessageListener (Ljava/lang/String;Lcom/gojek/mqtt/client/listener/MessageListener;)V - public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;)Z + public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;)Z public abstract fun subscribe (Lkotlin/Pair;[Lkotlin/Pair;)V public abstract fun unsubscribe (Ljava/lang/String;[Ljava/lang/String;)V } public final class com/gojek/mqtt/client/MqttClient$DefaultImpls { public static synthetic fun disconnect$default (Lcom/gojek/mqtt/client/MqttClient;ZILjava/lang/Object;)V + public static synthetic fun send$default (Lcom/gojek/mqtt/client/MqttClient;Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;ILjava/lang/Object;)Z } public abstract interface class com/gojek/mqtt/client/MqttInterceptor { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt index 2aadb27f..f8138c1c 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt @@ -2,6 +2,8 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.event.EventHandler @@ -14,7 +16,12 @@ interface MqttClient { fun reconnect() fun subscribe(topic: Pair, vararg topics: Pair) fun unsubscribe(topic: String, vararg topics: String) - fun send(message: Message, topic: String, qos: QoS): Boolean + fun send( + message: Message, + topic: String, + qos: QoS, + sendMessageCallback: SendMessageCallback = NoOpSendMessageCallback + ): Boolean fun addMessageListener(topic: String, listener: MessageListener) fun removeMessageListener(topic: String, listener: MessageListener) fun addGlobalMessageListener(listener: MessageListener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt index a0643119..d8d87c3f 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt @@ -2,6 +2,7 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.internal.MqttClientInternal import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState @@ -36,8 +37,8 @@ internal class MqttCourierClient( mqttClient.unsubscribe(topic, *topics) } - override fun send(message: Message, topic: String, qos: QoS): Boolean { - return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos)) + override fun send(message: Message, topic: String, qos: QoS, sendMessageCallback: SendMessageCallback): Boolean { + return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos), sendMessageCallback) } override fun addMessageListener(topic: String, listener: MessageListener) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index ea2259b2..340a8c29 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -2,6 +2,7 @@ package com.gojek.mqtt.client.internal import android.content.Context import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.keepalive.KeepAliveFailureHandler import com.gojek.keepalive.NoOpKeepAliveFailureHandler import com.gojek.keepalive.OptimalKeepAliveFailureHandler @@ -97,8 +98,8 @@ internal class MqttClientInternal( androidMqttClient.unsubscribe(listOf(*topics)) } - fun send(mqttPacket: MqttPacket): Boolean { - return androidMqttClient.send(mqttPacket) + fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { + return androidMqttClient.send(mqttPacket, sendMessageCallback) } fun addMessageListener(topic: String, listener: MessageListener) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt index 50c96220..0d7251cf 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt @@ -2,6 +2,8 @@ package com.gojek.mqtt.client.model import android.os.Parcel import android.os.Parcelable +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback internal data class MqttSendPacket( var message: ByteArray, @@ -9,7 +11,8 @@ internal data class MqttSendPacket( var timestamp: Long, var qos: Int, var topic: String, - var type: Int + var type: Int, + var sendMessageCallback: SendMessageCallback ) : Parcelable { constructor(parcel: Parcel) : this( parcel.createByteArray()!!, @@ -17,7 +20,8 @@ internal data class MqttSendPacket( parcel.readLong(), parcel.readInt(), parcel.readString()!!, - parcel.readInt() + parcel.readInt(), + NoOpSendMessageCallback ) override fun writeToParcel(parcel: Parcel, flags: Int) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt index 54443e40..feb6b164 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client.v3 import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.constants.MQTT_WAIT_BEFORE_RECONNECT_TIME_MS @@ -12,7 +13,7 @@ internal interface IAndroidMqttClient { fun connect(timeMillis: Long = MQTT_WAIT_BEFORE_RECONNECT_TIME_MS) fun reconnect() fun disconnect(clearState: Boolean = false) - fun send(mqttPacket: MqttPacket): Boolean + fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean fun addMessageListener(topic: String, listener: MessageListener) fun removeMessageListener(topic: String, listener: MessageListener) fun addGlobalMessageListener(listener: MessageListener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index 28e0376c..1e2f7d2d 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -10,6 +10,7 @@ import android.os.Messenger import android.os.RemoteException import androidx.annotation.RequiresApi import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.exception.AuthApiException import com.gojek.courier.extensions.fromNanosToMillis import com.gojek.courier.logging.ILogger @@ -250,8 +251,10 @@ internal class AndroidMqttClient( MqttMessageSendEvent(topic, qos, message.size) ) } + mqttPacket.sendMessageCallback.onMessageSendTrigger() mqttConnection.publish(mqttPacket) } catch (e: MqttPersistenceException) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) with(mqttPacket) { eventHandler.onEvent( MqttMessageSendFailureEvent( @@ -263,6 +266,7 @@ internal class AndroidMqttClient( ) } } catch (e: MqttException) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) with(mqttPacket) { eventHandler.onEvent( MqttMessageSendFailureEvent( @@ -275,6 +279,7 @@ internal class AndroidMqttClient( } runnableScheduler.scheduleMqttHandleExceptionRunnable(e, true) } catch (e: java.lang.Exception) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) // this might happen if mqtt object becomes null while disconnect, so just ignore with(mqttPacket) { eventHandler.onEvent( @@ -290,14 +295,15 @@ internal class AndroidMqttClient( } // This can be invoked on any thread - override fun send(mqttPacket: MqttPacket): Boolean { + override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { val mqttSendPacket = MqttSendPacket( mqttPacket.message, 0, System.currentTimeMillis(), mqttPacket.qos.value, mqttPacket.topic, - mqttPacket.qos.type + mqttPacket.qos.type, + sendMessageCallback ) val msg = Message.obtain() @@ -577,6 +583,7 @@ internal class AndroidMqttClient( inner class MqttMessageSendListener : IMessageSendListener { override fun onSuccess(packet: MqttSendPacket) { + packet.sendMessageCallback.onMessageSendSuccess() with(packet) { eventHandler.onEvent( MqttMessageSendSuccessEvent( @@ -589,10 +596,13 @@ internal class AndroidMqttClient( } override fun onFailure(packet: MqttSendPacket, exception: Throwable) { + packet.sendMessageCallback.onMessageSendFailure(exception) runnableScheduler.connectMqtt() } - override fun notifyWrittenOnSocket(packet: MqttSendPacket) {} + override fun notifyWrittenOnSocket(packet: MqttSendPacket) { + packet.sendMessageCallback.onMessageWrittenOnSocket() + } } companion object { diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt index c1726d22..ccbdef06 100644 --- a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt +++ b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt @@ -2,12 +2,14 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.internal.MqttClientInternal import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.MqttPacket import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever @@ -71,13 +73,14 @@ class MqttCourierClientTest { @Test fun `test send`() { val message = mock() + val callback = mock() val byteArray = ByteArray(10) whenever(message.value).thenReturn(byteArray) val topic = "test/topic" val qos = QoS.ZERO - mqttCourierClient.send(message, topic, qos) + mqttCourierClient.send(message, topic, qos, callback) val argumentCaptor = argumentCaptor() - verify(mqttClientInternal).send(argumentCaptor.capture()) + verify(mqttClientInternal).send(argumentCaptor.capture(), eq(callback)) assertEquals(argumentCaptor.lastValue.message, byteArray) assertEquals(argumentCaptor.lastValue.topic, topic) assertEquals(argumentCaptor.lastValue.qos, qos)