Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rod committed Feb 15, 2018
1 parent 479ed9d commit 26c761e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 356 deletions.
5 changes: 3 additions & 2 deletions lib/amqp-base.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ amqp = require 'amqp'

class AmqpBase

constructor:()->
undefined
constructor:(@connection)->
if @connection?
@_on_connect()

# Establish a new connection to the specified broker.
#
Expand Down
204 changes: 16 additions & 188 deletions lib/amqp-consumer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,8 @@ AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase
#
class AmqpConsumer extends AmqpBase

_on_connect:(callback)=>
@queues_by_name ?= { }
@queue_names_by_subscription_tag ?= { }
callback?()

_on_disconnect:(callback)=>
@queues_by_name = undefined # TODO cleanly unsub from queues?
@queue_names_by_subscription_tag ?= { }
callback?()

@deprecation_warning_shown: false
@always_show_deprecation_warning: false

constructor:(args...)->
if args? and args.length > 0
if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown
console.error "WARNING: Passing arguments to the AmqpConsumer constructor is deprecated. Please use the new API."
AmqpConsumer.deprecation_warning_shown = true
super()
super(args...)

# **message_converter** - *a utility method used to convert the message before consuming.*
#
Expand Down Expand Up @@ -156,7 +139,7 @@ class AmqpConsumer extends AmqpBase
callback new Error("Queue #{queue_name} not known.")
else
queue.destroy(options)
unless options.ifUnused or options.ifEmpty # when ifUnused or ifEmpty is set there doesn't seem to be any way to tell if the queue was actually destroyed, so keep the reference
unless options?.ifUnused or options?.ifEmpty # when ifUnused or ifEmpty is set there doesn't seem to be any way to tell if the queue was actually destroyed, so keep the reference
if queue_name?
delete @queues_by_name[queue_name]
callback undefined
Expand Down Expand Up @@ -406,150 +389,15 @@ class AmqpConsumer extends AmqpBase
queue.__amqp_util_queue_name = queue_name
callback? undefined, queue, false

_on_connect:(callback)=>
@queues_by_name ?= { }
@queue_names_by_subscription_tag ?= { }
callback?()

