From a789796e94f2aa51dae1b7ea76dc3c3d4f614af1 Mon Sep 17 00:00:00 2001 From: Deepanshu Date: Mon, 18 Jul 2022 15:24:50 +0530 Subject: [PATCH] Create new implementation of PersistableSubscriptionStore (#34) --- mqtt-client/api/mqtt-client.api | 20 ++- .../mqtt/client/config/ExperimentConfigs.kt | 7 +- .../mqtt/client/v3/impl/AndroidMqttClient.kt | 12 +- .../PersistableSubscriptionStore.kt | 4 +- .../PersistableSubscriptionStoreV2.kt | 92 +++++++++++ .../PersistableSubscriptionStoreTest.kt | 20 ++- .../PersistableSubscriptionStoreV2Test.kt | 144 ++++++++++++++++++ 7 files changed, 281 insertions(+), 18 deletions(-) create mode 100644 mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt create mode 100644 mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2Test.kt diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index 04152f30..f0b0e04f 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -26,17 +26,17 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor { public final class com/gojek/mqtt/client/config/ExperimentConfigs { public fun ()V - public fun (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)V - public synthetic fun (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILkotlin/jvm/internal/DefaultConstructorMarker;)V - public final fun component1 ()Z + public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)V + public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore; public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; public final fun component3 ()I public final fun component4 ()I public final fun component5 ()I public final fun component6 ()J public final fun component7 ()J - public final fun copy (ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)Lcom/gojek/mqtt/client/config/ExperimentConfigs; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;ZLcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJ)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; public fun equals (Ljava/lang/Object;)Z public final fun getActivityCheckIntervalSeconds ()I public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; @@ -44,8 +44,8 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs { public final fun getIncomingMessagesCleanupIntervalSecs ()J public final fun getIncomingMessagesTTLSecs ()J public final fun getPolicyResetTimeSeconds ()I + public final fun getSubscriptionStore ()Lcom/gojek/mqtt/client/config/SubscriptionStore; public fun hashCode ()I - public final fun isPersistentSubscriptionStoreEnabled ()Z public fun toString ()Ljava/lang/String; } @@ -85,6 +85,14 @@ public final class com/gojek/mqtt/client/config/PersistenceOptions$PahoPersisten public fun toString ()Ljava/lang/String; } +public final class com/gojek/mqtt/client/config/SubscriptionStore : java/lang/Enum { + public static final field IN_MEMORY Lcom/gojek/mqtt/client/config/SubscriptionStore; + public static final field PERSISTABLE Lcom/gojek/mqtt/client/config/SubscriptionStore; + public static final field PERSISTABLE_V2 Lcom/gojek/mqtt/client/config/SubscriptionStore; + public static fun valueOf (Ljava/lang/String;)Lcom/gojek/mqtt/client/config/SubscriptionStore; + public static fun values ()[Lcom/gojek/mqtt/client/config/SubscriptionStore; +} + public final class com/gojek/mqtt/client/config/v3/MqttV3Configuration : com/gojek/mqtt/client/config/MqttConfiguration { public fun (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;Lcom/gojek/mqtt/policies/connecttimeout/IConnectTimeoutPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;ILjavax/net/SocketFactory;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/auth/Authenticator;Lcom/gojek/mqtt/exception/handler/v3/AuthFailureHandler;Lcom/gojek/mqtt/event/EventHandler;Lcom/gojek/mqtt/pingsender/MqttPingSender;Ljava/util/List;Lcom/gojek/mqtt/client/config/PersistenceOptions;Lcom/gojek/mqtt/client/config/ExperimentConfigs;)V public synthetic fun (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;Lcom/gojek/mqtt/policies/connecttimeout/IConnectTimeoutPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;Lcom/gojek/mqtt/policies/subscriptionretry/ISubscriptionRetryPolicy;ILjavax/net/SocketFactory;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/auth/Authenticator;Lcom/gojek/mqtt/exception/handler/v3/AuthFailureHandler;Lcom/gojek/mqtt/event/EventHandler;Lcom/gojek/mqtt/pingsender/MqttPingSender;Ljava/util/List;Lcom/gojek/mqtt/client/config/PersistenceOptions;Lcom/gojek/mqtt/client/config/ExperimentConfigs;ILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt index ba3f7f68..ed49af0c 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt @@ -1,12 +1,13 @@ package com.gojek.mqtt.client.config +import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE import com.gojek.mqtt.constants.DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS import com.gojek.mqtt.constants.DEFAULT_INACTIVITY_TIMEOUT_SECS import com.gojek.mqtt.constants.DEFAULT_POLICY_RESET_TIME_SECS import com.gojek.mqtt.model.AdaptiveKeepAliveConfig data class ExperimentConfigs( - val isPersistentSubscriptionStoreEnabled: Boolean = true, + val subscriptionStore: SubscriptionStore = PERSISTABLE, val adaptiveKeepAliveConfig: AdaptiveKeepAliveConfig? = null, val activityCheckIntervalSeconds: Int = DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS, val inactivityTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS, @@ -14,3 +15,7 @@ data class ExperimentConfigs( val incomingMessagesTTLSecs: Long = 360, val incomingMessagesCleanupIntervalSecs: Long = 60 ) + +enum class SubscriptionStore { + IN_MEMORY, PERSISTABLE, PERSISTABLE_V2 +} diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index c3b92841..6963419a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -19,6 +19,9 @@ import com.gojek.mqtt.client.IClientSchedulerBridge import com.gojek.mqtt.client.IMessageReceiveListener import com.gojek.mqtt.client.IncomingMsgController import com.gojek.mqtt.client.IncomingMsgControllerImpl +import com.gojek.mqtt.client.config.SubscriptionStore.IN_MEMORY +import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE +import com.gojek.mqtt.client.config.SubscriptionStore.PERSISTABLE_V2 import com.gojek.mqtt.client.config.v3.MqttV3Configuration import com.gojek.mqtt.client.connectioninfo.ConnectionInfo import com.gojek.mqtt.client.connectioninfo.ConnectionInfoStore @@ -66,6 +69,7 @@ import com.gojek.mqtt.scheduler.MqttRunnableScheduler import com.gojek.mqtt.send.listener.IMessageSendListener import com.gojek.mqtt.subscription.InMemorySubscriptionStore import com.gojek.mqtt.subscription.PersistableSubscriptionStore +import com.gojek.mqtt.subscription.PersistableSubscriptionStoreV2 import com.gojek.mqtt.subscription.SubscriptionStore import com.gojek.mqtt.utils.MqttUtils import com.gojek.mqtt.utils.NetworkUtils @@ -115,10 +119,10 @@ internal class AndroidMqttClient( private var forceRefresh = false private val subscriptionStore: SubscriptionStore = - if (experimentConfigs.isPersistentSubscriptionStoreEnabled) { - PersistableSubscriptionStore(context) - } else { - InMemorySubscriptionStore() + when (experimentConfigs.subscriptionStore) { + IN_MEMORY -> InMemorySubscriptionStore() + PERSISTABLE -> PersistableSubscriptionStore(context) + PERSISTABLE_V2 -> PersistableSubscriptionStoreV2(context) } private var hostFallbackPolicy: IHostFallbackPolicy? = null diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt index 3be031e5..6afaffe5 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt @@ -92,8 +92,8 @@ internal class PersistableSubscriptionStore(context: Context) : SubscriptionStor } } -private class Persistence(context: Context) { - val sharedPreferences: SharedPreferences = +internal class Persistence(context: Context) { + private val sharedPreferences: SharedPreferences = context.getSharedPreferences("SubscriptionStorePrefs", MODE_PRIVATE) fun get(key: String, default: Set): Set { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt new file mode 100644 index 00000000..f26368e5 --- /dev/null +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt @@ -0,0 +1,92 @@ +package com.gojek.mqtt.subscription + +import android.content.Context +import androidx.annotation.VisibleForTesting +import com.gojek.courier.QoS +import com.gojek.courier.extensions.toImmutableMap +import com.gojek.courier.extensions.toImmutableSet + +internal class PersistableSubscriptionStoreV2(context: Context) : SubscriptionStore { + private lateinit var state: State + private val persistence = Persistence(context) + private val listener = object : SubscriptionStoreListener { + override fun onTopicsUnsubscribed(topics: Set) { + onTopicsUnsubscribedInternal(topics) + } + } + + private data class State( + val subscriptionTopics: Map, + val pendingUnsubscribeTopics: Set + ) + + init { + restoreState() + } + + @Synchronized + override fun getSubscribeTopics(): Map { + return state.subscriptionTopics.toImmutableMap() + } + + @Synchronized + override fun getUnsubscribeTopics(cleanSession: Boolean): Set { + if (cleanSession) { + persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet()) + state = state.copy(pendingUnsubscribeTopics = emptySet()) + } + return state.pendingUnsubscribeTopics.toImmutableSet() + } + + @Synchronized + override fun subscribeTopics(topicMap: Map): Map { + state = state.copy( + subscriptionTopics = state.subscriptionTopics + topicMap, + pendingUnsubscribeTopics = state.pendingUnsubscribeTopics - topicMap.keys + ) + persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics) + return topicMap + } + + @Synchronized + override fun unsubscribeTopics(topics: List): Set { + state = state.copy( + subscriptionTopics = state.subscriptionTopics - topics, + pendingUnsubscribeTopics = state.pendingUnsubscribeTopics + topics + ) + persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics) + return topics.toSet() + } + + override fun getListener(): SubscriptionStoreListener { + return listener + } + + @Synchronized + override fun clear() { + persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet()) + state = State( + subscriptionTopics = emptyMap(), + pendingUnsubscribeTopics = emptySet() + ) + } + + @VisibleForTesting + internal fun restoreState() { + state = State( + subscriptionTopics = emptyMap(), + pendingUnsubscribeTopics = persistence.get(PREF_KEY_PENDING_UNSUBSCRIBES, emptySet()) + ) + } + + @Synchronized + private fun onTopicsUnsubscribedInternal(topics: Set) { + state = state.copy( + subscriptionTopics = state.subscriptionTopics, + pendingUnsubscribeTopics = state.pendingUnsubscribeTopics - topics + ) + persistence.put(PREF_KEY_PENDING_UNSUBSCRIBES, state.pendingUnsubscribeTopics) + } +} + +private const val PREF_KEY_PENDING_UNSUBSCRIBES = "PendingUnsubscribes" diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreTest.kt b/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreTest.kt index fe356ab7..86552ce7 100644 --- a/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreTest.kt +++ b/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreTest.kt @@ -60,26 +60,34 @@ class PersistableSubscriptionStoreTest { val topic5 = "topic5" to QoS.ONE // Test when topic1, topic2, topic3 are subscribed for the first time - subscriptionStore.subscribeTopics(mapOf(topic1, topic2, topic3)) + var topicMap = mapOf(topic1, topic2, topic3) + var subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assertEquals(topicMap, subscribeTopics) assertEquals(subscriptionStore.getSubscribeTopics().size, 3) assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3)) assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) // Test subscribing topic3 when its already subscribed - subscriptionStore.subscribeTopics(mapOf(topic3)) + topicMap = mapOf(topic3) + subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assert(subscribeTopics.isEmpty()) assertEquals(subscriptionStore.getSubscribeTopics().size, 3) assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3)) assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) // Test unsubscribing topic2 when its subscribed - subscriptionStore.unsubscribeTopics(listOf(topic2.first)) + var topics = listOf(topic2.first) + var unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics) + assertEquals(topics, unsubscribeTopics.toList()) assertEquals(subscriptionStore.getSubscribeTopics().size, 2) assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3)) assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 1) assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf(topic2.first)) // Test unsubscribing topic4 when its subscribed - subscriptionStore.unsubscribeTopics(listOf(topic4.first)) + topics = listOf(topic4.first) + unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics) + assertEquals(topics, unsubscribeTopics.toList()) assertEquals(subscriptionStore.getSubscribeTopics().size, 2) assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3)) assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) @@ -89,7 +97,9 @@ class PersistableSubscriptionStoreTest { ) // Test subscribing topic5 when its not subscribed - subscriptionStore.subscribeTopics(mapOf(topic5)) + topicMap = mapOf(topic5) + subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assertEquals(topicMap, subscribeTopics) assertEquals(subscriptionStore.getSubscribeTopics().size, 3) assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3)) assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2Test.kt b/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2Test.kt new file mode 100644 index 00000000..0b33a5f8 --- /dev/null +++ b/mqtt-client/src/test/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2Test.kt @@ -0,0 +1,144 @@ +package com.gojek.mqtt.subscription + +import android.content.Context +import android.content.SharedPreferences +import com.gojek.courier.QoS +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.whenever +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mock +import org.mockito.junit.MockitoJUnitRunner + +@RunWith(MockitoJUnitRunner::class) +class PersistableSubscriptionStoreV2Test { + @Mock + private lateinit var context: Context + + @Mock + private lateinit var sharedPreferences: SharedPreferences + + @Mock + private lateinit var editor: SharedPreferences.Editor + + private lateinit var subscriptionStore: PersistableSubscriptionStoreV2 + + @Before + fun setup() { + whenever(sharedPreferences.getStringSet("PendingUnsubscribes", emptySet())) + .thenReturn(emptySet()) + whenever(sharedPreferences.edit()).thenReturn(editor) + whenever(editor.putStringSet(any(), any())).thenReturn(editor) + whenever(context.getSharedPreferences("SubscriptionStorePrefs", Context.MODE_PRIVATE)) + .thenReturn(sharedPreferences) + subscriptionStore = PersistableSubscriptionStoreV2(context) + } + + @Test + fun testRestoreState() { + assertEquals(subscriptionStore.getSubscribeTopics().size, 0) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) + whenever(sharedPreferences.getStringSet("PendingUnsubscribes", emptySet())) + .thenReturn(setOf("topic1", "topic2")) + subscriptionStore.restoreState() + assertEquals(subscriptionStore.getSubscribeTopics().size, 0) + assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf("topic1", "topic2")) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) + } + + @Test + fun testSubscriptionStore() { + assertTrue(subscriptionStore.getSubscribeTopics().isEmpty()) + + val topic1 = "topic1" to QoS.ONE + val topic2 = "topic2" to QoS.ONE + val topic3 = "topic3" to QoS.ONE + val topic4 = "topic4" to QoS.ONE + val topic5 = "topic5" to QoS.ONE + + // Test when topic1, topic2, topic3 are subscribed for the first time + var topicMap = mapOf(topic1, topic2, topic3) + var subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assertEquals(topicMap, subscribeTopics) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) + + // Test subscribing topic3 when its already subscribed + topicMap = mapOf(topic3) + subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assertEquals(topicMap, subscribeTopics) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic2, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) + + // Test unsubscribing topic2 when its subscribed + var topics = listOf(topic2.first) + var unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics) + assertEquals(topics, unsubscribeTopics.toList()) + assertEquals(subscriptionStore.getSubscribeTopics().size, 2) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 1) + assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf(topic2.first)) + + // Test unsubscribing topic4 when its subscribed + topics = listOf(topic4.first) + unsubscribeTopics = subscriptionStore.unsubscribeTopics(topics) + assertEquals(topics, unsubscribeTopics.toList()) + assertEquals(subscriptionStore.getSubscribeTopics().size, 2) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) + assertEquals( + subscriptionStore.getUnsubscribeTopics(false), + setOf(topic2.first, topic4.first) + ) + + // Test subscribing topic5 when its not subscribed + topicMap = mapOf(topic5) + subscribeTopics = subscriptionStore.subscribeTopics(topicMap) + assertEquals(topicMap, subscribeTopics) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) + assertEquals( + subscriptionStore.getUnsubscribeTopics(false), + setOf(topic2.first, topic4.first) + ) + + // Test notifying unsubscribe success of topic2 + subscriptionStore.getListener().onTopicsUnsubscribed(setOf(topic2.first)) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 1) + assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf(topic4.first)) + + // Test notifying unsubscribe success of topic5 + subscriptionStore.getListener().onTopicsSubscribed(mapOf(topic5)) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 1) + assertEquals(subscriptionStore.getUnsubscribeTopics(false), setOf(topic4.first)) + + // Test getUnsubscribeTopics with cleansession true + subscriptionStore.getListener().onTopicsSubscribed(mapOf(topic5)) + assertEquals(subscriptionStore.getSubscribeTopics().size, 3) + assertEquals(subscriptionStore.getSubscribeTopics(), mapOf(topic1, topic5, topic3)) + assertEquals(subscriptionStore.getUnsubscribeTopics(true).size, 0) + } + + @Test + fun testClear() { + val topic1 = "topic1" to QoS.ONE + val topic2 = "topic2" to QoS.ZERO + subscriptionStore.subscribeTopics(mapOf(topic1, topic2)) + subscriptionStore.unsubscribeTopics(listOf("topic3", "topic4")) + assertEquals(subscriptionStore.getSubscribeTopics().size, 2) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 2) + subscriptionStore.clear() + assertEquals(subscriptionStore.getSubscribeTopics().size, 0) + assertEquals(subscriptionStore.getUnsubscribeTopics(false).size, 0) + } +}