diff --git a/.travis.yml b/.travis.yml index e2b77e7..bb512a7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,7 +23,7 @@ before_install: - mkdir -p data/db - mongod --dbpath=data/db > /dev/null & - sleep 5 - - wget http://www.us.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz -O kafka.tgz + - wget http://www.us.apache.org/dist/kafka/0.11.0.2/kafka_2.12-0.11.0.2.tgz -O kafka.tgz - mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1 - nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &" - sleep 10 diff --git a/README.md b/README.md index 497c65c..7272390 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,9 @@ var settings = { type: 'amqplib', json: false, amqp: require('amqplib/callback_api'), - exchange: 'ascolatore5672' + exchange: 'ascolatore5672', + queue: 'queueName', + durableQueue: true }; ascoltatori.build(settings, function (err, ascoltatore) { diff --git a/lib/amqplib_ascoltatore.js b/lib/amqplib_ascoltatore.js index 8ee8cc2..8e2df62 100644 --- a/lib/amqplib_ascoltatore.js +++ b/lib/amqplib_ascoltatore.js @@ -92,8 +92,8 @@ AMQPLibAscoltatore.prototype._startConn = function () { function(callback){ debug('channel created'); - that._queue = util.buildIdentifier(); - that._channel.assertQueue(that._queue, {durable: false}, wrap(callback)); + that._queue = that._opts.queue || util.buildIdentifier(); + that._channel.assertQueue(that._queue, {durable: !!that._opts.durableQueue}, wrap(callback)); }, function (callback){ @@ -145,7 +145,7 @@ AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, don AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) { this._raiseIfClosed(); - debug("new message published to " + topic); + debug("new message published to " + this._pubTopic(topic)); this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message))); defer(done); @@ -199,7 +199,7 @@ AMQPLibAscoltatore.prototype.close = function close(done) { }; this._client_conn.on("close", doClose); - this._channel.deleteQueue(this._queue); + this._channel.deleteQueue(this._queue, { "ifUnused": true, "ifEmpty": true }); this._channel.close(); this._client_conn.close();