Skip to content

Commit

Permalink
[ISSUE#31] Stop handler thread on destroy (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 authored Dec 11, 2024
1 parent cee9bf7 commit cbae5d1
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 57 deletions.
27 changes: 23 additions & 4 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor {

public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public fun <init> ()V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public final fun component10 ()I
public final fun component11 ()Z
public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun component3 ()I
public final fun component4 ()I
Expand All @@ -41,8 +42,8 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public final fun component7 ()J
public final fun component8 ()J
public final fun component9 ()Z
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public fun equals (Ljava/lang/Object;)Z
public final fun getActivityCheckIntervalSeconds ()I
public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
Expand All @@ -53,6 +54,7 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public final fun getMaxInflightMessagesLimit ()I
public final fun getPolicyResetTimeSeconds ()I
public final fun getShouldUseNewSSLFlow ()Z
public final fun getStopMqttThreadOnDestroy ()Z
public final fun getSubscriptionStore ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
Expand Down Expand Up @@ -816,6 +818,23 @@ public final class com/gojek/mqtt/event/MqttEvent$OfflineMessageDiscardedEvent :
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()Ljava/lang/String;
public final fun component3 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
public fun equals (Ljava/lang/Object;)Z
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun getName ()Ljava/lang/String;
public final fun getReason ()Ljava/lang/String;
public fun hashCode ()I
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$OptimalKeepAliveFoundEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ data class ExperimentConfigs(
val incomingMessagesTTLSecs: Long = 360,
val incomingMessagesCleanupIntervalSecs: Long = 60,
val shouldUseNewSSLFlow: Boolean = false,
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED,
val stopMqttThreadOnDestroy: Boolean = false
)

enum class SubscriptionStore {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package com.gojek.mqtt.client.v3.impl

import android.content.Context
import android.os.Bundle
import android.os.Handler
import android.os.HandlerThread
import android.os.Looper
import android.os.Message
import android.os.Messenger
import android.os.RemoteException
import androidx.annotation.RequiresApi
import com.gojek.courier.QoS
import com.gojek.courier.callback.SendMessageCallback
Expand Down Expand Up @@ -38,11 +31,11 @@ import com.gojek.mqtt.client.model.ConnectionState.DISCONNECTING
import com.gojek.mqtt.client.model.ConnectionState.INITIALISED
import com.gojek.mqtt.client.model.MqttSendPacket
import com.gojek.mqtt.client.v3.IAndroidMqttClient
import com.gojek.mqtt.client.v3.impl.State.DESTROYED
import com.gojek.mqtt.client.v3.impl.State.UNINITIALISED
import com.gojek.mqtt.connection.IMqttConnection
import com.gojek.mqtt.connection.MqttConnection
import com.gojek.mqtt.connection.config.v3.ConnectionConfig
import com.gojek.mqtt.constants.MESSAGE
import com.gojek.mqtt.constants.MSG_APP_PUBLISH
import com.gojek.mqtt.event.EventHandler
import com.gojek.mqtt.event.MqttEvent.AuthenticatorErrorEvent
import com.gojek.mqtt.event.MqttEvent.MqttConnectDiscardedEvent
Expand All @@ -54,8 +47,8 @@ import com.gojek.mqtt.event.MqttEvent.MqttMessageSendEvent
import com.gojek.mqtt.event.MqttEvent.MqttMessageSendFailureEvent
import com.gojek.mqtt.event.MqttEvent.MqttMessageSendSuccessEvent
import com.gojek.mqtt.event.MqttEvent.MqttReconnectEvent
import com.gojek.mqtt.event.MqttEvent.OperationDiscardedEvent
import com.gojek.mqtt.exception.toCourierException
import com.gojek.mqtt.handler.IncomingHandler
import com.gojek.mqtt.model.MqttConnectOptions
import com.gojek.mqtt.model.MqttPacket
import com.gojek.mqtt.network.NetworkHandler
Expand All @@ -78,6 +71,7 @@ import com.gojek.mqtt.wakelock.WakeLockProvider
import com.gojek.networktracker.NetworkStateTracker
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_UNEXPECTED_ERROR
import org.eclipse.paho.client.mqttv3.MqttPersistenceException
Expand All @@ -95,9 +89,6 @@ internal class AndroidMqttClient(

private val runnableScheduler: IRunnableScheduler
private val mqttConnection: IMqttConnection
private val mqttThreadLooper: Looper
private val mqttThreadHandler: Handler
private var mMessenger: Messenger
private val networkUtils: NetworkUtils
private val mqttUtils: MqttUtils
private val mqttPersistence: PahoPersistence
Expand All @@ -114,8 +105,7 @@ internal class AndroidMqttClient(
@Volatile
private var globalListener: MessageListener? = null

@Volatile
private var isInitialised = false
private var state = AtomicReference(UNINITIALISED)

// Accessed only from mqtt thread
private var forceRefresh = false
Expand All @@ -133,21 +123,12 @@ internal class AndroidMqttClient(

init {
logger = mqttConfiguration.logger
val mqttHandlerThread = HandlerThread("MQTT_Thread")
mqttHandlerThread.start()
mqttThreadLooper = mqttHandlerThread.looper
mqttThreadHandler = Handler(mqttThreadLooper)
mMessenger = Messenger(
IncomingHandler(mqttThreadLooper, this, logger)
)
@RequiresApi
runnableScheduler = MqttRunnableScheduler(
mqttHandlerThread,
mqttThreadHandler,
this,
logger,
eventHandler,
experimentConfigs.activityCheckIntervalSeconds
clientSchedulerBridge = this,
logger = logger,
eventHandler = eventHandler,
activityCheckIntervalSeconds = experimentConfigs.activityCheckIntervalSeconds
)
mqttUtils = MqttUtils()
networkUtils = NetworkUtils()
Expand Down Expand Up @@ -214,19 +195,24 @@ internal class AndroidMqttClient(
connectOptions: MqttConnectOptions
) {
this.connectOptions = connectOptions
isInitialised = true
runnableScheduler.start()
state.set(State.INITIALISED)
runnableScheduler.connectMqtt()
}

// This can be invoked on any thread
override fun reconnect() {
if (state.get() != State.INITIALISED) {
eventHandler.onEvent(OperationDiscardedEvent("Reconnect", "Client is not initialised"))
return
}
eventHandler.onEvent(MqttReconnectEvent())
runnableScheduler.disconnectMqtt(true)
}

// This can be invoked on any thread
override fun disconnect(clearState: Boolean) {
isInitialised = false
state.set(State.DISCONNECTED)
runnableScheduler.disconnectMqtt(false, clearState)
}

Expand Down Expand Up @@ -299,6 +285,10 @@ internal class AndroidMqttClient(

// This can be invoked on any thread
override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean {
if (state.get() == DESTROYED) {
eventHandler.onEvent(OperationDiscardedEvent("SendMessage", "Client is not initialised"))
return false
}
val mqttSendPacket = MqttSendPacket(
message = mqttPacket.message,
messageId = 0,
Expand All @@ -309,25 +299,7 @@ internal class AndroidMqttClient(
triggerTime = System.nanoTime(),
sendMessageCallback = sendMessageCallback
)

val msg = Message.obtain()
msg.what = MSG_APP_PUBLISH

val bundle = Bundle()
bundle.putParcelable(MESSAGE, mqttSendPacket)

msg.data = bundle
msg.replyTo = mMessenger

try {
mMessenger.send(msg)
} catch (e: RemoteException) {
/* Service is dead. What to do? */
logger.e(TAG, "Remote Service dead", e)
return false
}

return true
return runnableScheduler.sendMessage(mqttSendPacket)
}

override fun addMessageListener(topic: String, listener: MessageListener) {
Expand All @@ -347,7 +319,7 @@ internal class AndroidMqttClient(
val startTime = clock.nanoTime()
try {
logger.d(TAG, "Sending onConnectAttempt event")
if (!isInitialised) {
if (state.get() != State.INITIALISED) {
logger.d(TAG, "Mqtt Client not initialised")
eventHandler.onEvent(
MqttConnectDiscardedEvent(
Expand Down Expand Up @@ -414,6 +386,10 @@ internal class AndroidMqttClient(
eventHandler.onEvent(MqttDisconnectEvent())
mqttConnection.disconnect()
if (clearState) {
if (experimentConfigs.stopMqttThreadOnDestroy) {
state.set(DESTROYED)
runnableScheduler.stop()
}
mqttConnection.shutDown()
subscriptionStore.clear()
mqttPersistence.clearAll()
Expand All @@ -432,11 +408,19 @@ internal class AndroidMqttClient(

override fun subscribe(topicMap: Map<String, QoS>) {
val addedTopics = subscriptionStore.subscribeTopics(topicMap)
if (state.get() != State.INITIALISED) {
eventHandler.onEvent(OperationDiscardedEvent("Subscribe", "Client is not initialised"))
return
}
runnableScheduler.scheduleSubscribe(0, addedTopics)
}

override fun unsubscribe(topics: List<String>) {
val removedTopics = subscriptionStore.unsubscribeTopics(topics)
if (state.get() != State.INITIALISED) {
eventHandler.onEvent(OperationDiscardedEvent("Unsubscribe", "Client is not initialised"))
return
}
runnableScheduler.scheduleUnsubscribe(0, removedTopics)
}

Expand Down Expand Up @@ -626,3 +610,7 @@ internal class AndroidMqttClient(
const val TAG = "AndroidMqttClient"
}
}

private enum class State {
UNINITIALISED, INITIALISED, DISCONNECTED, DESTROYED
}
6 changes: 6 additions & 0 deletions mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) {
override var connectionInfo: ConnectionInfo? = null
) : MqttEvent(connectionInfo)

data class OperationDiscardedEvent(
val name: String,
val reason: String,
override var connectionInfo: ConnectionInfo? = null
) : MqttEvent(connectionInfo)

data class MqttConnectSuccessEvent(
val activeNetInfo: ActiveNetInfo,
val serverUri: ServerUri?,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.mqtt.scheduler

import com.gojek.courier.QoS
import com.gojek.mqtt.client.model.MqttSendPacket

internal interface IRunnableScheduler {
fun connectMqtt()
Expand All @@ -24,4 +25,9 @@ internal interface IRunnableScheduler {
fun scheduleResetParams(delayMillis: Long)

fun scheduleAuthFailureRunnable(delayMillis: Long)

fun sendMessage(mqttSendPacket: MqttSendPacket): Boolean

fun start()
fun stop()
}
Loading

0 comments on commit cbae5d1

Please sign in to comment.