Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup AndroidMqttClient on destroy #80

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class MainActivity : AppCompatActivity() {
mqttInterceptorList = listOf(MqttChuckInterceptor(this, MqttChuckConfig(retentionPeriod = Period.ONE_HOUR))),
persistenceOptions = PahoPersistenceOptions(100, false),
experimentConfigs = ExperimentConfigs(
cleanMqttClientOnDestroy = true,
adaptiveKeepAliveConfig = AdaptiveKeepAliveConfig(
lowerBoundMinutes = 1,
upperBoundMinutes = 9,
Expand Down
42 changes: 38 additions & 4 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor {

public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public fun <init> ()V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public final fun component10 ()I
public final fun component11 ()Z
public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun component3 ()I
public final fun component4 ()I
Expand All @@ -41,11 +42,12 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public final fun component7 ()J
public final fun component8 ()J
public final fun component9 ()Z
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public fun equals (Ljava/lang/Object;)Z
public final fun getActivityCheckIntervalSeconds ()I
public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun getCleanMqttClientOnDestroy ()Z
public final fun getConnectPacketTimeoutSeconds ()I
public final fun getInactivityTimeoutSeconds ()I
public final fun getIncomingMessagesCleanupIntervalSecs ()J
Expand Down Expand Up @@ -177,6 +179,7 @@ public final class com/gojek/mqtt/client/model/ConnectionState : java/lang/Enum
public static final field DISCONNECTED Lcom/gojek/mqtt/client/model/ConnectionState;
public static final field DISCONNECTING Lcom/gojek/mqtt/client/model/ConnectionState;
public static final field INITIALISED Lcom/gojek/mqtt/client/model/ConnectionState;
public static final field UNINITIALISED Lcom/gojek/mqtt/client/model/ConnectionState;
public static fun valueOf (Ljava/lang/String;)Lcom/gojek/mqtt/client/model/ConnectionState;
public static fun values ()[Lcom/gojek/mqtt/client/model/ConnectionState;
}
Expand Down Expand Up @@ -325,6 +328,20 @@ public final class com/gojek/mqtt/event/MqttEvent$InboundInactivityEvent : com/g
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> ()V
public fun <init> (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttClientDestroyedEvent;
public fun equals (Ljava/lang/Object;)Z
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public fun hashCode ()I
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$MqttConnectAttemptEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (ZLcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (ZLcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down Expand Up @@ -816,6 +833,23 @@ public final class com/gojek/mqtt/event/MqttEvent$OfflineMessageDiscardedEvent :
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()Ljava/lang/String;
public final fun component3 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
public fun equals (Ljava/lang/Object;)Z
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun getName ()Ljava/lang/String;
public final fun getReason ()Ljava/lang/String;
public fun hashCode ()I
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$OptimalKeepAliveFoundEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ internal class MqttCourierClient(
}

override fun disconnect(clearState: Boolean) {
mqttClient.disconnect(clearState)
if (clearState) {
mqttClient.destroy()
} else {
mqttClient.disconnect()
}
}

override fun reconnect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ data class ExperimentConfigs(
val incomingMessagesTTLSecs: Long = 360,
val incomingMessagesCleanupIntervalSecs: Long = 60,
val shouldUseNewSSLFlow: Boolean = false,
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED,
val cleanMqttClientOnDestroy: Boolean = false
)

enum class SubscriptionStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ internal object ConnectionInfoStore {
fun updateConnectionInfo(connectionInfo: ConnectionInfo) {
state = State.InitialisedState(connectionInfo)
}

fun removeConnectionInfo() {
state = State.UninitialisedState
}
}

private sealed class State(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import com.gojek.keepalive.config.AdaptiveKeepAliveConfig as AdaptiveKAConfig
import com.gojek.mqtt.client.config.v3.MqttV3Configuration
import com.gojek.mqtt.client.event.interceptor.MqttEventHandler
import com.gojek.mqtt.client.factory.getAndroidMqttClientFactory
import com.gojek.mqtt.client.internal.State.INITIALISED
import com.gojek.mqtt.client.internal.State.UNINITIALISED
import com.gojek.mqtt.client.listener.MessageListener
import com.gojek.mqtt.client.model.ConnectionState
import com.gojek.mqtt.client.v3.IAndroidMqttClient
import com.gojek.mqtt.event.AdaptivePingEventHandler
import com.gojek.mqtt.event.EventHandler
import com.gojek.mqtt.event.MqttEvent.OperationDiscardedEvent
import com.gojek.mqtt.event.MqttEvent.OptimalKeepAliveFoundEvent
import com.gojek.mqtt.event.PingEventHandler
import com.gojek.mqtt.model.AdaptiveKeepAliveConfig
Expand All @@ -34,7 +37,7 @@ internal class MqttClientInternal(
private val networkStateTracker =
NetworkStateTrackerFactory.create(context.applicationContext, mqttConfiguration.logger)

private val androidMqttClient: IAndroidMqttClient
private var androidMqttClient: IAndroidMqttClient? = null

private var optimalKeepAliveProvider: OptimalKeepAliveProvider? = null

Expand All @@ -45,6 +48,8 @@ internal class MqttClientInternal(

private val eventHandler = MqttEventHandler(MqttUtils())

private var initialisationState = UNINITIALISED

private val optimalKeepAliveObserver = object : OptimalKeepAliveObserver {
override fun onOptimalKeepAliveFound(
timeMinutes: Int,
Expand Down Expand Up @@ -74,49 +79,131 @@ internal class MqttClientInternal(
eventHandler = eventHandler,
pingEventHandler = PingEventHandler(eventHandler)
)
initialisationState = INITIALISED
}

@Synchronized
fun connect(connectOptions: MqttConnectOptions) {
androidMqttClient.connect(connectOptions)
if (initialisationState == UNINITIALISED) {
initialiseAdaptiveMqttClient()
androidMqttClient = androidMqttClientFactory.createAndroidMqttClient(
context = context,
mqttConfiguration = mqttConfiguration,
networkStateTracker = networkStateTracker,
keepAliveProvider = keepAliveProvider,
keepAliveFailureHandler = keepAliveFailureHandler,
eventHandler = eventHandler,
pingEventHandler = PingEventHandler(eventHandler)
)
initialisationState = INITIALISED
}
androidMqttClient?.connect(connectOptions)
adaptiveMqttClient?.connect(connectOptions)
}

fun disconnect(clearState: Boolean) {
androidMqttClient.disconnect(clearState)
adaptiveMqttClient?.disconnect(clearState)
@Synchronized
fun disconnect() {
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("Disconnect", "State uninitialised"))
return
}
androidMqttClient?.disconnect()
adaptiveMqttClient?.disconnect()
}

@Synchronized
fun destroy() {
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("Destroy", "State uninitialised"))
return
}
androidMqttClient?.destroy()
adaptiveMqttClient?.destroy()
if (mqttConfiguration.experimentConfigs.cleanMqttClientOnDestroy) {
initialisationState = UNINITIALISED
}
}

@Synchronized
fun reconnect() {
androidMqttClient.reconnect()
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("Reconnect", "State uninitialised"))
return
}
androidMqttClient?.reconnect()
adaptiveMqttClient?.reconnect()
}

@Synchronized
fun subscribe(vararg topics: Pair<String, QoS>) {
androidMqttClient.subscribe(mapOf(*topics))
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("Subscribe", "State uninitialised"))
return
}
androidMqttClient?.subscribe(mapOf(*topics))
}

@Synchronized
fun unsubscribe(vararg topics: String) {
androidMqttClient.unsubscribe(listOf(*topics))
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("Unsubscribe", "State uninitialised"))
return
}
androidMqttClient?.unsubscribe(listOf(*topics))
}

@Synchronized
fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean {
return androidMqttClient.send(mqttPacket, sendMessageCallback)
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("SendMessage", "State uninitialised"))
return false
}
return androidMqttClient?.send(mqttPacket, sendMessageCallback) ?: false
}

