Skip to content

Commit

Permalink
Improve telemetry and fast reconnect logic (#70)
Browse files Browse the repository at this point in the history
* Add connect packet round trip time in connect success event

* Reset fast reconnect variables on disconnect

* Add time taken in message send events

* Add different timeout for CONNACK
  • Loading branch information
deepanshu42 authored Jul 17, 2023
1 parent 1c9ce1c commit e3fe343
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 41 deletions.
1 change: 1 addition & 0 deletions app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class MainActivity : AppCompatActivity() {
),
inactivityTimeoutSeconds = 45,
activityCheckIntervalSeconds = 30,
connectPacketTimeoutSeconds = 5,
incomingMessagesTTLSecs = 60,
incomingMessagesCleanupIntervalSecs = 10,
maxInflightMessagesLimit = 1000,
Expand Down
52 changes: 30 additions & 22 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,24 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor {

public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public fun <init> ()V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public final fun component10 ()I
public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun component3 ()I
public final fun component4 ()I
public final fun component5 ()I
public final fun component6 ()J
public final fun component6 ()I
public final fun component7 ()J
public final fun component8 ()Z
public final fun component9 ()I
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public final fun component8 ()J
public final fun component9 ()Z
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public fun equals (Ljava/lang/Object;)Z
public final fun getActivityCheckIntervalSeconds ()I
public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun getConnectPacketTimeoutSeconds ()I
public final fun getInactivityTimeoutSeconds ()I
public final fun getIncomingMessagesCleanupIntervalSecs ()J
public final fun getIncomingMessagesTTLSecs ()J
Expand Down Expand Up @@ -381,16 +383,18 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttConnectFailureEvent : com/
}

public final class com/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/network/ActiveNetInfo;
public final fun component2 ()Lcom/gojek/mqtt/model/ServerUri;
public final fun component3 ()J
public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;
public final fun component4 ()J
public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;
public fun equals (Ljava/lang/Object;)Z
public final fun getActiveNetInfo ()Lcom/gojek/mqtt/network/ActiveNetInfo;
public final fun getConnectPacketRTTime ()J
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun getServerUri ()Lcom/gojek/mqtt/model/ServerUri;
public final fun getTimeTakenMillis ()J
Expand Down Expand Up @@ -520,39 +524,43 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendEvent : com/goj
}

