diff --git a/bus/rabbitmq/bus.js b/bus/rabbitmq/bus.js index 040d987..45d4ab7 100644 --- a/bus/rabbitmq/bus.js +++ b/bus/rabbitmq/bus.js @@ -22,7 +22,7 @@ function RabbitMQBus (options, implOpts) { this.assertQueuesOnFirstSend = (options.assertQueuesOnFirstSend === undefined) ? true : options.assertQueuesOnFirstSend; this.channels = []; - this.correlator = new Correlator(options); + this.correlator = options.correlator || new Correlator(options); this.delayOnStartup = options.delayOnStartup || 10; this.exchangeName = options.exchangeName; this.formatter = json; @@ -105,7 +105,9 @@ util.inherits(RabbitMQBus, Bus); RabbitMQBus.prototype.listen = function listen (queueName, options, callback) { - this.log('listen on queue %s', queueName); + var self = this; + + this.log('listen on queue %j', queueName); if (typeof options === "function") { callback = options; @@ -120,7 +122,11 @@ RabbitMQBus.prototype.listen = function listen (queueName, options, callback) { if (this.queues[options.queueName] === undefined) { this.log('creating queue %s', options.queueName); - this.queues[options.queueName] = new Queue(options); + var queue = new Queue(options); + queue.on('listening', function () { + self.emit('listening', queue); + }); + this.queues[options.queueName] = queue; } this.queues[options.queueName].listen(callback, options); @@ -214,6 +220,8 @@ RabbitMQBus.prototype.subscribe = function subscribe (queueName, options, callba options = {}; } + this.log('subscribe on queue %j', queueName); + var handle = null; function _unsubscribe (options) { handle.unsubscribe(options); @@ -227,7 +235,11 @@ RabbitMQBus.prototype.subscribe = function subscribe (queueName, options, callba if (this.pubsubqueues[options.queueName] === undefined) { this.log('creating pusubqueue %s', options.queueName); - this.pubsubqueues[options.queueName] = new PubSubQueue(options); + var pubSubQueue = new PubSubQueue(options); + pubSubQueue.on('subscribed', function () { + self.emit('subscribed', pubSubQueue); + }); + this.pubsubqueues[options.queueName] = pubSubQueue; } handle = this.pubsubqueues[options.queueName].subscribe(options, callback); diff --git a/bus/rabbitmq/pubsubqueue.js b/bus/rabbitmq/pubsubqueue.js index aadbeac..ffbb4fc 100644 --- a/bus/rabbitmq/pubsubqueue.js +++ b/bus/rabbitmq/pubsubqueue.js @@ -69,11 +69,13 @@ PubSubQueue.prototype.publish = function publish (event, options, cb) { PubSubQueue.prototype.subscribe = function subscribe (options, callback) { var self = this; - var listening = false; + var subscribed = false; var subscription = null; + this.log('subscribing to queue %j with routingKey %j', this.queueName, this.routingKey); + function _unsubscribe (cb) { - if (listening) { + if (subscribed) { // should we prevent multiple cancel calls? self.listenChannel .cancel(subscription.consumerTag) @@ -84,7 +86,7 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) { } }); } else { - self.on('listening', _unsubscribe.bind(this, cb)); + self.on('subscribed', _unsubscribe.bind(this, cb)); } } @@ -119,9 +121,9 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) { }); }, { noAck: ! self.ack }) .then(function (ok) { - listening = true; + subscribed = true; subscription = { consumerTag: ok.consumerTag }; - self.emit('listening'); + self.emit('subscribed'); }); } diff --git a/bus/rabbitmq/queue.js b/bus/rabbitmq/queue.js index 429aca6..7b3181a 100644 --- a/bus/rabbitmq/queue.js +++ b/bus/rabbitmq/queue.js @@ -74,7 +74,7 @@ Queue.prototype.listen = function listen (callback, options) { var self = this; - this.log('listening to queue %s', this.queueName); + this.log('listening to queue %j', this.queueName); if ( ! this.initialized) { return this.on('ready', listen.bind(this, callback, options));