From e3fe343a7bd1955a370c7fc221bb3d040a918c49 Mon Sep 17 00:00:00 2001 From: Deepanshu Date: Mon, 17 Jul 2023 11:43:46 +0530 Subject: [PATCH] Improve telemetry and fast reconnect logic (#70) * Add connect packet round trip time in connect success event * Reset fast reconnect variables on disconnect * Add time taken in message send events * Add different timeout for CONNACK --- .../com/gojek/courier/app/ui/MainActivity.kt | 1 + mqtt-client/api/mqtt-client.api | 52 +++++++++++-------- .../mqtt/client/config/ExperimentConfigs.kt | 1 + .../event/adapter/MqttClientEventAdapter.kt | 8 ++- .../gojek/mqtt/client/model/MqttSendPacket.kt | 3 ++ .../mqtt/client/v3/impl/AndroidMqttClient.kt | 40 +++++++++----- .../gojek/mqtt/connection/MqttConnection.kt | 8 ++- .../connection/config/v3/ConnectionConfig.kt | 3 +- .../java/com/gojek/mqtt/event/MqttEvent.kt | 3 ++ .../client/mqttv3/IExperimentsConfig.java | 2 + .../client/mqttv3/internal/ClientState.java | 17 ++++-- 11 files changed, 97 insertions(+), 41 deletions(-) diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index f93a38bd..6ee9a0c4 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -148,6 +148,7 @@ class MainActivity : AppCompatActivity() { ), inactivityTimeoutSeconds = 45, activityCheckIntervalSeconds = 30, + connectPacketTimeoutSeconds = 5, incomingMessagesTTLSecs = 60, incomingMessagesCleanupIntervalSecs = 10, maxInflightMessagesLimit = 1000, diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index 56a598c6..cbd36159 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -29,22 +29,24 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor { public final class com/gojek/mqtt/client/config/ExperimentConfigs { public fun ()V - public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)V - public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V + public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore; + public final fun component10 ()I public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; public final fun component3 ()I public final fun component4 ()I public final fun component5 ()I - public final fun component6 ()J + public final fun component6 ()I public final fun component7 ()J - public final fun component8 ()Z - public final fun component9 ()I - public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public final fun component8 ()J + public final fun component9 ()Z + public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; public fun equals (Ljava/lang/Object;)Z public final fun getActivityCheckIntervalSeconds ()I public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; + public final fun getConnectPacketTimeoutSeconds ()I public final fun getInactivityTimeoutSeconds ()I public final fun getIncomingMessagesCleanupIntervalSecs ()J public final fun getIncomingMessagesTTLSecs ()J @@ -381,16 +383,18 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttConnectFailureEvent : com/ } public final class com/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/network/ActiveNetInfo; public final fun component2 ()Lcom/gojek/mqtt/model/ServerUri; public final fun component3 ()J - public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; + public final fun component4 ()J + public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; public fun equals (Ljava/lang/Object;)Z public final fun getActiveNetInfo ()Lcom/gojek/mqtt/network/ActiveNetInfo; + public final fun getConnectPacketRTTime ()J public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getServerUri ()Lcom/gojek/mqtt/model/ServerUri; public final fun getTimeTakenMillis ()J @@ -520,20 +524,22 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendEvent : com/goj } public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()I public final fun component3 ()I public final fun component4 ()Lcom/gojek/mqtt/exception/CourierException; - public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; + public final fun component5 ()J + public final fun component6 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; public fun equals (Ljava/lang/Object;)Z public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getException ()Lcom/gojek/mqtt/exception/CourierException; public final fun getQos ()I public final fun getSizeBytes ()I + public final fun getTimeTakenMillis ()J public final fun getTopic ()Ljava/lang/String; public fun hashCode ()I public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V @@ -541,18 +547,20 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent : } public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()I public final fun component3 ()I - public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; + public final fun component4 ()J + public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; public fun equals (Ljava/lang/Object;)Z public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getQos ()I public final fun getSizeBytes ()I + public final fun getTimeTakenMillis ()J public final fun getTopic ()Ljava/lang/String; public fun hashCode ()I public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt index d0421ca0..61299a8a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt @@ -12,6 +12,7 @@ data class ExperimentConfigs( val adaptiveKeepAliveConfig: AdaptiveKeepAliveConfig? = null, val activityCheckIntervalSeconds: Int = DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS, val inactivityTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS, + val connectPacketTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS, val policyResetTimeSeconds: Int = DEFAULT_POLICY_RESET_TIME_SECS, val incomingMessagesTTLSecs: Long = 360, val incomingMessagesCleanupIntervalSecs: Long = 60, diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt index 751e7a22..4abfc4ad 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client.event.adapter import com.gojek.courier.QoS +import com.gojek.courier.extensions.fromNanosToMillis import com.gojek.mqtt.connection.event.ConnectionEventHandler import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent.ConnectPacketSendEvent @@ -34,6 +35,9 @@ internal class MqttClientEventAdapter( private val eventHandler: EventHandler, private val networkHandler: NetworkHandler ) { + + private var connectPacketSendTime = 0L + fun adapt(): ConnectionEventHandler { return object : ConnectionEventHandler { override fun onMqttConnectAttempt( @@ -57,7 +61,8 @@ internal class MqttClientEventAdapter( MqttConnectSuccessEvent( activeNetInfo = networkHandler.getActiveNetworkInfo(), serverUri = serverUri, - timeTakenMillis = timeTakenMillis + timeTakenMillis = timeTakenMillis, + connectPacketRTTime = (System.nanoTime() - connectPacketSendTime).fromNanosToMillis() ) ) } @@ -207,6 +212,7 @@ internal class MqttClientEventAdapter( } override fun onConnectPacketSend() { + connectPacketSendTime = System.nanoTime() eventHandler.onEvent(ConnectPacketSendEvent()) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt index 0d7251cf..e89c20b6 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt @@ -12,6 +12,7 @@ internal data class MqttSendPacket( var qos: Int, var topic: String, var type: Int, + var triggerTime: Long, var sendMessageCallback: SendMessageCallback ) : Parcelable { constructor(parcel: Parcel) : this( @@ -21,6 +22,7 @@ internal data class MqttSendPacket( parcel.readInt(), parcel.readString()!!, parcel.readInt(), + parcel.readLong(), NoOpSendMessageCallback ) @@ -31,6 +33,7 @@ internal data class MqttSendPacket( parcel.writeInt(qos) parcel.writeString(topic) parcel.writeInt(type) + parcel.writeLong(triggerTime) } override fun describeContents(): Int { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index 1e2f7d2d..0807e68a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -179,7 +179,8 @@ internal class AndroidMqttClient( persistenceOptions = mqttConfiguration.persistenceOptions, inactivityTimeoutSeconds = experimentConfigs.inactivityTimeoutSeconds, policyResetTimeSeconds = experimentConfigs.policyResetTimeSeconds, - shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow + shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow, + connectPacketTimeoutSeconds = experimentConfigs.connectPacketTimeoutSeconds ) mqttConnection = MqttConnection( @@ -239,7 +240,6 @@ internal class AndroidMqttClient( if (!isConnected()) { connectMqtt() } - try { logger.d( TAG, @@ -261,6 +261,7 @@ internal class AndroidMqttClient( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) @@ -273,6 +274,7 @@ internal class AndroidMqttClient( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) @@ -287,6 +289,7 @@ internal class AndroidMqttClient( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) @@ -297,13 +300,14 @@ internal class AndroidMqttClient( // This can be invoked on any thread override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { val mqttSendPacket = MqttSendPacket( - mqttPacket.message, - 0, - System.currentTimeMillis(), - mqttPacket.qos.value, - mqttPacket.topic, - mqttPacket.qos.type, - sendMessageCallback + message = mqttPacket.message, + messageId = 0, + timestamp = System.currentTimeMillis(), + qos = mqttPacket.qos.value, + topic = mqttPacket.topic, + type = mqttPacket.qos.type, + triggerTime = System.nanoTime(), + sendMessageCallback = sendMessageCallback ) val msg = Message.obtain() @@ -587,9 +591,10 @@ internal class AndroidMqttClient( with(packet) { eventHandler.onEvent( MqttMessageSendSuccessEvent( - topic, - qos, - message.size + topic = topic, + qos = qos, + sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis() ) ) } @@ -597,6 +602,17 @@ internal class AndroidMqttClient( override fun onFailure(packet: MqttSendPacket, exception: Throwable) { packet.sendMessageCallback.onMessageSendFailure(exception) + with(packet) { + eventHandler.onEvent( + MqttMessageSendFailureEvent( + topic = topic, + qos = qos, + sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), + exception = exception.toCourierException() + ) + ) + } runnableScheduler.connectMqtt() } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt index 97761d6c..81e7f01b 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt @@ -192,7 +192,7 @@ internal class MqttConnection( connectOptions.keepAlive.isOptimal, serverUri ) - mqtt!!.connect(options, null, getConnectListener(subscriptionTopicMap)) + mqtt!!.connect(options, null, getConnectListener()) runnableScheduler.scheduleNextActivityCheck() } catch (e: MqttSecurityException) { logger.e(TAG, "mqtt security exception while connecting $e") @@ -407,7 +407,7 @@ internal class MqttConnection( return mqttAsyncClient } - private fun getConnectListener(subscriptionTopicMap: Map): IMqttActionListener? { + private fun getConnectListener(): IMqttActionListener { return object : IMqttActionListener { override fun onSuccess(iMqttToken: IMqttToken) { try { @@ -673,6 +673,10 @@ internal class MqttConnection( return connectionConfig.inactivityTimeoutSeconds } + override fun connectPacketTimeoutSecs(): Int { + return connectionConfig.connectPacketTimeoutSeconds + } + override fun useNewSSLFlow(): Boolean { return connectionConfig.shouldUseNewSSLFlow } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt b/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt index 8329b27c..a4640751 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt @@ -25,5 +25,6 @@ internal data class ConnectionConfig( val persistenceOptions: PersistenceOptions, val inactivityTimeoutSeconds: Int, val policyResetTimeSeconds: Int, - val shouldUseNewSSLFlow: Boolean + val shouldUseNewSSLFlow: Boolean, + val connectPacketTimeoutSeconds: Int ) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt index f52ed9dc..7e114e29 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt @@ -25,6 +25,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val activeNetInfo: ActiveNetInfo, val serverUri: ServerUri?, val timeTakenMillis: Long, + val connectPacketRTTime: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) @@ -165,6 +166,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val topic: String, val qos: Int, val sizeBytes: Int, + val timeTakenMillis: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) @@ -173,6 +175,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val qos: Int, val sizeBytes: Int, val exception: CourierException, + val timeTakenMillis: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) diff --git a/paho/src/main/java/org/eclipse/paho/client/mqttv3/IExperimentsConfig.java b/paho/src/main/java/org/eclipse/paho/client/mqttv3/IExperimentsConfig.java index 955afe43..a8502119 100644 --- a/paho/src/main/java/org/eclipse/paho/client/mqttv3/IExperimentsConfig.java +++ b/paho/src/main/java/org/eclipse/paho/client/mqttv3/IExperimentsConfig.java @@ -3,5 +3,7 @@ public interface IExperimentsConfig { int inactivityTimeoutSecs(); + int connectPacketTimeoutSecs(); + Boolean useNewSSLFlow(); } diff --git a/paho/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java b/paho/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java index c38cc94a..ea878feb 100644 --- a/paho/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java +++ b/paho/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java @@ -45,7 +45,6 @@ import java.io.EOFException; import java.util.Enumeration; import java.util.Hashtable; -import java.util.Map; import java.util.Properties; import java.util.Vector; @@ -150,6 +149,8 @@ public class ClientState private final static String className = ClientState.class.getName(); private long inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT; + + private long connectPacketTimeout = DEFAULT_INACTIVITY_TIMEOUT; private final static long DEFAULT_INACTIVITY_TIMEOUT = 60 * 1000; private IPahoEvents pahoEvents; @@ -191,6 +192,7 @@ protected ClientState( if (experimentsConfig != null) { inactivityTimeout = experimentsConfig.inactivityTimeoutSecs() * 1000L; + connectPacketTimeout = experimentsConfig.connectPacketTimeoutSecs() * 1000L; } restoreState(); @@ -775,8 +777,13 @@ public void checkActivity() throws MqttException { if (fastReconnectCheckStartTime > lastInboundActivity) { - if(System.currentTimeMillis() - fastReconnectCheckStartTime >= inactivityTimeout) - + long timeout; + if (clientComms.isConnecting()) { + timeout = connectPacketTimeout; + } else { + timeout = inactivityTimeout; + } + if(System.currentTimeMillis() - fastReconnectCheckStartTime >= timeout) { logger.logFastReconnectEvent(fastReconnectCheckStartTime, lastInboundActivity); logger.e(TAG, "not recieved ack for 1 min so disconnecting"); @@ -1345,6 +1352,10 @@ public void disconnected(MqttException reason) // Reset pingOutstanding to allow reconnects to assume no previous ping. pingOutstanding = Boolean.FALSE; } + fastReconnectCheckStartTime = 0; + lastInboundActivity = 0; + lastOutboundActivity = 0; + lastPing = 0; } catch (MqttException e) {