Skip to content

Commit

Permalink
Create new implementation of PersistableSubscriptionStore (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 authored Jul 18, 2022
1 parent 4ea455b commit a789796
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 18 deletions.
20 changes: 14 additions & 6 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,26 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor {

public final class com/gojek/mqtt/client/config/ExperimentConfigs {
public fun <init> ()V
public fun <init> (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)V
public synthetic fun <init> (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Z
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)V
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun component3 ()I
public final fun component4 ()I
public final fun component5 ()I
public final fun component6 ()J
public final fun component7 ()J
public final fun copy (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
public fun equals (Ljava/lang/Object;)Z
public final fun getActivityCheckIntervalSeconds ()I
public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
public final fun getInactivityTimeoutSeconds ()I
public final fun getIncomingMessagesCleanupIntervalSecs ()J
public final fun getIncomingMessagesTTLSecs ()J
public final fun getPolicyResetTimeSeconds ()I
public final fun getSubscriptionStore ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
public fun hashCode ()I
public final fun isPersistentSubscriptionStoreEnabled ()Z
public fun toString ()Ljava/lang/String;
}

Expand Down Expand Up @@ -85,6 +85,14 @@ public final class com/gojek/mqtt/client/config/PersistenceOptions$PahoPersisten
public fun toString ()Ljava/lang/String;
}

public final class com/gojek/mqtt/client/config/SubscriptionStore : java/lang/Enum {
public static final field IN_MEMORY Lcom/gojek/mqtt/client/config/SubscriptionStore;
public static final field PERSISTABLE Lcom/gojek/mqtt/client/config/SubscriptionStore;
public static final field PERSISTABLE_V2 Lcom/gojek/mqtt/client/config/SubscriptionStore;
public static fun valueOf (Ljava/lang/String;)Lcom/gojek/mqtt/client/config/SubscriptionStore;
public static fun values ()[Lcom/gojek/mqtt/client/config/SubscriptionStore;
}

public final class com/gojek/mqtt/client/config/v3/MqttV3Configuration : com/gojek/mqtt/client/config/MqttConfiguration {
public fun <init> (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;Lcom/gojek/mqtt/policies/connecttimeout/IConnectTimeoutPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;ILjavax/net/SocketFactory;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/auth/Authenticator;Lcom/gojek/mqtt/exception/handler/v3/AuthFailureHandler;Lcom/gojek/mqtt/event/EventHandler;Lcom/gojek/mqtt/pingsender/MqttPingSender;Ljava/util/List;Lcom/gojek/mqtt/client/config/PersistenceOptions;Lcom/gojek/mqtt/client/config/ExperimentConfigs;)V
public synthetic fun <init> (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;Lcom/gojek/mqtt/policies/connecttimeout/IConnectTimeoutPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;ILjavax/net/SocketFactory;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/auth/Authenticator;Lcom/gojek/mqtt/exception/handler/v3/AuthFailureHandler;Lcom/gojek/mqtt/event/EventHandler;Lcom/gojek/mqtt/pingsender/MqttPingSender;Ljava/util/List;Lcom/gojek/mqtt/client/config/PersistenceOptions;Lcom/gojek/mqtt/client/config/ExperimentConfigs;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.gojek.mqtt.client.config

import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE
import com.gojek.mqtt.constants.DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS
import com.gojek.mqtt.constants.DEFAULT_INACTIVITY_TIMEOUT_SECS
import com.gojek.mqtt.constants.DEFAULT_POLICY_RESET_TIME_SECS
import com.gojek.mqtt.model.AdaptiveKeepAliveConfig

data class ExperimentConfigs(
val isPersistentSubscriptionStoreEnabled: Boolean = true,
val subscriptionStore: SubscriptionStore = PERSISTABLE,
val adaptiveKeepAliveConfig: AdaptiveKeepAliveConfig? = null,
val activityCheckIntervalSeconds: Int = DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS,
val inactivityTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS,
val policyResetTimeSeconds: Int = DEFAULT_POLICY_RESET_TIME_SECS,
val incomingMessagesTTLSecs: Long = 360,
val incomingMessagesCleanupIntervalSecs: Long = 60
)

enum class SubscriptionStore {
IN_MEMORY, PERSISTABLE, PERSISTABLE_V2
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import com.gojek.mqtt.client.IClientSchedulerBridge
import com.gojek.mqtt.client.IMessageReceiveListener
import com.gojek.mqtt.client.IncomingMsgController
import com.gojek.mqtt.client.IncomingMsgControllerImpl
import com.gojek.mqtt.client.config.SubscriptionStore.IN_MEMORY
import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE
import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE_V2
import com.gojek.mqtt.client.config.v3.MqttV3Configuration
import com.gojek.mqtt.client.connectioninfo.ConnectionInfo
import com.gojek.mqtt.client.connectioninfo.ConnectionInfoStore
Expand Down Expand Up @@ -66,6 +69,7 @@ import com.gojek.mqtt.scheduler.MqttRunnableScheduler
import com.gojek.mqtt.send.listener.IMessageSendListener
import com.gojek.mqtt.subscription.InMemorySubscriptionStore
import com.gojek.mqtt.subscription.PersistableSubscriptionStore
import com.gojek.mqtt.subscription.PersistableSubscriptionStoreV2
import com.gojek.mqtt.subscription.SubscriptionStore
import com.gojek.mqtt.utils.MqttUtils
import com.gojek.mqtt.utils.NetworkUtils
Expand Down Expand Up @@ -115,10 +119,10 @@ internal class AndroidMqttClient(
private var forceRefresh = false

private val subscriptionStore: SubscriptionStore =
if (experimentConfigs.isPersistentSubscriptionStoreEnabled) {
PersistableSubscriptionStore(context)
} else {
InMemorySubscriptionStore()
when (experimentConfigs.subscriptionStore) {
IN_MEMORY -> InMemorySubscriptionStore()
PERSISTABLE -> PersistableSubscriptionStore(context)
PERSISTABLE_V2 -> PersistableSubscriptionStoreV2(context)
}

private var hostFallbackPolicy: IHostFallbackPolicy? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ internal class PersistableSubscriptionStore(context: Context) : SubscriptionStor
}
}

private class Persistence(context: Context) {
val sharedPreferences: SharedPreferences =
internal class Persistence(context: Context) {
private val sharedPreferences: SharedPreferences =
context.getSharedPreferences("SubscriptionStorePrefs", MODE_PRIVATE)

fun get(key: String, default: Set<String>): Set<String> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.gojek.mqtt.subscription

import android.content.Context
import androidx.annotation.VisibleForTesting
import com.gojek.courier.QoS
import com.gojek.courier.extensions.toImmutableMap
import com.gojek.courier.extensions.toImmutableSet

internal class PersistableSubscriptionStoreV2(context: Context) : SubscriptionStore {
private lateinit var state: State
private val persistence = Persistence(context)
private val listener = object : SubscriptionStoreListener {
override fun onTopicsUnsubscribed(topics: Set<String>) {
onTopicsUnsubscribedInternal(topics)
}
}

private data class State(
val subscriptionTopics: Map<String, QoS>,
val pendingUnsubscribeTopics: Set<String>
)

init {
restoreState()
}

@Synchronized
override fun getSubscribeTopics(): Map<String, QoS> {
return state.subscriptionTopics.toImmutableMap()
}

@Synchronized
override fun getUnsubscribeTopics(cleanSession: Boolean): Set<String> {
if (cleanSession) {
persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet())
state = state.copy(pendingUnsubscribeTopics = emptySet())
}
return state.pendingUnsubscribeTopics.toImmutableSet()
}

@Synchronized
override fun subscribeTopics(topicMap: Map<String, QoS>): Map<String, QoS> {
state = state.copy(
subscriptionTopics = state.subscriptionTopics + topicMap,
pendingUnsubscribeTopics = state.pendingUnsubscribeTopics - topicMap.keys
)
persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics)
return topicMap
}

@Synchronized
override fun unsubscribeTopics(topics: List<String>): Set<String> {
state = state.copy(
subscriptionTopics = state.subscriptionTopics - topics,
pendingUnsubscribeTopics = state.pendingUnsubscribeTopics + topics
)
persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics)
return topics.toSet()
}

override fun getListener(): SubscriptionStoreListener {
return listener
}

@Synchronized
override fun clear() {
persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet())
state = State(
subscriptionTopics = emptyMap(),
pendingUnsubscribeTopics = emptySet()
)
}

@VisibleForTesting
internal fun restoreState() {
state = State(
subscriptionTopics = emptyMap(),
pendingUnsubscribeTopics = persistence.get(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet())
)
}

@Synchronized
private fun onTopicsUnsubscribedInternal(topics: Set<String>) {
state = state.copy(
subscriptionTopics = state.subscriptionTopics,
pendingUnsubscribeTopics = state.pendingUnsubscribeTopics - topics
)
persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics)
}
}

private const val PREF_KEY_PENDING_UNSUBSCRIBES = "PendingUnsubscribes"
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,34 @@ class PersistableSubscriptionStoreTest {
val topic5 = "topic5" to QoS.ONE

// Test when topic1, topic2, topic3 are subscribed for the first time
subscriptionStore.subscribeTopics(mapOf(topic1, topic2, topic3))
var topicMap = mapOf(topic1, topic2, topic3)
var subscribeTopics = subscriptionStore.subscribeTopics(topicMap)
assertEquals(topicMap, subscribeTopics)
assertEquals(subscriptionStore.getSubscribeTopics().size, 3)
assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3))
assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0)

// Test subscribing topic3 when its already subscribed
subscriptionStore.subscribeTopics(mapOf(topic3))
topicMap = mapOf(topic3)
subscribeTopics = subscriptionStore.subscribeTopics(topicMap)
assert(subscribeTopics.isEmpty())
assertEquals(subscriptionStore.getSubscribeTopics().size, 3)
assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3))
assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0)

// Test unsubscribing topic2 when its subscribed
subscriptionStore.unsubscribeTopics(listOf(topic2.first))
var topics = listOf(topic2.first)
var unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics)
assertEquals(topics, unsubscribeTopics.toList())
assertEquals(subscriptionStore.getSubscribeTopics().size, 2)
assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3))
assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 1)
assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf(topic2.first))

// Test unsubscribing topic4 when its subscribed
subscriptionStore.unsubscribeTopics(listOf(topic4.first))
topics = listOf(topic4.first)
unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics)
assertEquals(topics, unsubscribeTopics.toList())
assertEquals(subscriptionStore.getSubscribeTopics().size, 2)
assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3))
assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2)
Expand All @@ -89,7 +97,9 @@ class PersistableSubscriptionStoreTest {
)

// Test subscribing topic5 when its not subscribed
subscriptionStore.subscribeTopics(mapOf(topic5))
topicMap = mapOf(topic5)
subscribeTopics = subscriptionStore.subscribeTopics(topicMap)
assertEquals(topicMap, subscribeTopics)
assertEquals(subscriptionStore.getSubscribeTopics().size, 3)
assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3))
assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2)
Expand Down
Loading

0 comments on commit a789796

Please sign in to comment.