public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()I
public final fun component3 ()I
public final fun component4 ()Lcom/gojek/mqtt/exception/CourierException;
public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;
public final fun component5 ()J
public final fun component6 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;
public fun equals (Ljava/lang/Object;)Z
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun getException ()Lcom/gojek/mqtt/exception/CourierException;
public final fun getQos ()I
public final fun getSizeBytes ()I
public final fun getTimeTakenMillis ()J
public final fun getTopic ()Ljava/lang/String;
public fun hashCode ()I
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent : com/gojek/mqtt/event/MqttEvent {
public fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
public synthetic fun <init> (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()I
public final fun component3 ()I
public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;
public final fun component4 ()J
public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun copy (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;
public fun equals (Ljava/lang/Object;)Z
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
public final fun getQos ()I
public final fun getSizeBytes ()I
public final fun getTimeTakenMillis ()J
public final fun getTopic ()Ljava/lang/String;
public fun hashCode ()I
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ data class ExperimentConfigs(
val adaptiveKeepAliveConfig: AdaptiveKeepAliveConfig? = null,
val activityCheckIntervalSeconds: Int = DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS,
val inactivityTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS,
val connectPacketTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS,
val policyResetTimeSeconds: Int = DEFAULT_POLICY_RESET_TIME_SECS,
val incomingMessagesTTLSecs: Long = 360,
val incomingMessagesCleanupIntervalSecs: Long = 60,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.mqtt.client.event.adapter

import com.gojek.courier.QoS
import com.gojek.courier.extensions.fromNanosToMillis
import com.gojek.mqtt.connection.event.ConnectionEventHandler
import com.gojek.mqtt.event.EventHandler
import com.gojek.mqtt.event.MqttEvent.ConnectPacketSendEvent
Expand Down Expand Up @@ -34,6 +35,9 @@ internal class MqttClientEventAdapter(
private val eventHandler: EventHandler,
private val networkHandler: NetworkHandler
) {

private var connectPacketSendTime = 0L

fun adapt(): ConnectionEventHandler {
return object : ConnectionEventHandler {
override fun onMqttConnectAttempt(
Expand All @@ -57,7 +61,8 @@ internal class MqttClientEventAdapter(
MqttConnectSuccessEvent(
activeNetInfo = networkHandler.getActiveNetworkInfo(),
serverUri = serverUri,
timeTakenMillis = timeTakenMillis
timeTakenMillis = timeTakenMillis,
connectPacketRTTime = (System.nanoTime() - connectPacketSendTime).fromNanosToMillis()
)
)
}
Expand Down Expand Up @@ -207,6 +212,7 @@ internal class MqttClientEventAdapter(
}

override fun onConnectPacketSend() {
connectPacketSendTime = System.nanoTime()
eventHandler.onEvent(ConnectPacketSendEvent())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal data class MqttSendPacket(
var qos: Int,
var topic: String,
var type: Int,
var triggerTime: Long,
var sendMessageCallback: SendMessageCallback
) : Parcelable {
constructor(parcel: Parcel) : this(
Expand All @@ -21,6 +22,7 @@ internal data class MqttSendPacket(
parcel.readInt(),
parcel.readString()!!,
parcel.readInt(),
parcel.readLong(),
NoOpSendMessageCallback
)

Expand All @@ -31,6 +33,7 @@ internal data class MqttSendPacket(
parcel.writeInt(qos)
parcel.writeString(topic)
parcel.writeInt(type)
parcel.writeLong(triggerTime)
}

override fun describeContents(): Int {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ internal class AndroidMqttClient(
persistenceOptions = mqttConfiguration.persistenceOptions,
inactivityTimeoutSeconds = experimentConfigs.inactivityTimeoutSeconds,
policyResetTimeSeconds = experimentConfigs.policyResetTimeSeconds,
shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow
shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow,
connectPacketTimeoutSeconds = experimentConfigs.connectPacketTimeoutSeconds
)

mqttConnection = MqttConnection(
Expand Down Expand Up @@ -239,7 +240,6 @@ internal class AndroidMqttClient(
if (!isConnected()) {
connectMqtt()
}

try {
logger.d(
TAG,
Expand All @@ -261,6 +261,7 @@ internal class AndroidMqttClient(
topic = topic,
qos = qos,
sizeBytes = message.size,
timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(),
exception = e.toCourierException()
)
)
Expand All @@ -273,6 +274,7 @@ internal class AndroidMqttClient(
topic = topic,
qos = qos,
sizeBytes = message.size,
timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(),
exception = e.toCourierException()
)
)
Expand All @@ -287,6 +289,7 @@ internal class AndroidMqttClient(
topic = topic,
qos = qos,
sizeBytes = message.size,
timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(),
exception = e.toCourierException()
)
)
Expand All @@ -297,13 +300,14 @@ internal class AndroidMqttClient(
// This can be invoked on any thread
override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean {
val mqttSendPacket = MqttSendPacket(
mqttPacket.message,
0,
System.currentTimeMillis(),
mqttPacket.qos.value,
mqttPacket.topic,
mqttPacket.qos.type,
sendMessageCallback
message = mqttPacket.message,
messageId = 0,
timestamp = System.currentTimeMillis(),
qos = mqttPacket.qos.value,
topic = mqttPacket.topic,
type = mqttPacket.qos.type,
triggerTime = System.nanoTime(),
sendMessageCallback = sendMessageCallback
)

val msg = Message.obtain()
Expand Down Expand Up @@ -587,16 +591,28 @@ internal class AndroidMqttClient(
with(packet) {
eventHandler.onEvent(
MqttMessageSendSuccessEvent(
topic,
qos,
message.size
topic = topic,
qos = qos,
sizeBytes = message.size,
timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis()
)
)
}
}

override fun onFailure(packet: MqttSendPacket, exception: Throwable) {
packet.sendMessageCallback.onMessageSendFailure(exception)
with(packet) {
eventHandler.onEvent(
MqttMessageSendFailureEvent(
topic = topic,
qos = qos,
sizeBytes = message.size,
timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(),
exception = exception.toCourierException()
)
)
}
runnableScheduler.connectMqtt()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ internal class MqttConnection(
connectOptions.keepAlive.isOptimal,
serverUri
)
mqtt!!.connect(options, null, getConnectListener(subscriptionTopicMap))
mqtt!!.connect(options, null, getConnectListener())
runnableScheduler.scheduleNextActivityCheck()
} catch (e: MqttSecurityException) {
logger.e(TAG, "mqtt security exception while connecting $e")
Expand Down Expand Up @@ -407,7 +407,7 @@ internal class MqttConnection(
return mqttAsyncClient
}

private fun getConnectListener(subscriptionTopicMap: Map<String, QoS>): IMqttActionListener? {
private fun getConnectListener(): IMqttActionListener {
return object : IMqttActionListener {
override fun onSuccess(iMqttToken: IMqttToken) {
try {
Expand Down Expand Up @@ -673,6 +673,10 @@ internal class MqttConnection(
return connectionConfig.inactivityTimeoutSeconds
}

override fun connectPacketTimeoutSecs(): Int {
return connectionConfig.connectPacketTimeoutSeconds
}

override fun useNewSSLFlow(): Boolean {
return connectionConfig.shouldUseNewSSLFlow
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ internal data class ConnectionConfig(
val persistenceOptions: PersistenceOptions,
val inactivityTimeoutSeconds: Int,
val policyResetTimeSeconds: Int,
val shouldUseNewSSLFlow: Boolean
val shouldUseNewSSLFlow: Boolean,
val connectPacketTimeoutSeconds: Int
)
3 changes: 3 additions & 0 deletions mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) {
val activeNetInfo: ActiveNetInfo,
val serverUri: ServerUri?,
val timeTakenMillis: Long,
val connectPacketRTTime: Long,
override var connectionInfo: ConnectionInfo? = null
) : MqttEvent(connectionInfo)

Expand Down Expand Up @@ -165,6 +166,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) {
val topic: String,
val qos: Int,
val sizeBytes: Int,
val timeTakenMillis: Long,
override var connectionInfo: ConnectionInfo? = null
) : MqttEvent(connectionInfo)

Expand All @@ -173,6 +175,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) {
val qos: Int,
val sizeBytes: Int,
val exception: CourierException,
val timeTakenMillis: Long,
override var connectionInfo: ConnectionInfo? = null
) : MqttEvent(connectionInfo)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
public interface IExperimentsConfig {
int inactivityTimeoutSecs();

int connectPacketTimeoutSecs();

Boolean useNewSSLFlow();
}
Loading

0 comments on commit e3fe343

Please sign in to comment.