Skip to content

Commit

Permalink
ULMS-3327 Fixed connect flow
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkonst committed Sep 13, 2024
1 parent 6b6b5fe commit 9d62746
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 38 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ulms/api-clients",
"version": "7.19.0",
"version": "7.20.0",
"description": "JavaScript API clients for ULMS platform",
"keywords": [],
"homepage": "https://github.com/foxford/ulms-api-clients-js#readme",
Expand Down
79 changes: 44 additions & 35 deletions src/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// import mqtt from 'mqtt'

import Backoff from './backoff'
import { sleep } from './common'
import { makeDeferred, sleep } from './common'
import { mqttReasonCodeNameEnum } from './constants'
import retry from './retry'
import loadScript from './script-loader'
Expand Down Expand Up @@ -111,6 +111,7 @@ class ReconnectingMQTTClient extends MQTTClient {
super(url)

this.backoff = new Backoff()
this.connectDeferred = undefined
this.forcedClose = false
this.reconnectCount = 0
this.reconnectLimit = reconnectLimit || 3
Expand All @@ -120,6 +121,7 @@ class ReconnectingMQTTClient extends MQTTClient {

this.handleCloseEvent = this.handleCloseEvent.bind(this)
this.handleConnectEvent = this.handleConnectEvent.bind(this)
this.handleErrorEvent = this.handleErrorEvent.bind(this)
this.handlePacketReceiveEvent = this.handlePacketReceiveEvent.bind(this)
this.handleReconnectEvent = this.handleReconnectEvent.bind(this)
}
Expand All @@ -130,6 +132,7 @@ class ReconnectingMQTTClient extends MQTTClient {
ReconnectingMQTTClient.events.CONNECT,
this.handleConnectEvent,
)
this.client.on(ReconnectingMQTTClient.events.ERROR, this.handleErrorEvent)
this.client.on(
ReconnectingMQTTClient.events.PACKETRECEIVE,
this.handlePacketReceiveEvent,
Expand All @@ -148,10 +151,7 @@ class ReconnectingMQTTClient extends MQTTClient {
console.log('[mqttClient] close, forced:', this.forcedClose)

if (this.reconnectCount >= this.reconnectLimit) {
this.disconnect()

// onCloseHandler && onCloseHandler(RECONNECT_LIMIT_EXCEEDED)
// this.client.emit('reconnect_limit_exceeded')
this.disconnect(new Error('retry_limit_exceeded'))
} else if (!this.forcedClose) {
this.reconnect()
}
Expand All @@ -164,6 +164,25 @@ class ReconnectingMQTTClient extends MQTTClient {
this.reconnectCount = 0

this.backoff.reset()

if (this.connectDeferred) {
this.connectDeferred.resolve()

this.connectDeferred = undefined
}
}

handleErrorEvent(error) {
console.log('[mqttClient] error', error)

if (this.connectDeferred) {
// ignoring recoverable errors while connecting (through retry flow)
if (error && error.message === 'connack timeout') return

this.connectDeferred.reject(error)

this.connectDeferred = undefined
}
}

handlePacketReceiveEvent(packet) {
Expand Down Expand Up @@ -192,7 +211,7 @@ class ReconnectingMQTTClient extends MQTTClient {
return
}

this.disconnect()
this.disconnect(new Error(mqttReasonCodeNameEnum[reasonCode]))

// reconnect only on KEEP_ALIVE_TIMEOUT
if (reasonCode === 141) {
Expand All @@ -208,41 +227,31 @@ class ReconnectingMQTTClient extends MQTTClient {
this.reconnectCount += 1
}

connect(options) {
return this.tokenProvider.getToken().then((password) => {
super.connect({ ...options, password })

this.bindEventListeners()

return new Promise((resolve, reject) => {
const connectHandler = () => {
// eslint-disable-next-line no-use-before-define
offHandlers()
resolve()
}
const errorHandler = (error) => {
// eslint-disable-next-line no-use-before-define
offHandlers()
reject(error)
}
const offHandlers = () => {
super.off(ReconnectingMQTTClient.events.CONNECT, connectHandler)
super.off(ReconnectingMQTTClient.events.ERROR, errorHandler)
}

super.on(ReconnectingMQTTClient.events.CONNECT, connectHandler)
super.on(ReconnectingMQTTClient.events.ERROR, errorHandler)
})
})
async connect(options) {
this.connectDeferred = makeDeferred()

const token = await this.tokenProvider.getToken()

super.connect({ ...options, password: token })

this.bindEventListeners()

return this.connectDeferred.promise
}

disconnect() {
disconnect(reason) {
this.forcedClose = true
this.reconnectCount = 0

this.backoff.reset()

super.disconnect(true)

if (this.connectDeferred) {
this.connectDeferred.reject(reason)

this.connectDeferred = undefined
}
}

async reconnect() {
Expand All @@ -265,10 +274,10 @@ class ReconnectingMQTTClient extends MQTTClient {

super.reconnect()
})
.catch(() => {
.catch((error) => {
this.tokenProviderPromise = undefined

this.disconnect()
this.disconnect(error)
})
}

Expand Down

0 comments on commit 9d62746

Please sign in to comment.