Skip to content

Commit

Permalink
Merge pull request mqttjs#6 from mqttjs/writeToStream
Browse files Browse the repository at this point in the history
Write directly to the Stream
  • Loading branch information
mcollina committed Oct 21, 2015
2 parents 929a046 + da49701 commit c1ebbbd
Show file tree
Hide file tree
Showing 10 changed files with 829 additions and 614 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ API
---
* <a href="#generate"><code>mqtt#<b>generate()</b></code></a>
* <a href="#writeToStream"><code>mqtt#<b>writeToStream()</b></code></a>
* <a href="#parser"><code>mqtt#<b>parser()</b></code></a>
<a name="generate">
Expand All @@ -96,6 +97,15 @@ Generates a `Buffer` containing an MQTT packet.
The object must be one of the ones specified by the [packets](#packets)
section. Throws an `Error` if a packet cannot be generated.
<a name="writeToStream">
### mqtt.writeToStream(object, stream)
Writes the mqtt packet defined by `object` to the given stream.
The object must be one of the ones specified by the [packets](#packets)
section. Emits an `Error` on the stream if a packet cannot be generated.
On node >= 12, this function automatically calls `cork()` on your stream,
and then it calls `uncork()` on the next tick.
<a name="parser">
### mqtt.parser()
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/generate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

var mqtt = require('../')
, max = 10000000
, max = 100000
, i
, start = Date.now()
, time
Expand Down
53 changes: 53 additions & 0 deletions benchmarks/generateTick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@

var mqtt = require('../')
, max = 1000000
, i = 0
, start = Date.now()
, time
, buf = new Buffer(10)
, net = require('net')
, server = net.createServer(handle)
, dest

buf.fill('test')

function handle(sock) {
sock.resume();
}

server.listen(0, function() {
dest = net.connect(server.address());

dest.on('connect', tickWait);
dest.on('drain', tickWait);

dest.on('finish', function () {
time = Date.now() - start;
console.log('Total time', time);
console.log('Total packets', max);
console.log('Packet/s', max / time * 1000);
server.close();
});
});

function tickWait () {
//console.log('tickWait', i)
var res = true
//var toSend = new Buffer(5 + buf.length)

for (; i < max && res; i++) {
res = dest.write(mqtt.generate({
cmd: 'publish'
, topic: 'test'
, payload: buf
}))
//buf.copy(toSend, 5)
//res = dest.write(toSend, 'buffer')
//console.log(res)
}

if (i >= max) {
dest.end();
return;
}
}
51 changes: 51 additions & 0 deletions benchmarks/writeToStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

var mqtt = require('../')
, max = 1000000
, i = 0
, start = Date.now()
, time
, buf = new Buffer(10)
, net = require('net')
, server = net.createServer(handle)
, dest

function handle(sock) {
sock.resume();
}

buf.fill('test')

server.listen(0, function() {
dest = net.connect(server.address());

dest.on('connect', tickWait);
dest.on('drain', tickWait);

dest.on('finish', function () {
time = Date.now() - start;
console.log('Total time', time);
console.log('Total packets', max);
console.log('Packet/s', max / time * 1000);
server.close();
});
});

function tickWait() {
var res = true
//var toSend = new Buffer(5)

for (; i < max && res; i++) {
res = mqtt.writeToStream({
cmd: 'publish'
, topic: 'test'
, payload: buf
}, dest)
//dest.write(toSend, 'buffer')
//res = dest.write(buf, 'buffer')
}

if (i >= max) {
dest.end();
return;
}
}
97 changes: 76 additions & 21 deletions constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* Protocol - protocol constants */
var protocol = module.exports;

/* Command code => mnemonic */
module.exports.types = {
protocol.types = {
0: 'reserved',
1: 'connect',
2: 'connack',
Expand All @@ -21,32 +22,86 @@ module.exports.types = {
};

/* Mnemonic => Command code */
module.exports.codes = {}
for(var k in module.exports.types) {
var v = module.exports.types[k];
module.exports.codes[v] = k;
protocol.codes = {}
for(var k in protocol.types) {
var v = protocol.types[k];
protocol.codes[v] = k;
}

/* Header */
module.exports.CMD_SHIFT = 4;
module.exports.CMD_MASK = 0xF0;
module.exports.DUP_MASK = 0x08;
module.exports.QOS_MASK = 0x03;
module.exports.QOS_SHIFT = 1;
module.exports.RETAIN_MASK = 0x01;
protocol.CMD_SHIFT = 4;
protocol.CMD_MASK = 0xF0;
protocol.DUP_MASK = 0x08;
protocol.QOS_MASK = 0x03;
protocol.QOS_SHIFT = 1;
protocol.RETAIN_MASK = 0x01;

/* Length */
module.exports.LENGTH_MASK = 0x7F;
module.exports.LENGTH_FIN_MASK = 0x80;
protocol.LENGTH_MASK = 0x7F;
protocol.LENGTH_FIN_MASK = 0x80;

/* Connack */
module.exports.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_HEADER = new Buffer([protocol.SESSIONPRESENT_MASK]);
protocol.CONNACK_HEADER = new Buffer([protocol.codes['connack'] << protocol.CMD_SHIFT])

/* Connect */
module.exports.USERNAME_MASK = 0x80;
module.exports.PASSWORD_MASK = 0x40;
module.exports.WILL_RETAIN_MASK = 0x20;
module.exports.WILL_QOS_MASK = 0x18;
module.exports.WILL_QOS_SHIFT = 3;
module.exports.WILL_FLAG_MASK = 0x04;
module.exports.CLEAN_SESSION_MASK = 0x02;
protocol.USERNAME_MASK = 0x80;
protocol.PASSWORD_MASK = 0x40;
protocol.WILL_RETAIN_MASK = 0x20;
protocol.WILL_QOS_MASK = 0x18;
protocol.WILL_QOS_SHIFT = 3;
protocol.WILL_FLAG_MASK = 0x04;
protocol.CLEAN_SESSION_MASK = 0x02;
protocol.CONNECT_HEADER = new Buffer([protocol.codes['connect'] << protocol.CMD_SHIFT])

function genHeader (type) {
return [0, 1, 2].map(function(qos) {
return [0, 1].map(function(dup) {
return [0, 1].map(function(retain) {
var buf = new Buffer(1)
buf.writeUInt8(
protocol.codes[type] << protocol.CMD_SHIFT |
(dup ? protocol.DUP_MASK : 0 ) |
qos << protocol.QOS_SHIFT | retain, 0, true)
return buf
});
});
});
}

/* Publish */
protocol.PUBLISH_HEADER = genHeader('publish');

/* SUBSCRIBE */
protocol.SUBSCRIBE_HEADER = genHeader('subscribe');

/* UNSUBSCRIBE */
protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe');

/* Confirmations */
protocol.ACKS = {
unsuback: genHeader('unsuback'),
puback: genHeader('puback'),
pubcomp: genHeader('pubcomp'),
pubrel: genHeader('pubrel'),
pubrec: genHeader('pubrec')
};

protocol.SUBACK_HEADER = new Buffer([protocol.codes['suback'] << protocol.CMD_SHIFT]);

/* Protocol versions */
protocol.VERSION3 = new Buffer([3])
protocol.VERSION4 = new Buffer([4])

/* QOS */
protocol.QOS = [0, 1, 2].map(function(qos) {
return new Buffer([qos])
})

/* empty packets */
protocol.EMPTY = {
pingreq: new Buffer([protocol.codes['pingreq'] << 4, 0]),
pingresp: new Buffer([protocol.codes['pingresp'] << 4, 0]),
disconnect: new Buffer([protocol.codes['disconnect'] << 4, 0])
};
Loading

0 comments on commit c1ebbbd

Please sign in to comment.