##############################################################################
##############################################################################
##############################################################################
# ##############################################################################
#
# # if typeof impl_options is 'function' and not callback?
# # callback = impl_options
# # impl_options = null
# # if typeof connection_options is 'function' and not callback?
# # callback = connection_options
# # connection_options = null
# # if typeof connection_options is 'function' and not callback?
# # callback = connection_options
# # connection_options = null
# # if typeof broker_url isnt 'string' and not connection_options?
# # connection_options = broker_url
# # broker_url = null
# #
# # @connection = amqp.createConnection({url:connection},connection_options)
# # @connection.on 'error', (err)=>
# # console.error "error",err
# # @connection.once 'ready', ()=>
# # @queue = @connection.queue queue, queue_options, (response...)=>
# # callback?(null,response...)
# #
#
# # **connect** - *connects to a new or existing AMQP exchange.*
# #
# # Accepts four arguments:
# #
# # - `broker_url` is the URL by which to connect to the message broker
# # (e.g., `amqp://guest:guest@localhost:5672`)
# #
# # - `connection_options` is a partially AMQP-implementation-specific map of
# # options. See
# # [postwait's node-amqp documentation](https://github.com/postwait/node-amqp/#connection-options-and-url)
# # for details.
# #
# # - `queue` is the name of the AMQP Queue on which to listen.
# #
# # - `queue_options` is an optional map of additional AMQP queue options (see
# # [node-ampq's documentation](https://github.com/postwait/node-amqp/#connectionqueuename-options-opencallback)
# # for details).
# #
# # - `callback` is an optional function that will be invoked once the consumer
# # is ready for use.
# old_connect:(args...)=>
# if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown
# console.error "WARNING: The AmqpConsumer.old_connect method is deprecated. Please use the new API."
# AmqpConsumer.deprecation_warning_shown = true
# # Parse out the method parameters, allowing optional values.
# connection = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object')
# connection_options = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string')
# queue = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object')
# queue_options = args.shift()
# else
# queue_options = {}
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function')
# callback = args.shift()
# @connection = amqp.createConnection({url:connection},connection_options)
# @connection.on 'error', (err)=>
# console.log "error",err
# @connection.once 'ready', ()=>
# @queue = @connection.queue queue, queue_options, (response...)=>
# callback?(null,response...)
#
# # **subscribe** - *start listening for incoming messages.*
# #
# # The method takes two or four parameters:
# #
# # - `exchange_name` is the name of the AMQP exchange to bind to.
# #
# # - `pattern` is a routing-pattern to be used to filter the messages. (See
# # [node-ampq's documentation](https://github.com/postwait/node-amqp/#queuebindexchange-routing)
# # for details.)
# #
# # - `callback` is the "handle message" function to invoke when a message
# # is received. The callback has the following signature:
# #
# # callback(message, headers, deliveryInfo);
# #
# # (See
# # [node-ampq's documentation](https://github.com/postwait/node-amqp/#queuesubscribeoptions-listener)
# # for details.)
# #
# # - `done` is a callback method that is invoked when the subscription is
# # established and active.
# #
# # The `exchange_name` and `pattern` are optional. When present, I will attempt
# # to bind to the specified exchange. When absent, the queue should already
# # be bound to some exchange.
# #
# old_subscribe:(args...)=> # args:= exchange_name,pattern,subscribe_options,callback,done
# if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown
# console.error "WARNING: The AmqpConsumer.old_subscribe method is deprecated. Please use the new API."
# AmqpConsumer.deprecation_warning_shown = true
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string')
# exchange_name = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'string')
# pattern = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'object')
# subscribe_options = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function')
# callback = args.shift()
# if args.length > 0 and ((not args[0]?) or typeof args[0] is 'function')
# done = args.shift()
# if exchange_name?
# @old_bind exchange_name, pattern, ()=>
# @_inner_subscribe(subscribe_options,callback,done)
# else
# @_inner_subscribe(subscribe_options,callback,done)
#
# _inner_subscribe:(subscribe_options,callback,done)=>
# @queue.once 'basicConsumeOk',()=>
# done?()
# @queue.subscribe(
# subscribe_options,
# (m,h,i,x...)=>callback(@message_converter(m),h,i,x...)
# ).addCallback(
# (ok)=>@subscription_tag = ok.consumerTag
# )
#
# old_bind:(exchange_name,pattern,callback)=>
# if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown
# console.error "WARNING: The AmqpConsumer.old_bind method is deprecated. Please use the new API."
# AmqpConsumer.deprecation_warning_shown = true
# @queue.once 'queueBindOk', ()=>callback()
# @queue.bind(exchange_name,pattern)
#
# # **unsubscribe** - *stop listening for incoming messages.*
# old_unsubscribe:(callback)=>
# if AmqpConsumer.always_show_deprecation_warning or not AmqpConsumer.deprecation_warning_shown
# console.error "WARNING: The AmqpConsumer.old_unsubscribe method is deprecated. Please use the new API."
# AmqpConsumer.deprecation_warning_shown = true
# try
# @queue.unsubscribe(@subscription_tag).addCallback ()=>
# @subscription_tag = null
# callback?()
# catch err
# callback?(err)
_on_disconnect:(callback)=>
@queues_by_name = undefined # TODO cleanly unsub from queues?
@queue_names_by_subscription_tag ?= { }
callback?()

# ███████ ██ ██ ██████ ██████ ██ █████ ███████ ███████ ███████ ███████
# ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
Expand All @@ -560,32 +408,12 @@ class AmqpConsumer extends AmqpBase
# **AmqpStringConsumer** - *an `AmqpConsumer` that automatically converts inbound messages from Buffers into Strings.*
class AmqpStringConsumer extends AmqpConsumer

# **constructor** - *create a new `AmqpStringConsumer`.*
#
# Accepts 0 to 7 parameters:
#
# - `encoding` - the encoding to use when converting from bytes to characters.
# - others - when present, passed to `connect`
#
# When invoked with fewer than two, the `AmqpConsumer` instance is created
# but no connection is established. (A connection can be established later
# by calling `connect`.)
#
# When 2 or more arguments *are* passed, they are immediately passed to the
# `connect` function.
#
constructor:(@encoding,connection,connection_options,queue,queue_options,callback)->
if typeof queue_options is 'function' and (not callback?)
callback = queue_options
queue_options = queue
queue = connection_options
connection_options = connection
connection = @encoding
@encoding = null
if connection? or queue?
super(connection,connection_options,queue,queue_options,callback)
else
super()
constructor:(args...)->
if args?.length > 0 and (typeof args[0] is 'object' or not args[0]?)
connection = args.shift()
if args?.length > 0 and (typeof args[0] is 'string' or not args[0]?)
@encoding = args.shift()
super(connection)

# **message_converter** - *converts a Buffer to a String.*
message_converter:(msg)=>
Expand Down
4 changes: 2 additions & 2 deletions lib/amqp-producer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase

class AmqpProducer extends AmqpBase

constructor:()->
super()
constructor:(args...)->
super(args...)

# **default_routing_key** - *the default key value to use in `publish`.*
default_routing_key: null
Expand Down
Loading

0 comments on commit 26c761e

Please sign in to comment.