Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge pull request #182 from shadyvd/master
Browse files Browse the repository at this point in the history
AMQPLib: Allow queue and durability to be part of the configuration
  • Loading branch information
mcollina authored Feb 15, 2018
2 parents bc72263 + 91a6bd4 commit 97b3a9b
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ before_install:
- mkdir -p data/db
- mongod --dbpath=data/db > /dev/null &
- sleep 5
- wget http://www.us.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz -O kafka.tgz
- wget http://www.us.apache.org/dist/kafka/0.11.0.2/kafka_2.12-0.11.0.2.tgz -O kafka.tgz
- mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
- sleep 10
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ var settings = {
type: 'amqplib',
json: false,
amqp: require('amqplib/callback_api'),
exchange: 'ascolatore5672'
exchange: 'ascolatore5672',
queue: 'queueName',
durableQueue: true
};

ascoltatori.build(settings, function (err, ascoltatore) {
Expand Down
8 changes: 4 additions & 4 deletions lib/amqplib_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ AMQPLibAscoltatore.prototype._startConn = function () {

function(callback){
debug('channel created');
that._queue = util.buildIdentifier();
that._channel.assertQueue(that._queue, {durable: false}, wrap(callback));
that._queue = that._opts.queue || util.buildIdentifier();
that._channel.assertQueue(that._queue, {durable: !!that._opts.durableQueue}, wrap(callback));
},

function (callback){
Expand Down Expand Up @@ -145,7 +145,7 @@ AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, don
AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) {
this._raiseIfClosed();

debug("new message published to " + topic);
debug("new message published to " + this._pubTopic(topic));

this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message)));
defer(done);
Expand Down Expand Up @@ -199,7 +199,7 @@ AMQPLibAscoltatore.prototype.close = function close(done) {
};

this._client_conn.on("close", doClose);
this._channel.deleteQueue(this._queue);
this._channel.deleteQueue(this._queue, { "ifUnused": true, "ifEmpty": true });
this._channel.close();

this._client_conn.close();
Expand Down

0 comments on commit 97b3a9b

Please sign in to comment.