Skip to content

Commit

Permalink
Add connect retry policy for auth failures
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 committed Jul 26, 2023
1 parent 98d85d3 commit 8f852ce
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 33 deletions.
10 changes: 4 additions & 6 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -1151,18 +1151,16 @@ public final class com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimeConf

public final class com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy : com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy {
public fun <init> (Lcom/gojek/mqtt/policies/connectretrytime/ConnectRetryTimeConfig;)V
public fun getConnRetryTimeSecs ()I
public fun getConnRetryTimeSecs (Z)I
public fun getCurrentRetryTime ()I
public fun getRetryCount ()I
public fun resetParams ()V
}

public abstract interface class com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy : com/gojek/mqtt/policies/IFallbackPolicy {
public abstract fun getConnRetryTimeSecs ()I
public abstract fun getConnRetryTimeSecs (Z)I
public abstract fun getCurrentRetryTime ()I
public abstract fun getRetryCount ()I
}

public final class com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy$DefaultImpls {
public static synthetic fun getConnRetryTimeSecs$default (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;ZILjava/lang/Object;)I
}

public final class com/gojek/mqtt/policies/connecttimeout/ConnectTimeoutConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,19 @@ internal class MqttExceptionHandlerImpl(
*/
}
MqttException.REASON_CODE_FAILED_AUTHENTICATION -> {
runnableScheduler.scheduleAuthFailureRunnable()
runnableScheduler.scheduleAuthFailureRunnable(
connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L
)
}
MqttException.REASON_CODE_NOT_AUTHORIZED -> {
runnableScheduler.scheduleAuthFailureRunnable()
runnableScheduler.scheduleAuthFailureRunnable(
connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L
)
}
MqttException.REASON_CODE_INVALID_CONNECT_OPTIONS -> {
runnableScheduler.scheduleAuthFailureRunnable()
runnableScheduler.scheduleAuthFailureRunnable(
connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L
)
}
MqttException.REASON_CODE_INVALID_CLIENT_ID -> {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ class ConnectRetryTimePolicy(
private val reconnectTime: AtomicInteger = AtomicInteger(0)
private val retryCount: AtomicInteger = AtomicInteger(0)

// this function works on exponential retrying
override fun getConnRetryTimeSecs(forceExp: Boolean): Int {
override fun getConnRetryTimeSecs(isAuthFailure: Boolean): Int {
if (isAuthFailure) {
return 0
}
val maxRetryCount = connectRetryTimeConfig.maxRetryCount
val reconnectTimeFixed = connectRetryTimeConfig.reconnectTimeFixed
val reconnectTimeRandom = connectRetryTimeConfig.reconnectTimeRandom
val maxReconnectTime = connectRetryTimeConfig.maxReconnectTime
val random = Random()
if ((reconnectTime.get() == 0 || retryCount.get() < maxRetryCount) && !forceExp) {
if ((reconnectTime.get() == 0 || retryCount.get() < maxRetryCount)) {
reconnectTime.set(reconnectTimeFixed + random.nextInt(reconnectTimeRandom) + 1)
retryCount.getAndIncrement()
} else {
Expand All @@ -32,18 +34,6 @@ class ConnectRetryTimePolicy(
return reconnectTime.get()
}

override fun getRetryCount(): Int {
return retryCount.get()
}

override fun getConnRetryTimeSecs(): Int {
return getConnRetryTimeSecs(false)
}

override fun getCurrentRetryTime(): Int {
return reconnectTime.get()
}

private fun updateReconnectTimeExponentially(exponentialFactor: Int): Int {
while (true) {
val current = reconnectTime.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@ package com.gojek.mqtt.policies.connectretrytime
import com.gojek.mqtt.policies.IFallbackPolicy

interface IConnectRetryTimePolicy : IFallbackPolicy {
fun getConnRetryTimeSecs(): Int
fun getConnRetryTimeSecs(forceExp: Boolean): Int
fun getRetryCount(): Int
fun getCurrentRetryTime(): Int
fun getConnRetryTimeSecs(isAuthFailure: Boolean = false): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ internal interface IRunnableScheduler {

fun scheduleResetParams(delayMillis: Long)

fun scheduleAuthFailureRunnable()
fun scheduleAuthFailureRunnable(delayMillis: Long)
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ internal class MqttRunnableScheduler(
}
}

override fun scheduleAuthFailureRunnable() {
override fun scheduleAuthFailureRunnable(delayMillis: Long) {
try {
sendThreadEventIfNotAlive()
mqttThreadHandler.removeCallbacks(authFailureRunnable)
mqttThreadHandler.post(authFailureRunnable)
mqttThreadHandler.postDelayed(authFailureRunnable, delayMillis)
} catch (ex: Exception) {
logger.e(TAG, "Exception while scheduleAuthFailureRunnable", ex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CONNECTION_LOST
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CONNECT_IN_PROGRESS
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_FAILED_AUTHENTICATION
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_CLIENT_ID
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_CONNECT_OPTIONS
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_MESSAGE
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_PROTOCOL_VERSION
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_MAX_INFLIGHT
Expand Down Expand Up @@ -146,15 +147,25 @@ class MqttExceptionHandlerImplTest {
@Test
fun `test exception with reason code 4`() {
val exception = MqttException(REASON_CODE_FAILED_AUTHENTICATION.toInt())
whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20)
mqttExceptionHandlerImpl.handleException(exception, true)
verify(runnableScheduler).scheduleAuthFailureRunnable()
verify(runnableScheduler).scheduleAuthFailureRunnable(20000)
}

@Test
fun `test exception with reason code 5`() {
val exception = MqttException(REASON_CODE_NOT_AUTHORIZED.toInt())
whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20)
mqttExceptionHandlerImpl.handleException(exception, true)
verify(runnableScheduler).scheduleAuthFailureRunnable()
verify(runnableScheduler).scheduleAuthFailureRunnable(20000)
}

@Test
fun `test exception with reason code 32205`() {
val exception = MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS.toInt())
whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20)
mqttExceptionHandlerImpl.handleException(exception, true)
verify(runnableScheduler).scheduleAuthFailureRunnable(20000)
}

@Test
Expand Down

0 comments on commit 8f852ce

Please sign in to comment.