@Synchronized
fun addMessageListener(topic: String, listener: MessageListener) {
return androidMqttClient.addMessageListener(topic, listener)
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("AddMessageListener", "State uninitialised"))
return
}
androidMqttClient?.addMessageListener(topic, listener)
}

@Synchronized
fun removeMessageListener(topic: String, listener: MessageListener) {
return androidMqttClient.removeMessageListener(topic, listener)
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("RemoveMessageListener", "State uninitialised"))
return
}
androidMqttClient?.removeMessageListener(topic, listener)
}

@Synchronized
fun addGlobalMessageListener(listener: MessageListener) {
return androidMqttClient.addGlobalMessageListener(listener)
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
eventHandler.onEvent(OperationDiscardedEvent("AddGlobalMessageListener", "State uninitialised"))
return
}
androidMqttClient?.addGlobalMessageListener(listener)
}

@Synchronized
fun getCurrentState(): ConnectionState {
return androidMqttClient.getCurrentState()
if (initialisationState == UNINITIALISED) {
mqttConfiguration.logger.d("MqttClient", "MqttClient is not initialised")
return ConnectionState.UNINITIALISED
}
return androidMqttClient?.getCurrentState() ?: ConnectionState.UNINITIALISED
}

private fun initialiseAdaptiveMqttClient() {
Expand Down Expand Up @@ -178,3 +265,7 @@ internal class MqttClientInternal(
this.eventHandler.removeEventHandler(eventHandler)
}
}

private enum class State {
UNINITIALISED, INITIALISED
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package com.gojek.mqtt.client.model
import com.gojek.mqtt.client.MqttClient

enum class ConnectionState {
/**
* Represents state when [MqttClient] is not created/initialised
*/
UNINITIALISED,

/**
* Represents state when [MqttClient] is created/initialised
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ internal interface IAndroidMqttClient {
fun connect(connectOptions: MqttConnectOptions)
fun connect(timeMillis: Long = MQTT_WAIT_BEFORE_RECONNECT_TIME_MS)
fun reconnect()
fun disconnect(clearState: Boolean = false)
fun disconnect()
fun destroy()
fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean
fun addMessageListener(topic: String, listener: MessageListener)
fun removeMessageListener(topic: String, listener: MessageListener)
Expand Down
Loading
Loading