From 93e101b1c202c540de4dd42725128a7c4a516a9a Mon Sep 17 00:00:00 2001 From: Aleksey Konstantinov Date: Tue, 10 Sep 2024 18:06:03 +0300 Subject: [PATCH] ULMS-3316 Added backoff for mqtt client --- package-lock.json | 18 ++-------------- package.json | 3 +-- src/mqtt.js | 54 +++++++++++++++++------------------------------ 3 files changed, 22 insertions(+), 53 deletions(-) diff --git a/package-lock.json b/package-lock.json index f439e73..f6c5b82 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@ulms/api-clients", - "version": "7.17.0", + "version": "7.17.0-dev.0-ULMS-3316", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@ulms/api-clients", - "version": "7.17.0", + "version": "7.17.0-dev.0-ULMS-3316", "license": "MIT", "dependencies": { "axios": "1.6.2", @@ -15,7 +15,6 @@ "lodash": "4.17.21", "mime": "2.6.0", "mqtt": "3.0.0", - "mqtt-pattern": "1.2.0", "nats.ws": "1.7.2", "p-queue": "7.4.1", "uuid": "8.3.2" @@ -9944,11 +9943,6 @@ "node": ">=4.0.0" } }, - "node_modules/mqtt-match": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/mqtt-match/-/mqtt-match-1.0.3.tgz", - "integrity": "sha512-nfeAp+chyjVeIvvrgMhQCfDAIVp/zXX8rtxHQwuAWuapqAdFs1F0kIekG445ps3xs/qFPK6l2xRlAyiqqwbmrQ==" - }, "node_modules/mqtt-packet": { "version": "6.10.0", "resolved": "https://registry.npmjs.org/mqtt-packet/-/mqtt-packet-6.10.0.tgz", @@ -9959,14 +9953,6 @@ "process-nextick-args": "^2.0.1" } }, - "node_modules/mqtt-pattern": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/mqtt-pattern/-/mqtt-pattern-1.2.0.tgz", - "integrity": "sha512-5tvJTrMXcvAWRc4J+YW0pnZOLi20yXFX1R7cjB0291X9aRFPyO+5vbh/kHVwUlNPfJuwijcvrd/AmwDLe1021w==", - "dependencies": { - "mqtt-match": "^1.0.2" - } - }, "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", diff --git a/package.json b/package.json index 6825b38..3039ef1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ulms/api-clients", - "version": "7.17.0", + "version": "7.17.0-dev.0-ULMS-3316", "description": "JavaScript API clients for ULMS platform", "keywords": [], "homepage": "https://github.com/foxford/ulms-api-clients-js#readme", @@ -41,7 +41,6 @@ "lodash": "4.17.21", "mime": "2.6.0", "mqtt": "3.0.0", - "mqtt-pattern": "1.2.0", "nats.ws": "1.7.2", "p-queue": "7.4.1", "uuid": "8.3.2" diff --git a/src/mqtt.js b/src/mqtt.js index 121f71d..8baf243 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -3,8 +3,9 @@ // using version from cdn // import mqtt from 'mqtt' -import MQTTPattern from 'mqtt-pattern' +import Backoff from './backoff' +import { sleep } from './common' import { mqttReasonCodeNameEnum } from './constants' import retry from './retry' import loadScript from './script-loader' @@ -53,10 +54,7 @@ class MQTTClient { constructor(url) { this.client = undefined - this.patterns = {} this.url = url - - this.handleMessageEvent = this.handleMessageEvent.bind(this) } get connected() { @@ -75,36 +73,8 @@ class MQTTClient { return this.client.reconnecting } - bindEventListeners() { - this.client.on(MQTTClient.events.MESSAGE, this.handleMessageEvent) - } - - handleMessageEvent(topic, message, packet) { - const patterns = Object.keys(this.patterns) - - for (const pattern of patterns) { - const topicParameters = MQTTPattern.exec(pattern, topic) - - if (topicParameters !== null) { - this.patterns[pattern](topicParameters, topic, message, packet) - } - } - } - - attachRoute(topicPattern, handler) { - this.patterns[topicPattern] = handler - } - - detachRoute(topicPattern) { - if (this.patterns[topicPattern]) { - delete this.patterns[topicPattern] - } - } - connect(options) { this.client = mqtt.connect(this.url, options) - - this.bindEventListeners() } disconnect(...arguments_) { @@ -140,6 +110,7 @@ class ReconnectingMQTTClient extends MQTTClient { constructor(url, tokenProvider, reconnectLimit) { super(url) + this.backoff = new Backoff() this.forcedClose = false this.reconnectCount = 0 this.reconnectLimit = reconnectLimit || 3 @@ -154,8 +125,6 @@ class ReconnectingMQTTClient extends MQTTClient { } bindEventListeners() { - super.bindEventListeners() - this.client.on(ReconnectingMQTTClient.events.CLOSE, this.handleCloseEvent) this.client.on( ReconnectingMQTTClient.events.CONNECT, @@ -193,6 +162,8 @@ class ReconnectingMQTTClient extends MQTTClient { this.forcedClose = false this.reconnectCount = 0 + + this.backoff.reset() } handlePacketReceiveEvent(packet) { @@ -241,6 +212,8 @@ class ReconnectingMQTTClient extends MQTTClient { return this.tokenProvider.getToken().then((password) => { super.connect({ ...options, password }) + this.bindEventListeners() + return new Promise((resolve, reject) => { const connectHandler = () => { // eslint-disable-next-line no-use-before-define @@ -265,13 +238,24 @@ class ReconnectingMQTTClient extends MQTTClient { disconnect() { this.forcedClose = true + this.reconnectCount = 0 + + this.backoff.reset() super.disconnect(true) } - reconnect() { + async reconnect() { if (this.tokenProviderPromise !== undefined) return + this.forcedClose = false + + if (this.reconnectCount !== 0) { + await sleep(this.backoff.value) + + this.backoff.next() + } + this.tokenProviderPromise = this.tokenProvider .getToken() .then((password) => {