Skip to content

Commit

Permalink
Create single thread scheduler for emitting MQTT events (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 authored Nov 21, 2023
1 parent eba8018 commit 8e3c564
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@ package com.gojek.mqtt.client.event.interceptor

import com.gojek.mqtt.event.EventHandler
import com.gojek.mqtt.event.MqttEvent
import com.gojek.mqtt.utils.MqttUtils
import java.util.concurrent.CopyOnWriteArrayList

internal class MqttEventHandler : EventHandler {

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit.SECONDS

internal class MqttEventHandler(
mqttUtils: MqttUtils
) : EventHandler {

private val eventScheduler = ThreadPoolExecutor(
/* corePoolSize = */ 1,
/* maximumPoolSize = */ 1,
/* keepAliveTime = */ 300,
/* unit = */ SECONDS,
/* workQueue = */ LinkedBlockingQueue(),
/* threadFactory = */ mqttUtils.threadFactory("mqtt-event-handler", false)
).apply { allowCoreThreadTimeOut(true) }
private val interceptorList = CopyOnWriteArrayList<EventInterceptor>()
private val eventHandlers = CopyOnWriteArrayList<EventHandler>()

Expand All @@ -18,7 +32,11 @@ internal class MqttEventHandler : EventHandler {
interceptorList.forEach {
event = it.intercept(event)
}
eventHandlers.forEach { it.onEvent(mqttEvent) }
if (eventHandlers.isNotEmpty()) {
eventScheduler.submit {
eventHandlers.forEach { it.onEvent(mqttEvent) }
}
}
}

fun addEventHandler(handler: EventHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.gojek.mqtt.event.PingEventHandler
import com.gojek.mqtt.model.AdaptiveKeepAliveConfig
import com.gojek.mqtt.model.MqttConnectOptions
import com.gojek.mqtt.model.MqttPacket
import com.gojek.mqtt.utils.MqttUtils
import com.gojek.networktracker.NetworkStateTrackerFactory

internal class MqttClientInternal(
Expand All @@ -42,7 +43,7 @@ internal class MqttClientInternal(
private var keepAliveProvider: KeepAliveProvider = NonAdaptiveKeepAliveProvider()
private var keepAliveFailureHandler: KeepAliveFailureHandler = NoOpKeepAliveFailureHandler()

private val eventHandler = MqttEventHandler()
private val eventHandler = MqttEventHandler(MqttUtils())

private val optimalKeepAliveObserver = object : OptimalKeepAliveObserver {
override fun onOptimalKeepAliveFound(
Expand Down

0 comments on commit 8e3c564

Please sign in to comment.