Skip to content

Commit

Permalink
Add support for global message listener (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 authored May 27, 2022
1 parent 9f0f0b9 commit 836d3c3
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 11 deletions.
1 change: 1 addition & 0 deletions mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ public abstract interface class com/gojek/mqtt/auth/Authenticator {
}

public abstract interface class com/gojek/mqtt/client/MqttClient {
public abstract fun addGlobalMessageListener (Lcom/gojek/mqtt/client/listener/MessageListener;)V
public abstract fun addMessageListener (Ljava/lang/String;Lcom/gojek/mqtt/client/listener/MessageListener;)V
public abstract fun connect (Lcom/gojek/mqtt/model/MqttConnectOptions;)V
public abstract fun disconnect (Z)V
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package com.gojek.mqtt.client

import android.os.HandlerThread
import android.os.Process
import android.os.Process.THREAD_PRIORITY_BACKGROUND
import android.os.Process.THREAD_PRIORITY_MORE_FAVORABLE
import com.gojek.courier.Message
import com.gojek.courier.extensions.fromSecondsToNanos
import com.gojek.courier.logging.ILogger
import com.gojek.courier.utils.Clock
import com.gojek.mqtt.client.listener.MessageListener
import com.gojek.mqtt.client.model.MqttMessage
import com.gojek.mqtt.event.EventHandler
import com.gojek.mqtt.event.MqttEvent.MqttMessageReceiveErrorEvent
import com.gojek.mqtt.exception.toCourierException
import com.gojek.mqtt.persistence.IMqttReceivePersistence
import com.gojek.mqtt.persistence.model.MqttReceivePacket
import com.gojek.mqtt.persistence.model.toMqttMessage
import com.gojek.mqtt.utils.MqttUtils
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
Expand Down Expand Up @@ -134,9 +128,7 @@ internal class IncomingMsgControllerImpl(
try {
listenerMap[message.topic]!!.forEach {
notified = true
it.onMessageReceived(
MqttMessage(message.topic, Message.Bytes(message.message))
)
it.onMessageReceived(message.toMqttMessage())
}
return notified
} catch (e: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ interface MqttClient {
fun send(message: Message, topic: String, qos: QoS): Boolean
fun addMessageListener(topic: String, listener: MessageListener)
fun removeMessageListener(topic: String, listener: MessageListener)
fun addGlobalMessageListener(listener: MessageListener)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ internal class MqttCourierClient(
override fun removeMessageListener(topic: String, listener: MessageListener) {
mqttClient.removeMessageListener(topic, listener)
}

override fun addGlobalMessageListener(listener: MessageListener) {
mqttClient.addGlobalMessageListener(listener)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ internal class MqttClientInternal(
return androidMqttClient.removeMessageListener(topic, listener)
}

fun addGlobalMessageListener(listener: MessageListener) {
return androidMqttClient.addGlobalMessageListener(listener)
}

fun getCurrentState(): ConnectionState {
return androidMqttClient.getCurrentState()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal interface IAndroidMqttClient {
fun send(mqttPacket: MqttPacket): Boolean
fun addMessageListener(topic: String, listener: MessageListener)
fun removeMessageListener(topic: String, listener: MessageListener)
fun addGlobalMessageListener(listener: MessageListener)
fun isConnected(): Boolean
fun subscribe(topicMap: Map<String, QoS>)
fun unsubscribe(topics: List<String>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import com.gojek.mqtt.model.MqttPacket
import com.gojek.mqtt.network.NetworkHandler
import com.gojek.mqtt.persistence.impl.PahoPersistence
import com.gojek.mqtt.persistence.model.MqttReceivePacket
import com.gojek.mqtt.persistence.model.toMqttMessage
import com.gojek.mqtt.pingsender.MqttPingSender
import com.gojek.mqtt.policies.hostfallback.HostFallbackPolicy
import com.gojek.mqtt.policies.hostfallback.IHostFallbackPolicy
Expand Down Expand Up @@ -104,6 +105,9 @@ internal class AndroidMqttClient(

private val experimentConfigs = mqttConfiguration.experimentConfigs

@Volatile
private var globalListener: MessageListener? = null

@Volatile
private var isInitialised = false

Expand Down Expand Up @@ -313,6 +317,10 @@ internal class AndroidMqttClient(
incomingMsgController.unregisterListener(topic, listener)
}

override fun addGlobalMessageListener(listener: MessageListener) {
this.globalListener = listener
}

//This runs on Mqtt thread
override fun connectMqtt() {
val startTime = clock.nanoTime()
Expand Down Expand Up @@ -533,6 +541,7 @@ internal class AndroidMqttClient(
topic
)
mqttPersistence.addReceivedMessage(mqttPacket)
globalListener?.onMessageReceived(mqttPacket.toMqttMessage())
triggerHandleMessage()
} catch (e: IllegalStateException){
mqttConfiguration.eventHandler.onEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package com.gojek.mqtt.persistence.model
import androidx.room.ColumnInfo
import androidx.room.Entity
import androidx.room.PrimaryKey
import com.gojek.courier.Message
import com.gojek.mqtt.client.model.MqttMessage

@Entity(tableName = "incoming_messages")
internal data class MqttReceivePacket(
Expand All @@ -15,4 +17,7 @@ internal data class MqttReceivePacket(
var nanosTimestamp: Long,
@ColumnInfo(name = "topic")
var topic: String
)
)

internal fun MqttReceivePacket.toMqttMessage() =
MqttMessage(this.topic, Message.Bytes(this.message))
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,11 @@ class MqttCourierClientTest {
mqttCourierClient.removeMessageListener(topic, listener)
verify(mqttClientInternal).removeMessageListener(topic, listener)
}

@Test
fun `test addGlobalMessageListener`() {
val listener = mock<MessageListener>()
mqttCourierClient.addGlobalMessageListener(listener)
verify(mqttClientInternal).addGlobalMessageListener(listener)
}
}

0 comments on commit 836d3c3

Please sign in to comment.