diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index 7ea6b4b6..353291d5 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -29,10 +29,11 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor { public final class com/gojek/mqtt/client/config/ExperimentConfigs { public fun ()V - public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V - public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)V + public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore; public final fun component10 ()I + public final fun component11 ()Z public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; public final fun component3 ()I public final fun component4 ()I @@ -41,8 +42,8 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs { public final fun component7 ()J public final fun component8 ()J public final fun component9 ()Z - public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; public fun equals (Ljava/lang/Object;)Z public final fun getActivityCheckIntervalSeconds ()I public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; @@ -53,6 +54,7 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs { public final fun getMaxInflightMessagesLimit ()I public final fun getPolicyResetTimeSeconds ()I public final fun getShouldUseNewSSLFlow ()Z + public final fun getStopMqttThreadOnDestroy ()Z public final fun getSubscriptionStore ()Lcom/gojek/mqtt/client/config/SubscriptionStore; public fun hashCode ()I public fun toString ()Ljava/lang/String; @@ -816,6 +818,23 @@ public final class com/gojek/mqtt/event/MqttEvent$OfflineMessageDiscardedEvent : public fun toString ()Ljava/lang/String; } +public final class com/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent : com/gojek/mqtt/event/MqttEvent { + public fun (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Ljava/lang/String; + public final fun component2 ()Ljava/lang/String; + public final fun component3 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent; + public fun equals (Ljava/lang/Object;)Z + public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun getName ()Ljava/lang/String; + public final fun getReason ()Ljava/lang/String; + public fun hashCode ()I + public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public fun toString ()Ljava/lang/String; +} + public final class com/gojek/mqtt/event/MqttEvent$OptimalKeepAliveFoundEvent : com/gojek/mqtt/event/MqttEvent { public fun (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V public synthetic fun (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt index 61299a8a..3c7cae0d 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt @@ -17,7 +17,8 @@ data class ExperimentConfigs( val incomingMessagesTTLSecs: Long = 360, val incomingMessagesCleanupIntervalSecs: Long = 60, val shouldUseNewSSLFlow: Boolean = false, - val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED + val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED, + val stopMqttThreadOnDestroy: Boolean = false ) enum class SubscriptionStore { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index d22ec73e..475a7322 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -1,13 +1,6 @@ package com.gojek.mqtt.client.v3.impl import android.content.Context -import android.os.Bundle -import android.os.Handler -import android.os.HandlerThread -import android.os.Looper -import android.os.Message -import android.os.Messenger -import android.os.RemoteException import androidx.annotation.RequiresApi import com.gojek.courier.QoS import com.gojek.courier.callback.SendMessageCallback @@ -38,11 +31,11 @@ import com.gojek.mqtt.client.model.ConnectionState.DISCONNECTING import com.gojek.mqtt.client.model.ConnectionState.INITIALISED import com.gojek.mqtt.client.model.MqttSendPacket import com.gojek.mqtt.client.v3.IAndroidMqttClient +import com.gojek.mqtt.client.v3.impl.State.DESTROYED +import com.gojek.mqtt.client.v3.impl.State.UNINITIALISED import com.gojek.mqtt.connection.IMqttConnection import com.gojek.mqtt.connection.MqttConnection import com.gojek.mqtt.connection.config.v3.ConnectionConfig -import com.gojek.mqtt.constants.MESSAGE -import com.gojek.mqtt.constants.MSG_APP_PUBLISH import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent.AuthenticatorErrorEvent import com.gojek.mqtt.event.MqttEvent.MqttConnectDiscardedEvent @@ -54,8 +47,8 @@ import com.gojek.mqtt.event.MqttEvent.MqttMessageSendEvent import com.gojek.mqtt.event.MqttEvent.MqttMessageSendFailureEvent import com.gojek.mqtt.event.MqttEvent.MqttMessageSendSuccessEvent import com.gojek.mqtt.event.MqttEvent.MqttReconnectEvent +import com.gojek.mqtt.event.MqttEvent.OperationDiscardedEvent import com.gojek.mqtt.exception.toCourierException -import com.gojek.mqtt.handler.IncomingHandler import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.MqttPacket import com.gojek.mqtt.network.NetworkHandler @@ -78,6 +71,7 @@ import com.gojek.mqtt.wakelock.WakeLockProvider import com.gojek.networktracker.NetworkStateTracker import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_UNEXPECTED_ERROR import org.eclipse.paho.client.mqttv3.MqttPersistenceException @@ -95,9 +89,6 @@ internal class AndroidMqttClient( private val runnableScheduler: IRunnableScheduler private val mqttConnection: IMqttConnection - private val mqttThreadLooper: Looper - private val mqttThreadHandler: Handler - private var mMessenger: Messenger private val networkUtils: NetworkUtils private val mqttUtils: MqttUtils private val mqttPersistence: PahoPersistence @@ -114,8 +105,7 @@ internal class AndroidMqttClient( @Volatile private var globalListener: MessageListener? = null - @Volatile - private var isInitialised = false + private var state = AtomicReference(UNINITIALISED) // Accessed only from mqtt thread private var forceRefresh = false @@ -133,21 +123,12 @@ internal class AndroidMqttClient( init { logger = mqttConfiguration.logger - val mqttHandlerThread = HandlerThread("MQTT_Thread") - mqttHandlerThread.start() - mqttThreadLooper = mqttHandlerThread.looper - mqttThreadHandler = Handler(mqttThreadLooper) - mMessenger = Messenger( - IncomingHandler(mqttThreadLooper, this, logger) - ) @RequiresApi runnableScheduler = MqttRunnableScheduler( - mqttHandlerThread, - mqttThreadHandler, - this, - logger, - eventHandler, - experimentConfigs.activityCheckIntervalSeconds + clientSchedulerBridge = this, + logger = logger, + eventHandler = eventHandler, + activityCheckIntervalSeconds = experimentConfigs.activityCheckIntervalSeconds ) mqttUtils = MqttUtils() networkUtils = NetworkUtils() @@ -214,19 +195,24 @@ internal class AndroidMqttClient( connectOptions: MqttConnectOptions ) { this.connectOptions = connectOptions - isInitialised = true + runnableScheduler.start() + state.set(State.INITIALISED) runnableScheduler.connectMqtt() } // This can be invoked on any thread override fun reconnect() { + if (state.get() != State.INITIALISED) { + eventHandler.onEvent(OperationDiscardedEvent("Reconnect", "Client is not initialised")) + return + } eventHandler.onEvent(MqttReconnectEvent()) runnableScheduler.disconnectMqtt(true) } // This can be invoked on any thread override fun disconnect(clearState: Boolean) { - isInitialised = false + state.set(State.DISCONNECTED) runnableScheduler.disconnectMqtt(false, clearState) } @@ -299,6 +285,10 @@ internal class AndroidMqttClient( // This can be invoked on any thread override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { + if (state.get() == DESTROYED) { + eventHandler.onEvent(OperationDiscardedEvent("SendMessage", "Client is not initialised")) + return false + } val mqttSendPacket = MqttSendPacket( message = mqttPacket.message, messageId = 0, @@ -309,25 +299,7 @@ internal class AndroidMqttClient( triggerTime = System.nanoTime(), sendMessageCallback = sendMessageCallback ) - - val msg = Message.obtain() - msg.what = MSG_APP_PUBLISH - - val bundle = Bundle() - bundle.putParcelable(MESSAGE, mqttSendPacket) - - msg.data = bundle - msg.replyTo = mMessenger - - try { - mMessenger.send(msg) - } catch (e: RemoteException) { - /* Service is dead. What to do? */ - logger.e(TAG, "Remote Service dead", e) - return false - } - - return true + return runnableScheduler.sendMessage(mqttSendPacket) } override fun addMessageListener(topic: String, listener: MessageListener) { @@ -347,7 +319,7 @@ internal class AndroidMqttClient( val startTime = clock.nanoTime() try { logger.d(TAG, "Sending onConnectAttempt event") - if (!isInitialised) { + if (state.get() != State.INITIALISED) { logger.d(TAG, "Mqtt Client not initialised") eventHandler.onEvent( MqttConnectDiscardedEvent( @@ -414,6 +386,10 @@ internal class AndroidMqttClient( eventHandler.onEvent(MqttDisconnectEvent()) mqttConnection.disconnect() if (clearState) { + if (experimentConfigs.stopMqttThreadOnDestroy) { + state.set(DESTROYED) + runnableScheduler.stop() + } mqttConnection.shutDown() subscriptionStore.clear() mqttPersistence.clearAll() @@ -432,11 +408,19 @@ internal class AndroidMqttClient( override fun subscribe(topicMap: Map) { val addedTopics = subscriptionStore.subscribeTopics(topicMap) + if (state.get() != State.INITIALISED) { + eventHandler.onEvent(OperationDiscardedEvent("Subscribe", "Client is not initialised")) + return + } runnableScheduler.scheduleSubscribe(0, addedTopics) } override fun unsubscribe(topics: List) { val removedTopics = subscriptionStore.unsubscribeTopics(topics) + if (state.get() != State.INITIALISED) { + eventHandler.onEvent(OperationDiscardedEvent("Unsubscribe", "Client is not initialised")) + return + } runnableScheduler.scheduleUnsubscribe(0, removedTopics) } @@ -626,3 +610,7 @@ internal class AndroidMqttClient( const val TAG = "AndroidMqttClient" } } + +private enum class State { + UNINITIALISED, INITIALISED, DISCONNECTED, DESTROYED +} diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt index 7e114e29..48c069c4 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt @@ -21,6 +21,12 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) + data class OperationDiscardedEvent( + val name: String, + val reason: String, + override var connectionInfo: ConnectionInfo? = null + ) : MqttEvent(connectionInfo) + data class MqttConnectSuccessEvent( val activeNetInfo: ActiveNetInfo, val serverUri: ServerUri?, diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt index 70b8b93c..eae23a78 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.scheduler import com.gojek.courier.QoS +import com.gojek.mqtt.client.model.MqttSendPacket internal interface IRunnableScheduler { fun connectMqtt() @@ -24,4 +25,9 @@ internal interface IRunnableScheduler { fun scheduleResetParams(delayMillis: Long) fun scheduleAuthFailureRunnable(delayMillis: Long) + + fun sendMessage(mqttSendPacket: MqttSendPacket): Boolean + + fun start() + fun stop() } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt index f1be998d..bc61e93d 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt @@ -1,13 +1,22 @@ package com.gojek.mqtt.scheduler +import android.os.Bundle import android.os.Handler import android.os.HandlerThread +import android.os.Message +import android.os.Messenger +import android.os.RemoteException import com.gojek.courier.QoS import com.gojek.courier.logging.ILogger import com.gojek.mqtt.client.IClientSchedulerBridge +import com.gojek.mqtt.client.model.MqttSendPacket +import com.gojek.mqtt.client.v3.impl.AndroidMqttClient +import com.gojek.mqtt.constants.MESSAGE import com.gojek.mqtt.constants.MQTT_WAIT_BEFORE_RECONNECT_TIME_MS +import com.gojek.mqtt.constants.MSG_APP_PUBLISH import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent.HandlerThreadNotAliveEvent +import com.gojek.mqtt.handler.IncomingHandler import com.gojek.mqtt.policies.connectretrytime.ConnectRetryTimeConfig import com.gojek.mqtt.scheduler.runnable.ActivityCheckRunnable import com.gojek.mqtt.scheduler.runnable.AuthFailureRunnable @@ -19,13 +28,14 @@ import com.gojek.mqtt.scheduler.runnable.SubscribeRunnable import com.gojek.mqtt.scheduler.runnable.UnsubscribeRunnable internal class MqttRunnableScheduler( - private val handlerThread: HandlerThread, - private val mqttThreadHandler: Handler, private val clientSchedulerBridge: IClientSchedulerBridge, private val logger: ILogger, private val eventHandler: EventHandler, private val activityCheckIntervalSeconds: Int ) : IRunnableScheduler { + private var handlerThread: HandlerThread + private var mqttThreadHandler: Handler + private var mMessenger: Messenger private val connectionCheckRunnable = ConnectionCheckRunnable(clientSchedulerBridge) private val mqttExceptionRunnable = MqttExceptionRunnable(clientSchedulerBridge) private val disconnectRunnable = DisconnectRunnable( @@ -35,6 +45,15 @@ internal class MqttRunnableScheduler( private val resetParamsRunnable = ResetParamsRunnable(clientSchedulerBridge) private val authFailureRunnable = AuthFailureRunnable(clientSchedulerBridge) + init { + handlerThread = HandlerThread("MQTT_Thread") + handlerThread.start() + mqttThreadHandler = Handler(handlerThread.looper) + mMessenger = Messenger( + IncomingHandler(handlerThread.looper, clientSchedulerBridge, logger) + ) + } + override fun connectMqtt() { connectMqtt(MQTT_WAIT_BEFORE_RECONNECT_TIME_MS) } @@ -154,6 +173,43 @@ internal class MqttRunnableScheduler( } } + override fun sendMessage(mqttSendPacket: MqttSendPacket): Boolean { + val msg = Message.obtain() + msg.what = MSG_APP_PUBLISH + + val bundle = Bundle() + bundle.putParcelable(MESSAGE, mqttSendPacket) + + msg.data = bundle + msg.replyTo = mMessenger + + try { + mMessenger.send(msg) + } catch (e: RemoteException) { + /* Service is dead. What to do? */ + logger.e(AndroidMqttClient.TAG, "Remote Service dead", e) + return false + } + return true + } + + @Synchronized + override fun start() { + if (handlerThread.isAlive.not()) { + handlerThread = HandlerThread("MQTT_Thread") + handlerThread.start() + mqttThreadHandler = Handler(handlerThread.looper) + mMessenger = Messenger( + IncomingHandler(handlerThread.looper, clientSchedulerBridge, logger) + ) + } + } + + @Synchronized + override fun stop() { + handlerThread.quitSafely() + } + private fun sendThreadEventIfNotAlive() { if (handlerThread.isAlive.not()) { eventHandler.onEvent(