-
Notifications
You must be signed in to change notification settings - Fork 25
/
connection.js
107 lines (89 loc) · 2.51 KB
/
connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'use strict'
var generateStream = require('./lib/generateStream')
var parseStream = require('./lib/parseStream')
var writeToStream = require('./lib/writeToStream')
var Duplexify = require('duplexify')
var inherits = require('inherits')
/**
 * Re-emits a packet on `this` as an event named after the packet's command.
 *
 * Meant to be used as an event listener, so `this` is the emitter the
 * listener is attached to.
 *
 * @param {Object} packet - packet object; its `cmd` property is the event name.
 */
function emitPacket (packet) {
  const { cmd } = packet
  this.emit(cmd, packet)
}
/**
 * Wraps a duplex stream in an object-mode packet connection.
 *
 * Packets written to the connection are handed to the generator created by
 * `writeToStream(duplex, opts)`; data arriving on `duplex` is piped into the
 * parser created by `parseStream(opts)` and becomes readable from the
 * connection. Errors from the generator, the parser and the underlying stream
 * are re-emitted on the connection as 'error'; the underlying stream's 'close'
 * is re-emitted as 'close'.
 *
 * May be called with or without `new`.
 *
 * @param {Object} duplex - underlying stream (also exposed as `this.stream`).
 * @param {Object|Function} [opts] - options passed to the generator and the
 *   parser. If a function is given here it is treated as `cb`.
 * @param {boolean} [opts.notData] - when exactly `true`, the first-packet
 *   handling described below is skipped.
 * @param {Function} [cb] - called with no arguments once the first packet has
 *   been received and applied via `setOptions`.
 * @returns {Connection}
 */
function Connection (duplex, opts, cb) {
  if (!(this instanceof Connection)) {
    // Forward cb as well, otherwise it is silently lost when the
    // constructor is invoked as a plain function with an options object.
    return new Connection(duplex, opts, cb)
  }

  if (typeof opts === 'function') {
    cb = opts
    opts = {}
  }

  opts = opts || {}

  this._generator = writeToStream(duplex, opts)
  this._parser = parseStream(opts)

  // defer piping, so consumer can attach event listeners
  // otherwise we might lose events
  process.nextTick(() => {
    duplex.pipe(this._parser)
  })

  this._generator.on('error', this.emit.bind(this, 'error'))
  this._parser.on('error', this.emit.bind(this, 'error'))

  this.stream = duplex

  duplex.on('error', this.emit.bind(this, 'error'))
  duplex.on('close', this.emit.bind(this, 'close'))

  // Generator is passed as the writable side, parser as the readable side.
  Duplexify.call(this, this._generator, this._parser, { objectMode: true })

  // MQTT.js basic default
  if (opts.notData !== true) {
    var that = this
    this.once('data', function (connectPacket) {
      // The first packet configures parser and generator.
      that.setOptions(connectPacket, opts)
      // From now on every packet is also emitted under its command name.
      that.on('data', emitPacket)
      if (cb) {
        cb()
      }
      // Re-emit the first packet so the listener attached just above
      // dispatches it too.
      that.emit('data', connectPacket)
    })
  }
}

inherits(Connection, Duplexify)
// Install one sender method per packet command on Connection.prototype.
// Each method tags the given object with its command name and writes it
// to the connection.
for (const cmd of [
  'connect',
  'connack',
  'publish',
  'puback',
  'pubrec',
  'pubrel',
  'pubcomp',
  'subscribe',
  'suback',
  'unsubscribe',
  'unsuback',
  'pingreq',
  'pingresp',
  'disconnect',
  'auth'
]) {
  /**
   * @param {Object} [opts] - packet fields; `cmd` is set on this same object
   *   (the caller's object is modified, not copied).
   * @param {Function} [cb] - scheduled with setImmediate after the write call,
   *   with no arguments and regardless of what `write` returned.
   */
  Connection.prototype[cmd] = function (opts, cb) {
    const packet = opts || {}
    packet.cmd = cmd

    this.write(packet)

    if (cb) {
      setImmediate(cb)
    }
  }
}
/**
 * Tears down the underlying stream: calls its `destroy()` when it has one,
 * otherwise falls back to `end()`.
 */
Connection.prototype.destroy = function () {
  if (!this.stream.destroy) {
    this.stream.end()
    return
  }
  this.stream.destroy()
}
/**
 * Stores a shallow copy of `packet` as the connection options and passes it
 * to both the parser and the generator.
 *
 * @param {Object} packet - packet whose own enumerable properties become the options.
 * @param {Object} [opts] - consulted only for `protocolVersion`, and only
 *   when the packet's `cmd` is 'connack'.
 */
Connection.prototype.setOptions = function (packet, opts) {
  const options = Object.assign({}, packet)

  // Specifically set the protocol version for client connections;
  // falls back to 4 when opts carries no truthy protocolVersion.
  if (options.cmd === 'connack') {
    let version = 4
    if (opts && opts.protocolVersion) {
      version = opts.protocolVersion
    }
    options.protocolVersion = version
  }

  this.options = options

  this._parser.setOptions(options)
  this._generator.setOptions(options)
}
// The constructor is the module's export; the stream helpers are exposed
// as properties on it.
Connection.parseStream = parseStream
Connection.generateStream = generateStream
module.exports = Connection