From 5c4681de11e292c2583e01d092bbbc8f4ea5d136 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Thu, 8 Sep 2016 01:06:29 +0700 Subject: [PATCH 1/7] feat (amqplib): integrate amqplib --- lib/amqplib_ascoltatore.js | 225 +++++++++++++++++++++++++++++++++++++ lib/ascoltatori.js | 1 + 2 files changed, 226 insertions(+) create mode 100644 lib/amqplib_ascoltatore.js diff --git a/lib/amqplib_ascoltatore.js b/lib/amqplib_ascoltatore.js new file mode 100644 index 0000000..f52b000 --- /dev/null +++ b/lib/amqplib_ascoltatore.js @@ -0,0 +1,225 @@ +"use strict"; + +var util = require("./util"); +var wrap = util.wrap; +var defer = util.defer; +var TrieAscoltatore = require("./trie_ascoltatore"); +var AbstractAscoltatore = require('./abstract_ascoltatore'); +var steed = require("steed")(); +var SubsCounter = require("./subs_counter"); +var debug = require("debug")("ascoltatori:amqplib"); + +/** + * The AMQPAscoltatore is a class that inherits from AbstractAscoltatore. + * It is backed by node-amqp. + * It creates or use an exchange with the given name, using a "topic" topology. + * It creates a single amqp queue for this process, in order to keep + * the overhead low. + * + * It accepts these options: + * - `client`, which is passed through to the amq.createConnection method; + * - `exchange`, the exchange name; + * - `amqp`, the amqp module (it will automatically be required if not present); + * + * @param {Object} opts The options for creating this ascoltatore. + * @api public + */ + +function AMQPLibAscoltatore(opts) { + AbstractAscoltatore.call(this, opts, { + separator: '.', + wildcardOne: '*', + wildcardSome: '#' + }); + + this._opts = opts || {}; + this._opts.amqp = this._opts.amqp || require("amqplib/callback_api"); + this._ascoltatore = new TrieAscoltatore(opts); + + this._subs_counter = new SubsCounter(); + this._startConn(); +} + +/** + * The client connection decends from AbstractAscoltatore. + * + * @api private + */ +AMQPLibAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype); + +/** + * Starts a new connection to an AMQP server. + * Do nothing if it is already started. + * + * @api private + */ +AMQPLibAscoltatore.prototype._startConn = function () { + var conn = null, + channel = null, + that = this; + + if (this._client_conn === undefined) { + + var clientOpts = this._opts.client; + var url = ['amqp://' , clientOpts.login, ':', clientOpts.password, '@', clientOpts.host, ':', clientOpts.port].join(''); + var socketOptions = this._opts.socketOptions || {}; + + debug("connecting to " + this._opts.client); + + steed.series([ + function (callback) { + that._opts.amqp.connect(url, socketOptions, function (err, conn) { + that._client_conn = conn; + conn.on("error", function (error) { + if (typeof error === 'string') { + error = (new Error(error)); + } + + that.emit("error", error); + }); + callback(); + }); + }, + + function (callback) { + debug('connected'); + that._client_conn.createChannel(function(err, channel){ + that._channel = channel; + that._channel.prefetch(42); // magic number? + callback(); + }); + }, + + function(callback){ + debug('channel created'); + that._queue = util.buildIdentifier(); + that._channel.assertQueue(that._queue, null, wrap(callback)); + }, + + function (callback){ + debug('queue created'); + that._channel.assertExchange(that._opts.exchange, 'topic', {}, wrap(callback)); + }, + + function (callback){ + debug('exchange existed'); + that._channel.consume(that._queue, function(msg){ + that._channel.ack(msg); + var topic = that._recvTopic(msg.fields.routingKey); + debug("new message received from queue on topic " + topic); + that._ascoltatore.publish(topic, msg.content.toString()); + }, null, wrap(callback)); + }, + + function (callback) { + debug("subscribed to queue"); + that.emit("ready"); + callback(); + } + ]); + } + return this._client_conn; +}; + +AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) { + this._raiseIfClosed(); + + this._ascoltatore.subscribe(topic, callback); + + if (!this._subs_counter.include(topic)) { + debug("binding queue to topic " + topic); + + console.log('pattern', topic, this._subTopic(topic)); + + this._channel.bindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok){ + debug("queue bound to topic " + topic); + defer(done); + }); + } else { + defer(done); + } + + this._subs_counter.add(topic); + + debug("registered new subscriber for topic " + topic); +}; + +AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) { + this._raiseIfClosed(); + + debug("new message published to " + topic); + console.log('pattern', topic, this._pubTopic(topic)); + + this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message))); + defer(done); +}; + +AMQPLibAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { + this._raiseIfClosed(); + this._subs_counter.remove(topic); + + debug("deregistered subscriber for topic " + topic); + + this._ascoltatore.unsubscribe(topic, callback); + + if (!this._subs_counter.include(topic)) { + this._channel.unbindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok) { + debug("queue unbound to topic " + topic); + defer(done); + }); + } else { + defer(done); + } + + return this; +}; + +AMQPLibAscoltatore.prototype.close = function close(done) { + var that = this; + + if (this._closed) { + wrap(done)(); + return; + } + + if (this._closing) { + this.on("closed", done); + return; + } + + this._closing = true; + + if (this._client_conn !== undefined) { + var doClose = function () { + if (that._closed) { + debug("closing twice, one was an error"); + return; + } + + debug("closed"); + defer(done); + that.emit("closed"); + }; + + this._client_conn.on("close", doClose); + this._channel.deleteQueue(this._queue); + this._channel.close(); + + this._client_conn.close(); + this._client_conn.removeAllListeners("error"); + this._client_conn.on("error", doClose); + + delete this._client_conn; + delete this._channel; + delete this._queue; + } +}; + +util.aliasAscoltatore(AMQPLibAscoltatore.prototype); + +/** + * Exports the AMQPAscoltatore + * + * @api public + */ +module.exports = AMQPLibAscoltatore; diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index 68edede..dc62c5d 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -14,6 +14,7 @@ module.exports.EventEmitter2Ascoltatore = require('./event_emitter2_ascoltatore' module.exports.RedisAscoltatore = require("./redis_ascoltatore"); module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore"); module.exports.AMQPAscoltatore = require("./amqp_ascoltatore"); +module.exports.AMQPLibAscoltatore = require("./amqplib_ascoltatore"); module.exports.MQTTAscoltatore = require("./mqtt_ascoltatore"); module.exports.PrefixAscoltatore = require("./prefix_acoltatore"); module.exports.MongoAscoltatore = require('./mongo_ascoltatore'); From 64ffe19af2f4e02e8e64b134a6b385df78cfae40 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Thu, 8 Sep 2016 01:19:48 +0700 Subject: [PATCH 2/7] test (amqplib): add test case Copied from the amqp test case --- test/amqplib_ascoltatore_spec.js | 31 +++++++++++++++++++++++++++++++ test/common.js | 8 ++++++++ 2 files changed, 39 insertions(+) create mode 100644 test/amqplib_ascoltatore_spec.js diff --git a/test/amqplib_ascoltatore_spec.js b/test/amqplib_ascoltatore_spec.js new file mode 100644 index 0000000..1a14356 --- /dev/null +++ b/test/amqplib_ascoltatore_spec.js @@ -0,0 +1,31 @@ +var steed = require('steed')(); + +describeAscoltatore("AMQPLib", function() { + afterEach(function() { + this.instance.close(); + this.instance.on("error", function () { + console.log(arguments); + // we should just close it, + // avoid errors + }); + }); + + it("should sync two instances", function(done) { + var other = new ascoltatori.AMQPLibAscoltatore(this.instance._opts); + var that = this; + steed.series([ + + function(cb) { + other.on("ready", cb); + }, + + function(cb) { + that.instance.subscribe("hello", wrap(done), cb); + }, + + function(cb) { + other.publish("hello", null, cb); + } + ]); + }); +}); diff --git a/test/common.js b/test/common.js index 7574eae..3f4ac72 100644 --- a/test/common.js +++ b/test/common.js @@ -47,6 +47,14 @@ global.AMQPSettings = function() { }; }; +global.AMQPLibSettings = function() { + return { + json: false, + amqp: require("amqplib/callback_api"), + exchange: "ascolatore" + global.nextPort() + }; +}; + global.MQTTSettings = function() { return { json: false, From e31ec8db4ca30e5c6d5083f697c9873c1b8baa49 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Thu, 8 Sep 2016 01:22:48 +0700 Subject: [PATCH 3/7] fix (package.json): add amqplib as optional dependency --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 904ece8..96d508e 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "hiredis": "^0.4.1", "zmq": "^2.14.0", "amqp": "~0.2.4", + "amqplib": "~0.4.1", "mqtt": "^1.10.0", "mongodb": "^2.1.18", "kerberos": "~0.0", From bc6a2d0f0825422ef111738ceaa335546c53f05a Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Thu, 8 Sep 2016 22:41:07 +0700 Subject: [PATCH 4/7] fix (url): correct url for amqp broker --- lib/amqplib_ascoltatore.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/amqplib_ascoltatore.js b/lib/amqplib_ascoltatore.js index f52b000..436746d 100644 --- a/lib/amqplib_ascoltatore.js +++ b/lib/amqplib_ascoltatore.js @@ -60,11 +60,11 @@ AMQPLibAscoltatore.prototype._startConn = function () { if (this._client_conn === undefined) { - var clientOpts = this._opts.client; - var url = ['amqp://' , clientOpts.login, ':', clientOpts.password, '@', clientOpts.host, ':', clientOpts.port].join(''); + var url = this._opts.url || 'amqp://127.0.0.1:5672'; + var socketOptions = this._opts.socketOptions || {}; - debug("connecting to " + this._opts.client); + debug("connecting to " + this._opts.url); steed.series([ function (callback) { From 01b837d08045dae582f38fa2b50b97e8e6567c09 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Thu, 8 Sep 2016 22:45:53 +0700 Subject: [PATCH 5/7] docs (README.md): add example config for usage with amqplib --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 0392381..7537b0a 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,22 @@ ascoltatori.build(settings, function (err, ascoltatore) { }); ``` +Use with [amqplib](https://www.npmjs.com/package/amqplib) + +```javascript +var ascoltatori = require('ascoltatori'); +var settings = { + type: 'amqp', + json: false, + amqp: require('amqplib/callback_api'), + exchange: 'ascolatore5672' +}; + +ascoltatori.build(settings, function (err, ascoltatore) { + // ... +}); +``` + ### ZeroMQ ```javascript From 860b1a2987c4abf750f360387a6b82eac8a98432 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Sat, 10 Sep 2016 10:22:12 +0700 Subject: [PATCH 6/7] fix (store type declare): add amqplib as store type --- README.md | 2 +- lib/ascoltatori.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7537b0a..7fbebf7 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,7 @@ Use with [amqplib](https://www.npmjs.com/package/amqplib) ```javascript var ascoltatori = require('ascoltatori'); var settings = { - type: 'amqp', + type: 'amqplib', json: false, amqp: require('amqplib/callback_api'), exchange: 'ascolatore5672' diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index dc62c5d..0e4df04 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -29,6 +29,7 @@ module.exports.KafkaAscoltatore = require("./kafka_ascoltatore"); */ var classes = { "amqp": module.exports.AMQPAscoltatore, + "amqplib": module.exports.AMQPLibAscoltatore, "trie": module.exports.TrieAscoltatore, "eventemitter2": module.exports.EventEmitter2Ascoltatore, "mqtt": module.exports.MQTTAscoltatore, From eb1390918db1f4618f364a39368092b3306dcec7 Mon Sep 17 00:00:00 2001 From: Phuc PNT Date: Mon, 12 Sep 2016 21:40:49 +0700 Subject: [PATCH 7/7] chore (amqplib): remove console.log --- lib/amqplib_ascoltatore.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/amqplib_ascoltatore.js b/lib/amqplib_ascoltatore.js index 436746d..664cc6b 100644 --- a/lib/amqplib_ascoltatore.js +++ b/lib/amqplib_ascoltatore.js @@ -129,8 +129,6 @@ AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, don if (!this._subs_counter.include(topic)) { debug("binding queue to topic " + topic); - console.log('pattern', topic, this._subTopic(topic)); - this._channel.bindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok){ debug("queue bound to topic " + topic); defer(done); @@ -148,7 +146,6 @@ AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) { this._raiseIfClosed(); debug("new message published to " + topic); - console.log('pattern', topic, this._pubTopic(topic)); this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message))); defer(done);