From 9d627462cc1c1ee76a306ac47a4b4966c56169ab Mon Sep 17 00:00:00 2001 From: Aleksey Konstantinov Date: Fri, 13 Sep 2024 13:09:38 +0300 Subject: [PATCH] ULMS-3327 Fixed connect flow --- package-lock.json | 4 +-- package.json | 2 +- src/mqtt.js | 79 ++++++++++++++++++++++++++--------------------- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/package-lock.json b/package-lock.json index 4dbbd38..6628667 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@ulms/api-clients", - "version": "7.19.0", + "version": "7.20.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@ulms/api-clients", - "version": "7.19.0", + "version": "7.20.0", "license": "MIT", "dependencies": { "axios": "1.6.2", diff --git a/package.json b/package.json index 33bc46b..6718e01 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ulms/api-clients", - "version": "7.19.0", + "version": "7.20.0", "description": "JavaScript API clients for ULMS platform", "keywords": [], "homepage": "https://github.com/foxford/ulms-api-clients-js#readme", diff --git a/src/mqtt.js b/src/mqtt.js index 8baf243..d97fade 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -5,7 +5,7 @@ // import mqtt from 'mqtt' import Backoff from './backoff' -import { sleep } from './common' +import { makeDeferred, sleep } from './common' import { mqttReasonCodeNameEnum } from './constants' import retry from './retry' import loadScript from './script-loader' @@ -111,6 +111,7 @@ class ReconnectingMQTTClient extends MQTTClient { super(url) this.backoff = new Backoff() + this.connectDeferred = undefined this.forcedClose = false this.reconnectCount = 0 this.reconnectLimit = reconnectLimit || 3 @@ -120,6 +121,7 @@ class ReconnectingMQTTClient extends MQTTClient { this.handleCloseEvent = this.handleCloseEvent.bind(this) this.handleConnectEvent = this.handleConnectEvent.bind(this) + this.handleErrorEvent = this.handleErrorEvent.bind(this) this.handlePacketReceiveEvent = this.handlePacketReceiveEvent.bind(this) this.handleReconnectEvent = this.handleReconnectEvent.bind(this) } @@ -130,6 +132,7 @@ class ReconnectingMQTTClient extends MQTTClient { ReconnectingMQTTClient.events.CONNECT, this.handleConnectEvent, ) + this.client.on(ReconnectingMQTTClient.events.ERROR, this.handleErrorEvent) this.client.on( ReconnectingMQTTClient.events.PACKETRECEIVE, this.handlePacketReceiveEvent, @@ -148,10 +151,7 @@ class ReconnectingMQTTClient extends MQTTClient { console.log('[mqttClient] close, forced:', this.forcedClose) if (this.reconnectCount >= this.reconnectLimit) { - this.disconnect() - - // onCloseHandler && onCloseHandler(RECONNECT_LIMIT_EXCEEDED) - // this.client.emit('reconnect_limit_exceeded') + this.disconnect(new Error('retry_limit_exceeded')) } else if (!this.forcedClose) { this.reconnect() } @@ -164,6 +164,25 @@ class ReconnectingMQTTClient extends MQTTClient { this.reconnectCount = 0 this.backoff.reset() + + if (this.connectDeferred) { + this.connectDeferred.resolve() + + this.connectDeferred = undefined + } + } + + handleErrorEvent(error) { + console.log('[mqttClient] error', error) + + if (this.connectDeferred) { + // ignoring recoverable errors while connecting (through retry flow) + if (error && error.message === 'connack timeout') return + + this.connectDeferred.reject(error) + + this.connectDeferred = undefined + } } handlePacketReceiveEvent(packet) { @@ -192,7 +211,7 @@ class ReconnectingMQTTClient extends MQTTClient { return } - this.disconnect() + this.disconnect(new Error(mqttReasonCodeNameEnum[reasonCode])) // reconnect only on KEEP_ALIVE_TIMEOUT if (reasonCode === 141) { @@ -208,41 +227,31 @@ class ReconnectingMQTTClient extends MQTTClient { this.reconnectCount += 1 } - connect(options) { - return this.tokenProvider.getToken().then((password) => { - super.connect({ ...options, password }) - - this.bindEventListeners() - - return new Promise((resolve, reject) => { - const connectHandler = () => { - // eslint-disable-next-line no-use-before-define - offHandlers() - resolve() - } - const errorHandler = (error) => { - // eslint-disable-next-line no-use-before-define - offHandlers() - reject(error) - } - const offHandlers = () => { - super.off(ReconnectingMQTTClient.events.CONNECT, connectHandler) - super.off(ReconnectingMQTTClient.events.ERROR, errorHandler) - } - - super.on(ReconnectingMQTTClient.events.CONNECT, connectHandler) - super.on(ReconnectingMQTTClient.events.ERROR, errorHandler) - }) - }) + async connect(options) { + this.connectDeferred = makeDeferred() + + const token = await this.tokenProvider.getToken() + + super.connect({ ...options, password: token }) + + this.bindEventListeners() + + return this.connectDeferred.promise } - disconnect() { + disconnect(reason) { this.forcedClose = true this.reconnectCount = 0 this.backoff.reset() super.disconnect(true) + + if (this.connectDeferred) { + this.connectDeferred.reject(reason) + + this.connectDeferred = undefined + } } async reconnect() { @@ -265,10 +274,10 @@ class ReconnectingMQTTClient extends MQTTClient { super.reconnect() }) - .catch(() => { + .catch((error) => { this.tokenProviderPromise = undefined - this.disconnect() + this.disconnect(error) }) }