From 26c761e5797a6d0d2912466d6eb35c23fc952bb7 Mon Sep 17 00:00:00 2001 From: Rod Date: Thu, 15 Feb 2018 18:13:23 -0500 Subject: [PATCH] add tests --- lib/amqp-base.coffee | 5 +- lib/amqp-consumer.coffee | 204 +++------------------------------ lib/amqp-producer.coffee | 4 +- test/test-amqp-consumer.coffee | 171 ++------------------------- test/test-amqp-producer.coffee | 7 +- 5 files changed, 35 insertions(+), 356 deletions(-) diff --git a/lib/amqp-base.coffee b/lib/amqp-base.coffee index 001595d..5832b66 100644 --- a/lib/amqp-base.coffee +++ b/lib/amqp-base.coffee @@ -2,8 +2,9 @@ amqp = require 'amqp' class AmqpBase - constructor:()-> - undefined + constructor:(@connection)-> + if @connection? + @_on_connect() # Establish a new connection to the specified broker. # diff --git a/lib/amqp-consumer.coffee b/lib/amqp-consumer.coffee index a3f6ec4..d067eef 100644 --- a/lib/amqp-consumer.coffee +++ b/lib/amqp-consumer.coffee @@ -67,25 +67,8 @@ AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase # class AmqpConsumer extends AmqpBase - _on_connect:(callback)=> - @queues_by_name ?= { } - @queue_names_by_subscription_tag ?= { } - callback?() - - _on_disconnect:(callback)=> - @queues_by_name = undefined # TODO cleanly unsub from queues? - @queue_names_by_subscription_tag ?= { } - callback?() - - @deprecation_warning_shown: false - @always_show_deprecation_warning: false - constructor:(args...)-> - if args? and args.length > 0 - if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown - console.error "WARNING: Passing arguments to the AmqpConsumer constructor is deprecated. Please use the new API." - AmqpConsumer.deprecation_warning_shown = true - super() + super(args...) # **message_converter** - *a utility method used to convert the message before consuming.* # @@ -156,7 +139,7 @@ class AmqpConsumer extends AmqpBase callback new Error("Queue #{queue_name} not known.") else queue.destroy(options) - unless options.ifUnused or options.ifEmpty # when ifUnused or ifEmpty is set there doesn't seem to be any way to tell if the queue was actually destroyed, so keep the reference + unless options?.ifUnused or options?.ifEmpty # when ifUnused or ifEmpty is set there doesn't seem to be any way to tell if the queue was actually destroyed, so keep the reference if queue_name? delete @queues_by_name[queue_name] callback undefined @@ -406,150 +389,15 @@ class AmqpConsumer extends AmqpBase queue.__amqp_util_queue_name = queue_name callback? undefined, queue, false + _on_connect:(callback)=> + @queues_by_name ?= { } + @queue_names_by_subscription_tag ?= { } + callback?() - ############################################################################## - ############################################################################## - ############################################################################## - # ############################################################################## - # - # # if typeof impl_options is 'function' and not callback? - # # callback = impl_options - # # impl_options = null - # # if typeof connection_options is 'function' and not callback? - # # callback = connection_options - # # connection_options = null - # # if typeof connection_options is 'function' and not callback? - # # callback = connection_options - # # connection_options = null - # # if typeof broker_url isnt 'string' and not connection_options? - # # connection_options = broker_url - # # broker_url = null - # # - # # @connection = amqp.createConnection({url:connection},connection_options) - # # @connection.on 'error', (err)=> - # # console.error "error",err - # # @connection.once 'ready', ()=> - # # @queue = @connection.queue queue, queue_options, (response...)=> - # # callback?(null,response...) - # # - # - # # **connect** - *connects to a new or existing AMQP exchange.* - # # - # # Accepts four arguments: - # # - # # - `broker_url` is the URL by which to connect to the message broker - # # (e.g., `amqp://guest:guest@localhost:5672`) - # # - # # - `connection_options` is a partially AMQP-implementation-specific map of - # # options. See - # # [postwait's node-amqp documentation](https://github.com/postwait/node-amqp/#connection-options-and-url) - # # for details. - # # - # # - `queue` is the name of the AMQP Queue on which to listen. - # # - # # - `queue_options` is an optional map of additional AMQP queue options (see - # # [node-ampq's documentation](https://github.com/postwait/node-amqp/#connectionqueuename-options-opencallback) - # # for details). - # # - # # - `callback` is an optional function that will be invoked once the consumer - # # is ready for use. - # old_connect:(args...)=> - # if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown - # console.error "WARNING: The AmqpConsumer.old_connect method is deprecated. Please use the new API." - # AmqpConsumer.deprecation_warning_shown = true - # # Parse out the method parameters, allowing optional values. - # connection = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object') - # connection_options = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string') - # queue = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object') - # queue_options = args.shift() - # else - # queue_options = {} - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function') - # callback = args.shift() - # @connection = amqp.createConnection({url:connection},connection_options) - # @connection.on 'error', (err)=> - # console.log "error",err - # @connection.once 'ready', ()=> - # @queue = @connection.queue queue, queue_options, (response...)=> - # callback?(null,response...) - # - # # **subscribe** - *start listening for incoming messages.* - # # - # # The method takes two or four parameters: - # # - # # - `exchange_name` is the name of the AMQP exchange to bind to. - # # - # # - `pattern` is a routing-pattern to be used to filter the messages. (See - # # [node-ampq's documentation](https://github.com/postwait/node-amqp/#queuebindexchange-routing) - # # for details.) - # # - # # - `callback` is the "handle message" function to invoke when a message - # # is received. The callback has the following signature: - # # - # # callback(message, headers, deliveryInfo); - # # - # # (See - # # [node-ampq's documentation](https://github.com/postwait/node-amqp/#queuesubscribeoptions-listener) - # # for details.) - # # - # # - `done` is a callback method that is invoked when the subscription is - # # established and active. - # # - # # The `exchange_name` and `pattern` are optional. When present, I will attempt - # # to bind to the specified exchange. When absent, the queue should already - # # be bound to some exchange. - # # - # old_subscribe:(args...)=> # args:= exchange_name,pattern,subscribe_options,callback,done - # if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown - # console.error "WARNING: The AmqpConsumer.old_subscribe method is deprecated. Please use the new API." - # AmqpConsumer.deprecation_warning_shown = true - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string') - # exchange_name = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string') - # pattern = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object') - # subscribe_options = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function') - # callback = args.shift() - # if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function') - # done = args.shift() - # if exchange_name? - # @old_bind exchange_name, pattern, ()=> - # @_inner_subscribe(subscribe_options,callback,done) - # else - # @_inner_subscribe(subscribe_options,callback,done) - # - # _inner_subscribe:(subscribe_options,callback,done)=> - # @queue.once 'basicConsumeOk',()=> - # done?() - # @queue.subscribe( - # subscribe_options, - # (m,h,i,x...)=>callback(@message_converter(m),h,i,x...) - # ).addCallback( - # (ok)=>@subscription_tag = ok.consumerTag - # ) - # - # old_bind:(exchange_name,pattern,callback)=> - # if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown - # console.error "WARNING: The AmqpConsumer.old_bind method is deprecated. Please use the new API." - # AmqpConsumer.deprecation_warning_shown = true - # @queue.once 'queueBindOk', ()=>callback() - # @queue.bind(exchange_name,pattern) - # - # # **unsubscribe** - *stop listening for incoming messages.* - # old_unsubscribe:(callback)=> - # if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown - # console.error "WARNING: The AmqpConsumer.old_unsubscribe method is deprecated. Please use the new API." - # AmqpConsumer.deprecation_warning_shown = true - # try - # @queue.unsubscribe(@subscription_tag).addCallback ()=> - # @subscription_tag = null - # callback?() - # catch err - # callback?(err) + _on_disconnect:(callback)=> + @queues_by_name = undefined # TODO cleanly unsub from queues? + @queue_names_by_subscription_tag ?= { } + callback?() # ███████ ██ ██ ██████ ██████ ██ █████ ███████ ███████ ███████ ███████ # ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ @@ -560,32 +408,12 @@ class AmqpConsumer extends AmqpBase # **AmqpStringConsumer** - *an `AmqpConsumer` that automatically converts inbound messages from Buffers into Strings.* class AmqpStringConsumer extends AmqpConsumer - # **constructor** - *create a new `AmqpStringConsumer`.* - # - # Accepts 0 to 7 parameters: - # - # - `encoding` - the encoding to use when converting from bytes to characters. - # - others - when present, passed to `connect` - # - # When invoked with fewer than two, the `AmqpConsumer` instance is created - # but no connection is established. (A connection can be established later - # by calling `connect`.) - # - # When 2 or more arguments *are* passed, they are immediately passed to the - # `connect` function. - # - constructor:(@encoding,connection,connection_options,queue,queue_options,callback)-> - if typeof queue_options is 'function' and (not callback?) - callback = queue_options - queue_options = queue - queue = connection_options - connection_options = connection - connection = @encoding - @encoding = null - if connection? or queue? - super(connection,connection_options,queue,queue_options,callback) - else - super() + constructor:(args...)-> + if args?.length > 0 and (typeof args[0] is 'object' or not args[0]?) + connection = args.shift() + if args?.length > 0 and (typeof args[0] is 'string' or not args[0]?) + @encoding = args.shift() + super(connection) # **message_converter** - *converts a Buffer to a String.* message_converter:(msg)=> diff --git a/lib/amqp-producer.coffee b/lib/amqp-producer.coffee index f0d11ca..56c851c 100644 --- a/lib/amqp-producer.coffee +++ b/lib/amqp-producer.coffee @@ -15,8 +15,8 @@ AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase class AmqpProducer extends AmqpBase - constructor:()-> - super() + constructor:(args...)-> + super(args...) # **default_routing_key** - *the default key value to use in `publish`.* default_routing_key: null diff --git a/test/test-amqp-consumer.coffee b/test/test-amqp-consumer.coffee index 48db632..323883e 100644 --- a/test/test-amqp-consumer.coffee +++ b/test/test-amqp-consumer.coffee @@ -41,7 +41,7 @@ describe 'AmqpConsumer',-> subscription_tag = null amqpc.connect TEST_BROKER, (err)=> assert.ok not err?, err - amqpc.create_queue TEST_QUEUE, TEST_QUEUE_OPTIONS, TEST_EXCHANGE, TEST_ROUTING_KEY, (err)=> + amqpc.get_queue TEST_QUEUE, TEST_QUEUE_OPTIONS, TEST_EXCHANGE, TEST_ROUTING_KEY, (err)=> assert.ok not err?, err handler = (message,headers,info)=> received_count += 1 @@ -49,7 +49,11 @@ describe 'AmqpConsumer',-> if received_count is 3 amqpc.unsubscribe_from_queue subscription_tag, (err)-> assert.ok not err?, err - done() + amqpc.unbind_queue_from_exchange TEST_QUEUE, TEST_EXCHANGE, TEST_ROUTING_KEY, (err)=> + assert.ok not err?, err + amqpc.destroy_queue TEST_QUEUE, ()=> + amqpc.disconnect ()=> + done() else received_count.should.not.be.above 3 amqpc.subscribe_to_queue TEST_QUEUE, handler, (err, queue, queue_name, st)=> @@ -64,6 +68,7 @@ describe 'AmqpConsumer',-> received_count = 0 amqpc = new AmqpConsumer() subscription_tag = null + the_queue = null amqpc.connect TEST_BROKER, (err)=> assert.ok not err?, err amqpc.create_queue TEST_QUEUE, TEST_QUEUE_OPTIONS, TEST_EXCHANGE, TEST_ROUTING_KEY, (err)=> @@ -74,12 +79,14 @@ describe 'AmqpConsumer',-> if received_count is 3 amqpc.unsubscribe_from_queue subscription_tag, (err)-> assert.ok not err?, err - done() + amqpc.destroy_queue the_queue, ()=> + done() else received_count.should.not.be.above 3 amqpc.subscribe_to_queue TEST_QUEUE, {exclusive:true}, handler, (err, queue, queue_name, st)=> assert.ok not err?, err assert.ok st? + the_queue = queue subscription_tag = st @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' @@ -410,161 +417,3 @@ describe 'AmqpConsumer',-> @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' - -# describe '[DEPRECATED] AmqpConsumer (old methods)',-> -# -# beforeEach (done)=> -# @connection = amqp.createConnection({url:TEST_BROKER}) -# @connection.once 'ready', ()=> -# @exchange = @connection?.exchange TEST_EXCHANGE, TEST_EXCHANGE_OPTIONS, ()=> -# done() -# -# afterEach (done)=> -# @exchange?.destroy(false) -# @exchange = null -# @connection?.end() -# @connection = null -# done() -# -# it 'can accept published messages',(done)=> -# received_count = 0 -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)=> -# received_count += 1 -# message.data.toString().should.equal "my-test-message-#{received_count}" -# if received_count is 3 -# amqpc.old_unsubscribe ()-> -# done() -# else -# received_count.should.not.be.above 3 -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' -# -# it 'supports optional arguments in the constructor',(done)=> -# received_count = 0 -# amqpc = new AmqpConsumer TEST_BROKER, TEST_QUEUE, ()=> -# handler = (message,headers,info)=> -# received_count += 1 -# message.data.toString().should.equal "my-test-message-#{received_count}" -# if received_count is 3 -# amqpc.old_unsubscribe ()-> -# done() -# else -# received_count.should.not.be.above 3 -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' -# -# it 'can defer connection to the message broker',(done)=> -# received_count = 0 -# amqpc = new AmqpConsumer() -# amqpc.old_connect TEST_BROKER, TEST_QUEUE, ()=> -# handler = (message,headers,info)=> -# received_count += 1 -# message.data.toString().should.equal "my-test-message-#{received_count}" -# if received_count is 3 -# amqpc.old_unsubscribe ()-> -# done() -# else -# received_count.should.not.be.above 3 -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' -# -# it 'can use a pre-established queue without specifying an exchange',(done)=> -# queue = @connection.queue TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# queue.once 'queueBindOk', ()=> -# received_count = 0 -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)=> -# received_count += 1 -# message.data.toString().should.equal "my-test-message-#{received_count}" -# if received_count is 3 -# amqpc.old_unsubscribe ()-> -# queue.destroy() -# done() -# else -# received_count.should.not.be.above 3 -# amqpc.old_subscribe handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' -# queue.bind(TEST_EXCHANGE,TEST_ROUTING_KEY) -# -# -# it 'accepts "subscribe options" in subscribe method',(done)=> -# queue = @connection.queue TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# queue.once 'queueBindOk', ()=> -# received_count = 0 -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)=> -# received_count += 1 -# message.data.toString().should.equal "my-test-message-#{received_count}" -# if received_count is 3 -# amqpc.old_unsubscribe ()-> -# queue.destroy() -# done() -# else -# received_count.should.not.be.above 3 -# amqpc.old_subscribe {exclusive:true}, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-1' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-2' -# @exchange.publish TEST_ROUTING_KEY, 'my-test-message-3' -# queue.bind(TEST_EXCHANGE,TEST_ROUTING_KEY) -# -# it 'can accept a JSON-valued message as a JSON object',(done)=> -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)-> -# message.foo.should.equal 'bar' -# message.a.should.equal 1 -# info.contentType.should.equal 'application/json' -# amqpc.old_unsubscribe ()-> -# done() -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, { foo:'bar', a:1 } -# -# it 'passes raw message from node-amqp to message handler',(done)=> -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info,raw)-> -# message.foo.should.equal 'bar' -# message.a.should.equal 1 -# info.contentType.should.equal 'application/json' -# should.exist raw -# amqpc.old_unsubscribe ()-> -# done() -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, { foo:'bar', a:1 } -# -# it 'supports a payload converter for transforming messages before they are consumed',(done)=> -# amqpc = new AmqpConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# amqpc.message_converter = (message)->message.data.toString().toUpperCase() -# handler = (message,headers,info)-> -# message.should.equal "THE QUICK BROWN FOX JUMPED." -# amqpc.old_unsubscribe ()-> -# done() -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, "the quick brown fox jumped." -# -# it 'AMQPJSONConsumer accept a JSON-valued message as a JSON object',(done)=> -# amqpc = new AMQPJSONConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)-> -# message.foo.should.equal 'bar' -# message.a.should.equal 1 -# info.contentType.should.equal 'application/json' -# amqpc.old_unsubscribe ()-> -# done() -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, { foo:'bar', a:1 } -# -# it 'AMQPStringConsumer can accept a Buffer-valued message as a String',(done)=> -# amqpc = new AMQPStringConsumer TEST_BROKER, null, TEST_QUEUE, TEST_QUEUE_OPTIONS, ()=> -# handler = (message,headers,info)-> -# message.should.equal "The quick brown fox jumped." #note that in the AmqpConsumer case, message.data.toString() would be needed instead -# amqpc.old_unsubscribe ()-> -# done() -# amqpc.old_subscribe TEST_EXCHANGE, TEST_ROUTING_KEY, handler, ()=> -# @exchange.publish TEST_ROUTING_KEY, "The quick brown fox jumped." diff --git a/test/test-amqp-producer.coffee b/test/test-amqp-producer.coffee index 8e58075..2f0253d 100644 --- a/test/test-amqp-producer.coffee +++ b/test/test-amqp-producer.coffee @@ -36,14 +36,14 @@ describe 'AMQPProducer',-> it "can publish messages",(done)=> # Thanks to the magic of node.js callbacks, this test case reads backwards. - + amqpp = null # In a moment we'll subscribe to messages from the Queue. # Once our subscription is set up, we will publish a couple of messages. @queue.once 'basicConsumeOk',()=> amqpp = new AMQPProducer() amqpp.connect TEST_BROKER, (err)=> assert.ok not err?, err - amqpp.create_exchange TEST_EXCHANGE, TEST_EXCHANGE_OPTIONS, (err)=> + amqpp.get_exchange TEST_EXCHANGE, TEST_EXCHANGE_OPTIONS, (err)=> assert.ok not err?, err amqpp.publish TEST_EXCHANGE, {body:"test-message"}, TEST_ROUTING_KEY, (err)-> should.not.exist err @@ -63,7 +63,8 @@ describe 'AMQPProducer',-> if received.length is 2 received[0].message.body.should.equal 'test-message' received[1].message.body.should.equal 'test-message' - done() + amqpp.disconnect ()=> + done() else received.length.should.be.above 0 received.length.should.not.be.above 2