Skip to content

Commit

Permalink
ULMS-3316 Added backoff for mqtt client
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkonst committed Sep 10, 2024
1 parent 36272bf commit 93e101b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 53 deletions.
18 changes: 2 additions & 16 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ulms/api-clients",
"version": "7.17.0",
"version": "7.17.0-dev.0-ULMS-3316",
"description": "JavaScript API clients for ULMS platform",
"keywords": [],
"homepage": "https://github.com/foxford/ulms-api-clients-js#readme",
Expand Down Expand Up @@ -41,7 +41,6 @@
"lodash": "4.17.21",
"mime": "2.6.0",
"mqtt": "3.0.0",
"mqtt-pattern": "1.2.0",
"nats.ws": "1.7.2",
"p-queue": "7.4.1",
"uuid": "8.3.2"
Expand Down
54 changes: 19 additions & 35 deletions src/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

// using version from cdn
// import mqtt from 'mqtt'
import MQTTPattern from 'mqtt-pattern'

import Backoff from './backoff'
import { sleep } from './common'
import { mqttReasonCodeNameEnum } from './constants'
import retry from './retry'
import loadScript from './script-loader'
Expand Down Expand Up @@ -53,10 +54,7 @@ class MQTTClient {

constructor(url) {
this.client = undefined
this.patterns = {}
this.url = url

this.handleMessageEvent = this.handleMessageEvent.bind(this)
}

get connected() {
Expand All @@ -75,36 +73,8 @@ class MQTTClient {
return this.client.reconnecting
}

bindEventListeners() {
this.client.on(MQTTClient.events.MESSAGE, this.handleMessageEvent)
}

handleMessageEvent(topic, message, packet) {
const patterns = Object.keys(this.patterns)

for (const pattern of patterns) {
const topicParameters = MQTTPattern.exec(pattern, topic)

if (topicParameters !== null) {
this.patterns[pattern](topicParameters, topic, message, packet)
}
}
}

attachRoute(topicPattern, handler) {
this.patterns[topicPattern] = handler
}

detachRoute(topicPattern) {
if (this.patterns[topicPattern]) {
delete this.patterns[topicPattern]
}
}

connect(options) {
this.client = mqtt.connect(this.url, options)

this.bindEventListeners()
}

disconnect(...arguments_) {
Expand Down Expand Up @@ -140,6 +110,7 @@ class ReconnectingMQTTClient extends MQTTClient {
constructor(url, tokenProvider, reconnectLimit) {
super(url)

this.backoff = new Backoff()
this.forcedClose = false
this.reconnectCount = 0
this.reconnectLimit = reconnectLimit || 3
Expand All @@ -154,8 +125,6 @@ class ReconnectingMQTTClient extends MQTTClient {
}

bindEventListeners() {
super.bindEventListeners()

this.client.on(ReconnectingMQTTClient.events.CLOSE, this.handleCloseEvent)
this.client.on(
ReconnectingMQTTClient.events.CONNECT,
Expand Down Expand Up @@ -193,6 +162,8 @@ class ReconnectingMQTTClient extends MQTTClient {

this.forcedClose = false
this.reconnectCount = 0

this.backoff.reset()
}

handlePacketReceiveEvent(packet) {
Expand Down Expand Up @@ -241,6 +212,8 @@ class ReconnectingMQTTClient extends MQTTClient {
return this.tokenProvider.getToken().then((password) => {
super.connect({ ...options, password })

this.bindEventListeners()

return new Promise((resolve, reject) => {
const connectHandler = () => {
// eslint-disable-next-line no-use-before-define
Expand All @@ -265,13 +238,24 @@ class ReconnectingMQTTClient extends MQTTClient {

disconnect() {
this.forcedClose = true
this.reconnectCount = 0

this.backoff.reset()

super.disconnect(true)
}

reconnect() {
async reconnect() {
if (this.tokenProviderPromise !== undefined) return

this.forcedClose = false

if (this.reconnectCount !== 0) {
await sleep(this.backoff.value)

this.backoff.next()
}

this.tokenProviderPromise = this.tokenProvider
.getToken()
.then((password) => {
Expand Down

0 comments on commit 93e101b

Please sign in to comment.