diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt index 028c5cd0..f67e7f5f 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt @@ -2,10 +2,24 @@ package com.gojek.mqtt.client.event.interceptor import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent +import com.gojek.mqtt.utils.MqttUtils import java.util.concurrent.CopyOnWriteArrayList - -internal class MqttEventHandler : EventHandler { - +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit.SECONDS + +internal class MqttEventHandler( + mqttUtils: MqttUtils +) : EventHandler { + + private val eventScheduler = ThreadPoolExecutor( + /* corePoolSize = */ 1, + /* maximumPoolSize = */ 1, + /* keepAliveTime = */ 300, + /* unit = */ SECONDS, + /* workQueue = */ LinkedBlockingQueue(), + /* threadFactory = */ mqttUtils.threadFactory("mqtt-event-handler", false) + ).apply { allowCoreThreadTimeOut(true) } private val interceptorList = CopyOnWriteArrayList() private val eventHandlers = CopyOnWriteArrayList() @@ -18,7 +32,11 @@ internal class MqttEventHandler : EventHandler { interceptorList.forEach { event = it.intercept(event) } - eventHandlers.forEach { it.onEvent(mqttEvent) } + if (eventHandlers.isNotEmpty()) { + eventScheduler.submit { + eventHandlers.forEach { it.onEvent(mqttEvent) } + } + } } fun addEventHandler(handler: EventHandler) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index 340a8c29..cbd51033 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -22,6 +22,7 @@ import com.gojek.mqtt.event.PingEventHandler import com.gojek.mqtt.model.AdaptiveKeepAliveConfig import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.MqttPacket +import com.gojek.mqtt.utils.MqttUtils import com.gojek.networktracker.NetworkStateTrackerFactory internal class MqttClientInternal( @@ -42,7 +43,7 @@ internal class MqttClientInternal( private var keepAliveProvider: KeepAliveProvider = NonAdaptiveKeepAliveProvider() private var keepAliveFailureHandler: KeepAliveFailureHandler = NoOpKeepAliveFailureHandler() - private val eventHandler = MqttEventHandler() + private val eventHandler = MqttEventHandler(MqttUtils()) private val optimalKeepAliveObserver = object : OptimalKeepAliveObserver { override fun onOptimalKeepAliveFound(