From 64bb73c2528a49ef4ba68d0c9d8f13efb4de53d9 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Tue, 5 Dec 2023 09:57:42 +0530 Subject: [PATCH 1/6] Cleanup AndroidMqttClient on destroy --- .../com/gojek/courier/app/ui/MainActivity.kt | 1 + .../gojek/mqtt/client/MqttCourierClient.kt | 6 +- .../mqtt/client/config/ExperimentConfigs.kt | 3 +- .../client/internal/MqttClientInternal.kt | 120 ++++++++++++++---- .../mqtt/client/model/ConnectionState.kt | 4 + .../mqtt/client/v3/IAndroidMqttClient.kt | 3 +- .../mqtt/client/v3/impl/AndroidMqttClient.kt | 27 +++- .../gojek/mqtt/connection/MqttConnection.kt | 4 +- .../java/com/gojek/mqtt/event/MqttEvent.kt | 4 + .../mqtt/scheduler/IRunnableScheduler.kt | 4 +- .../mqtt/scheduler/MqttRunnableScheduler.kt | 4 + 11 files changed, 146 insertions(+), 34 deletions(-) diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index 2c4e524c..75c0f945 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -150,6 +150,7 @@ class MainActivity : AppCompatActivity() { mqttInterceptorList = listOf(MqttChuckInterceptor(this, MqttChuckConfig(retentionPeriod = Period.ONE_HOUR))), persistenceOptions = PahoPersistenceOptions(100, false), experimentConfigs = ExperimentConfigs( + cleanMqttClientOnDestroy = true, adaptiveKeepAliveConfig = AdaptiveKeepAliveConfig( lowerBoundMinutes = 1, upperBoundMinutes = 9, diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt index d8d87c3f..c34b0282 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt @@ -22,7 +22,11 @@ internal class MqttCourierClient( } override fun disconnect(clearState: Boolean) { - mqttClient.disconnect(clearState) + if (clearState) { + mqttClient.destroy() + } else { + mqttClient.disconnect() + } } override fun reconnect() { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt index 61299a8a..a916f740 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt @@ -17,7 +17,8 @@ data class ExperimentConfigs( val incomingMessagesTTLSecs: Long = 360, val incomingMessagesCleanupIntervalSecs: Long = 60, val shouldUseNewSSLFlow: Boolean = false, - val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED + val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED, + val cleanMqttClientOnDestroy: Boolean = false ) enum class SubscriptionStore { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index cbd51033..7fa9f83e 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -14,9 +14,12 @@ import com.gojek.mqtt.client.event.interceptor.MqttEventHandler import com.gojek.mqtt.client.factory.getAndroidMqttClientFactory import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState +import com.gojek.mqtt.client.model.ConnectionState.UNINITIALISED import com.gojek.mqtt.client.v3.IAndroidMqttClient import com.gojek.mqtt.event.AdaptivePingEventHandler import com.gojek.mqtt.event.EventHandler +import com.gojek.mqtt.event.MqttEvent +import com.gojek.mqtt.event.MqttEvent.MqttClientDestroyedEvent import com.gojek.mqtt.event.MqttEvent.OptimalKeepAliveFoundEvent import com.gojek.mqtt.event.PingEventHandler import com.gojek.mqtt.model.AdaptiveKeepAliveConfig @@ -34,7 +37,7 @@ internal class MqttClientInternal( private val networkStateTracker = NetworkStateTrackerFactory.create(context.applicationContext, mqttConfiguration.logger) - private val androidMqttClient: IAndroidMqttClient + private var androidMqttClient: IAndroidMqttClient? = null private var optimalKeepAliveProvider: OptimalKeepAliveProvider? = null @@ -64,59 +67,124 @@ internal class MqttClientInternal( } init { - initialiseAdaptiveMqttClient() - androidMqttClient = androidMqttClientFactory.createAndroidMqttClient( - context = context, - mqttConfiguration = mqttConfiguration, - networkStateTracker = networkStateTracker, - keepAliveProvider = keepAliveProvider, - keepAliveFailureHandler = keepAliveFailureHandler, - eventHandler = eventHandler, - pingEventHandler = PingEventHandler(eventHandler) - ) + eventHandler.addEventHandler(object : EventHandler { + override fun onEvent(mqttEvent: MqttEvent) { + if (mqttEvent is MqttClientDestroyedEvent) { + cleanup() + } + } + }) } + @Synchronized fun connect(connectOptions: MqttConnectOptions) { - androidMqttClient.connect(connectOptions) + if (androidMqttClient == null) { + initialiseAdaptiveMqttClient() + androidMqttClient = androidMqttClientFactory.createAndroidMqttClient( + context = context, + mqttConfiguration = mqttConfiguration, + networkStateTracker = networkStateTracker, + keepAliveProvider = keepAliveProvider, + keepAliveFailureHandler = keepAliveFailureHandler, + eventHandler = eventHandler, + pingEventHandler = PingEventHandler(eventHandler) + ) + } + androidMqttClient?.connect(connectOptions) adaptiveMqttClient?.connect(connectOptions) } - fun disconnect(clearState: Boolean) { - androidMqttClient.disconnect(clearState) - adaptiveMqttClient?.disconnect(clearState) + @Synchronized + fun disconnect() { + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.disconnect() + adaptiveMqttClient?.disconnect() } + @Synchronized + fun destroy() { + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.destroy() + adaptiveMqttClient?.destroy() + } + + @Synchronized fun reconnect() { - androidMqttClient.reconnect() + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.reconnect() adaptiveMqttClient?.reconnect() } + @Synchronized fun subscribe(vararg topics: Pair) { - androidMqttClient.subscribe(mapOf(*topics)) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.subscribe(mapOf(*topics)) } + @Synchronized fun unsubscribe(vararg topics: String) { - androidMqttClient.unsubscribe(listOf(*topics)) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.unsubscribe(listOf(*topics)) } + @Synchronized fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { - return androidMqttClient.send(mqttPacket, sendMessageCallback) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return false + } + return androidMqttClient?.send(mqttPacket, sendMessageCallback) ?: false } + @Synchronized fun addMessageListener(topic: String, listener: MessageListener) { - return androidMqttClient.addMessageListener(topic, listener) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.addMessageListener(topic, listener) } + @Synchronized fun removeMessageListener(topic: String, listener: MessageListener) { - return androidMqttClient.removeMessageListener(topic, listener) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.removeMessageListener(topic, listener) } + @Synchronized fun addGlobalMessageListener(listener: MessageListener) { - return androidMqttClient.addGlobalMessageListener(listener) + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return + } + androidMqttClient?.addGlobalMessageListener(listener) } + @Synchronized fun getCurrentState(): ConnectionState { - return androidMqttClient.getCurrentState() + if (androidMqttClient == null) { + mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + return UNINITIALISED + } + return androidMqttClient?.getCurrentState() ?: UNINITIALISED } private fun initialiseAdaptiveMqttClient() { @@ -177,4 +245,10 @@ internal class MqttClientInternal( fun removeEventHandler(eventHandler: EventHandler) { this.eventHandler.removeEventHandler(eventHandler) } + + @Synchronized + private fun cleanup() { + androidMqttClient = null + adaptiveMqttClient = null + } } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt index c4318ded..5cf368c0 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt @@ -3,6 +3,10 @@ package com.gojek.mqtt.client.model import com.gojek.mqtt.client.MqttClient enum class ConnectionState { + /** + * Represents state when [MqttClient] is not created/initialised + */ + UNINITIALISED, /** * Represents state when [MqttClient] is created/initialised */ diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt index feb6b164..d5e4b17a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt @@ -12,7 +12,8 @@ internal interface IAndroidMqttClient { fun connect(connectOptions: MqttConnectOptions) fun connect(timeMillis: Long = MQTT_WAIT_BEFORE_RECONNECT_TIME_MS) fun reconnect() - fun disconnect(clearState: Boolean = false) + fun disconnect() + fun destroy() fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean fun addMessageListener(topic: String, listener: MessageListener) fun removeMessageListener(topic: String, listener: MessageListener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index 0807e68a..355039d7 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -45,6 +45,7 @@ import com.gojek.mqtt.constants.MESSAGE import com.gojek.mqtt.constants.MSG_APP_PUBLISH import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent.AuthenticatorErrorEvent +import com.gojek.mqtt.event.MqttEvent.MqttClientDestroyedEvent import com.gojek.mqtt.event.MqttEvent.MqttConnectDiscardedEvent import com.gojek.mqtt.event.MqttEvent.MqttConnectFailureEvent import com.gojek.mqtt.event.MqttEvent.MqttDisconnectEvent @@ -206,7 +207,9 @@ internal class AndroidMqttClient( experimentConfigs.incomingMessagesCleanupIntervalSecs, clock ) - networkHandler.init() + if (experimentConfigs.cleanMqttClientOnDestroy.not()) { + networkHandler.init() + } } // This can be invoked on any thread @@ -214,6 +217,9 @@ internal class AndroidMqttClient( connectOptions: MqttConnectOptions ) { this.connectOptions = connectOptions + if (experimentConfigs.cleanMqttClientOnDestroy) { + networkHandler.init() + } isInitialised = true runnableScheduler.connectMqtt() } @@ -221,13 +227,19 @@ internal class AndroidMqttClient( // This can be invoked on any thread override fun reconnect() { eventHandler.onEvent(MqttReconnectEvent()) - runnableScheduler.disconnectMqtt(true) + runnableScheduler.disconnectMqtt(reconnect = true, clearState = false) } // This can be invoked on any thread - override fun disconnect(clearState: Boolean) { + override fun disconnect() { isInitialised = false - runnableScheduler.disconnectMqtt(false, clearState) + runnableScheduler.disconnectMqtt(reconnect = false, clearState = false) + } + + // This can be invoked on any thread + override fun destroy() { + isInitialised = false + runnableScheduler.disconnectMqtt(reconnect = false, clearState = true) } // This can be invoked on any thread @@ -417,6 +429,11 @@ internal class AndroidMqttClient( mqttConnection.shutDown() subscriptionStore.clear() mqttPersistence.clearAll() + if (experimentConfigs.cleanMqttClientOnDestroy) { + runnableScheduler.stopThread() + networkHandler.destroy() + eventHandler.onEvent(MqttClientDestroyedEvent()) + } } } @@ -572,7 +589,7 @@ internal class AndroidMqttClient( MqttMessageReceiveErrorEvent(topic, byteArray.size, e.toCourierException()) ) logger.e(TAG, "Exception when msg arrived : ", e) - runnableScheduler.disconnectMqtt(true) + runnableScheduler.disconnectMqtt(reconnect = true, clearState = false) return false } catch (e: Throwable) { eventHandler.onEvent( diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt index 8d040836..485594ed 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt @@ -603,7 +603,7 @@ internal class MqttConnection( throwable = throwable, timeTakenMillis = (clock.nanoTime() - context.startTime).fromNanosToMillis() ) - runnableScheduler.disconnectMqtt(true) + runnableScheduler.disconnectMqtt(reconnect = true, clearState = false) } } } @@ -638,7 +638,7 @@ internal class MqttConnection( throwable = throwable, timeTakenMillis = (clock.nanoTime() - context.startTime).fromNanosToMillis() ) - runnableScheduler.disconnectMqtt(true) + runnableScheduler.disconnectMqtt(reconnect = true, clearState = false) } } } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt index 7e114e29..4c46cf80 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt @@ -251,6 +251,10 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) + data class MqttClientDestroyedEvent( + override var connectionInfo: ConnectionInfo? = null + ) : MqttEvent(connectionInfo) + data class OfflineMessageDiscardedEvent( val messageId: Int, override var connectionInfo: ConnectionInfo? = null diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt index 70b8b93c..5793b4ab 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt @@ -7,7 +7,7 @@ internal interface IRunnableScheduler { fun connectMqtt(timeMillis: Long) - fun disconnectMqtt(reconnect: Boolean, clearState: Boolean = false) + fun disconnectMqtt(reconnect: Boolean, clearState: Boolean) fun scheduleNextActivityCheck() @@ -24,4 +24,6 @@ internal interface IRunnableScheduler { fun scheduleResetParams(delayMillis: Long) fun scheduleAuthFailureRunnable(delayMillis: Long) + + fun stopThread() } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt index f1be998d..4ec429d6 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt @@ -154,6 +154,10 @@ internal class MqttRunnableScheduler( } } + override fun stopThread() { + handlerThread.quitSafely() + } + private fun sendThreadEventIfNotAlive() { if (handlerThread.isAlive.not()) { eventHandler.onEvent( From f9439343dd310f6f17ec821a21752ee468c512b1 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Tue, 5 Dec 2023 12:23:06 +0530 Subject: [PATCH 2/6] Update unit tests --- .../com/gojek/mqtt/client/model/ConnectionState.kt | 1 + .../com/gojek/mqtt/client/MqttCourierClientTest.kt | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt index 5cf368c0..cc0a1a3d 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/ConnectionState.kt @@ -7,6 +7,7 @@ enum class ConnectionState { * Represents state when [MqttClient] is not created/initialised */ UNINITIALISED, + /** * Represents state when [MqttClient] is created/initialised */ diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt index ccbdef06..e8a1e6af 100644 --- a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt +++ b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt @@ -42,10 +42,17 @@ class MqttCourierClientTest { } @Test - fun `test disconnect`() { + fun `test disconnect with clearState=false`() { val clearState = false mqttCourierClient.disconnect(clearState) - verify(mqttClientInternal).disconnect(clearState) + verify(mqttClientInternal).disconnect() + } + + @Test + fun `test disconnect with clearState=true`() { + val clearState = true + mqttCourierClient.disconnect(clearState) + verify(mqttClientInternal).destroy() } @Test From 4977005c1453e858393c133d6edb5c2835e4b759 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Tue, 5 Dec 2023 12:32:32 +0530 Subject: [PATCH 3/6] Update APIs --- mqtt-client/api/mqtt-client.api | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index 382de056..095090cd 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -29,10 +29,11 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor { public final class com/gojek/mqtt/client/config/ExperimentConfigs { public fun ()V - public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V - public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)V + public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore; public final fun component10 ()I + public final fun component11 ()Z public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; public final fun component3 ()I public final fun component4 ()I @@ -41,11 +42,12 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs { public final fun component7 ()J public final fun component8 ()J public final fun component9 ()Z - public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; public fun equals (Ljava/lang/Object;)Z public final fun getActivityCheckIntervalSeconds ()I public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; + public final fun getCleanMqttClientOnDestroy ()Z public final fun getConnectPacketTimeoutSeconds ()I public final fun getInactivityTimeoutSeconds ()I public final fun getIncomingMessagesCleanupIntervalSecs ()J @@ -177,6 +179,7 @@ public final class com/gojek/mqtt/client/model/ConnectionState : java/lang/Enum public static final field DISCONNECTED Lcom/gojek/mqtt/client/model/ConnectionState; public static final field DISCONNECTING Lcom/gojek/mqtt/client/model/ConnectionState; public static final field INITIALISED Lcom/gojek/mqtt/client/model/ConnectionState; + public static final field UNINITIALISED Lcom/gojek/mqtt/client/model/ConnectionState; public static fun valueOf (Ljava/lang/String;)Lcom/gojek/mqtt/client/model/ConnectionState; public static fun values ()[Lcom/gojek/mqtt/client/model/ConnectionState; } @@ -325,6 +328,20 @@ public final class com/gojek/mqtt/event/MqttEvent$InboundInactivityEvent : com/g public fun toString ()Ljava/lang/String; } +public final class com/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent : com/gojek/mqtt/event/MqttEvent { + public fun ()V + public fun (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent; + public fun equals (Ljava/lang/Object;)Z + public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public fun hashCode ()I + public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public fun toString ()Ljava/lang/String; +} + public final class com/gojek/mqtt/event/MqttEvent$MqttConnectAttemptEvent : com/gojek/mqtt/event/MqttEvent { public fun (ZLcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V public synthetic fun (ZLcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V From cc5664751c3db5e4dafea97690a304326c8eb586 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Wed, 6 Dec 2023 09:19:06 +0530 Subject: [PATCH 4/6] Replace null checks with state checks --- .../client/internal/MqttClientInternal.kt | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index 7fa9f83e..f04a2101 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -12,14 +12,13 @@ import com.gojek.keepalive.config.AdaptiveKeepAliveConfig as AdaptiveKAConfig import com.gojek.mqtt.client.config.v3.MqttV3Configuration import com.gojek.mqtt.client.event.interceptor.MqttEventHandler import com.gojek.mqtt.client.factory.getAndroidMqttClientFactory +import com.gojek.mqtt.client.internal.State.INITIALISED +import com.gojek.mqtt.client.internal.State.UNINITIALISED import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState -import com.gojek.mqtt.client.model.ConnectionState.UNINITIALISED import com.gojek.mqtt.client.v3.IAndroidMqttClient import com.gojek.mqtt.event.AdaptivePingEventHandler import com.gojek.mqtt.event.EventHandler -import com.gojek.mqtt.event.MqttEvent -import com.gojek.mqtt.event.MqttEvent.MqttClientDestroyedEvent import com.gojek.mqtt.event.MqttEvent.OptimalKeepAliveFoundEvent import com.gojek.mqtt.event.PingEventHandler import com.gojek.mqtt.model.AdaptiveKeepAliveConfig @@ -48,6 +47,8 @@ internal class MqttClientInternal( private val eventHandler = MqttEventHandler(MqttUtils()) + private var initialisationState = UNINITIALISED + private val optimalKeepAliveObserver = object : OptimalKeepAliveObserver { override fun onOptimalKeepAliveFound( timeMinutes: Int, @@ -66,19 +67,9 @@ internal class MqttClientInternal( } } - init { - eventHandler.addEventHandler(object : EventHandler { - override fun onEvent(mqttEvent: MqttEvent) { - if (mqttEvent is MqttClientDestroyedEvent) { - cleanup() - } - } - }) - } - @Synchronized fun connect(connectOptions: MqttConnectOptions) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { initialiseAdaptiveMqttClient() androidMqttClient = androidMqttClientFactory.createAndroidMqttClient( context = context, @@ -89,6 +80,7 @@ internal class MqttClientInternal( eventHandler = eventHandler, pingEventHandler = PingEventHandler(eventHandler) ) + initialisationState = INITIALISED } androidMqttClient?.connect(connectOptions) adaptiveMqttClient?.connect(connectOptions) @@ -96,7 +88,7 @@ internal class MqttClientInternal( @Synchronized fun disconnect() { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -106,17 +98,18 @@ internal class MqttClientInternal( @Synchronized fun destroy() { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } androidMqttClient?.destroy() adaptiveMqttClient?.destroy() + initialisationState = UNINITIALISED } @Synchronized fun reconnect() { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -126,7 +119,7 @@ internal class MqttClientInternal( @Synchronized fun subscribe(vararg topics: Pair) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -135,7 +128,7 @@ internal class MqttClientInternal( @Synchronized fun unsubscribe(vararg topics: String) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -144,7 +137,7 @@ internal class MqttClientInternal( @Synchronized fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return false } @@ -153,7 +146,7 @@ internal class MqttClientInternal( @Synchronized fun addMessageListener(topic: String, listener: MessageListener) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -162,7 +155,7 @@ internal class MqttClientInternal( @Synchronized fun removeMessageListener(topic: String, listener: MessageListener) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -171,7 +164,7 @@ internal class MqttClientInternal( @Synchronized fun addGlobalMessageListener(listener: MessageListener) { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") return } @@ -180,11 +173,11 @@ internal class MqttClientInternal( @Synchronized fun getCurrentState(): ConnectionState { - if (androidMqttClient == null) { + if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") - return UNINITIALISED + return ConnectionState.UNINITIALISED } - return androidMqttClient?.getCurrentState() ?: UNINITIALISED + return androidMqttClient?.getCurrentState() ?: ConnectionState.UNINITIALISED } private fun initialiseAdaptiveMqttClient() { @@ -245,10 +238,8 @@ internal class MqttClientInternal( fun removeEventHandler(eventHandler: EventHandler) { this.eventHandler.removeEventHandler(eventHandler) } +} - @Synchronized - private fun cleanup() { - androidMqttClient = null - adaptiveMqttClient = null - } +private enum class State { + UNINITIALISED, INITIALISED } From dba0bfd24061a8882faa45bd77109e931c500147 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Wed, 6 Dec 2023 09:34:19 +0530 Subject: [PATCH 5/6] Add event when operations are discarded --- .../mqtt/client/connectioninfo/ConnectionInfoStore.kt | 4 ++++ .../gojek/mqtt/client/internal/MqttClientInternal.kt | 10 ++++++++++ .../com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt | 1 + .../src/main/java/com/gojek/mqtt/event/MqttEvent.kt | 6 ++++++ 4 files changed, 21 insertions(+) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfoStore.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfoStore.kt index 7779e311..9acb268a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfoStore.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfoStore.kt @@ -11,6 +11,10 @@ internal object ConnectionInfoStore { fun updateConnectionInfo(connectionInfo: ConnectionInfo) { state = State.InitialisedState(connectionInfo) } + + fun removeConnectionInfo() { + state = State.UninitialisedState + } } private sealed class State( diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index f04a2101..5ae69ceb 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -19,6 +19,7 @@ import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.client.v3.IAndroidMqttClient import com.gojek.mqtt.event.AdaptivePingEventHandler import com.gojek.mqtt.event.EventHandler +import com.gojek.mqtt.event.MqttEvent.OperationDiscardedEvent import com.gojek.mqtt.event.MqttEvent.OptimalKeepAliveFoundEvent import com.gojek.mqtt.event.PingEventHandler import com.gojek.mqtt.model.AdaptiveKeepAliveConfig @@ -90,6 +91,7 @@ internal class MqttClientInternal( fun disconnect() { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("Disconnect", "State uninitialised")) return } androidMqttClient?.disconnect() @@ -100,6 +102,7 @@ internal class MqttClientInternal( fun destroy() { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("Destroy", "State uninitialised")) return } androidMqttClient?.destroy() @@ -111,6 +114,7 @@ internal class MqttClientInternal( fun reconnect() { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("Reconnect", "State uninitialised")) return } androidMqttClient?.reconnect() @@ -121,6 +125,7 @@ internal class MqttClientInternal( fun subscribe(vararg topics: Pair) { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("Subscribe", "State uninitialised")) return } androidMqttClient?.subscribe(mapOf(*topics)) @@ -130,6 +135,7 @@ internal class MqttClientInternal( fun unsubscribe(vararg topics: String) { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("Unsubscribe", "State uninitialised")) return } androidMqttClient?.unsubscribe(listOf(*topics)) @@ -139,6 +145,7 @@ internal class MqttClientInternal( fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("SendMessage", "State uninitialised")) return false } return androidMqttClient?.send(mqttPacket, sendMessageCallback) ?: false @@ -148,6 +155,7 @@ internal class MqttClientInternal( fun addMessageListener(topic: String, listener: MessageListener) { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("AddMessageListener", "State uninitialised")) return } androidMqttClient?.addMessageListener(topic, listener) @@ -157,6 +165,7 @@ internal class MqttClientInternal( fun removeMessageListener(topic: String, listener: MessageListener) { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("RemoveMessageListener", "State uninitialised")) return } androidMqttClient?.removeMessageListener(topic, listener) @@ -166,6 +175,7 @@ internal class MqttClientInternal( fun addGlobalMessageListener(listener: MessageListener) { if (initialisationState == UNINITIALISED) { mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised") + eventHandler.onEvent(OperationDiscardedEvent("AddGlobalMessageListener", "State uninitialised")) return } androidMqttClient?.addGlobalMessageListener(listener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index 355039d7..9d47d85a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -433,6 +433,7 @@ internal class AndroidMqttClient( runnableScheduler.stopThread() networkHandler.destroy() eventHandler.onEvent(MqttClientDestroyedEvent()) + ConnectionInfoStore.removeConnectionInfo() } } } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt index 4c46cf80..75fe1fbb 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt @@ -290,4 +290,10 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val timeTakenMillis: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) + + data class OperationDiscardedEvent( + val name: String, + val reason: String, + override var connectionInfo: ConnectionInfo? = null + ) : MqttEvent(connectionInfo) } From f4d89ae5fbcb215bb77df016e1a6117c13923053 Mon Sep 17 00:00:00 2001 From: deepanshu Date: Wed, 6 Dec 2023 13:21:08 +0530 Subject: [PATCH 6/6] Update the default behaviour --- mqtt-client/api/mqtt-client.api | 17 +++++++++++++++++ .../mqtt/client/internal/MqttClientInternal.kt | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index 095090cd..ea7a7a6e 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -833,6 +833,23 @@ public final class com/gojek/mqtt/event/MqttEvent$OfflineMessageDiscardedEvent : public fun toString ()Ljava/lang/String; } +public final class com/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent : com/gojek/mqtt/event/MqttEvent { + public fun (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Ljava/lang/String; + public final fun component2 ()Ljava/lang/String; + public final fun component3 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent; + public fun equals (Ljava/lang/Object;)Z + public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun getName ()Ljava/lang/String; + public final fun getReason ()Ljava/lang/String; + public fun hashCode ()I + public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public fun toString ()Ljava/lang/String; +} + public final class com/gojek/mqtt/event/MqttEvent$OptimalKeepAliveFoundEvent : com/gojek/mqtt/event/MqttEvent { public fun (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V public synthetic fun (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index 5ae69ceb..cbe33cdb 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -68,6 +68,20 @@ internal class MqttClientInternal( } } + init { + initialiseAdaptiveMqttClient() + androidMqttClient = androidMqttClientFactory.createAndroidMqttClient( + context = context, + mqttConfiguration = mqttConfiguration, + networkStateTracker = networkStateTracker, + keepAliveProvider = keepAliveProvider, + keepAliveFailureHandler = keepAliveFailureHandler, + eventHandler = eventHandler, + pingEventHandler = PingEventHandler(eventHandler) + ) + initialisationState = INITIALISED + } + @Synchronized fun connect(connectOptions: MqttConnectOptions) { if (initialisationState == UNINITIALISED) { @@ -107,7 +121,9 @@ internal class MqttClientInternal( } androidMqttClient?.destroy() adaptiveMqttClient?.destroy() - initialisationState = UNINITIALISED + if (mqttConfiguration.experimentConfigs.cleanMqttClientOnDestroy) { + initialisationState = UNINITIALISED + } } @Synchronized