Skip to content

Commit

Permalink
fix(sendPacket): drain leak (mqttjs#1401)
Browse files Browse the repository at this point in the history
  • Loading branch information
BertKleewein authored Jan 18, 2022
1 parent 4604b10 commit 7ec4b8f
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function sendPacket (client, packet, cb) {
debug('sendPacket :: writing to stream')
const result = mqttPacket.writeToStream(packet, client.stream, client.options)
debug('sendPacket :: writeToStream result %s', result)
if (!result && cb) {
if (!result && cb && cb !== nop) {
debug('sendPacket :: handle events on `drain` once through callback.')
client.stream.once('drain', cb)
} else if (cb) {
Expand All @@ -190,6 +190,8 @@ function flush (queue) {
Object.keys(queue).forEach(function (messageId) {
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
// This is suspicious. Why do we only delete this if we have a callbck?
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally.
delete queue[messageId]
}
})
Expand Down Expand Up @@ -569,7 +571,7 @@ MqttClient.prototype._handlePacket = function (packet, done) {

MqttClient.prototype._checkDisconnecting = function (callback) {
if (this.disconnecting) {
if (callback) {
if (callback && callback !== nop) {
callback(new Error('client disconnecting'))
} else {
this.emit('error', new Error('client disconnecting'))
Expand Down Expand Up @@ -1581,7 +1583,7 @@ MqttClient.prototype._handleAck = function (packet) {
const that = this
let err

if (!cb) {
if (!cb || cb === nop) {
debug('_handleAck :: Server sent an ack in error. Ignoring.')
// Server sent an ack in error, ignore it.
return
Expand Down

0 comments on commit 7ec4b8f

Please sign in to comment.