Skip to content

Commit

Permalink
add optional debugging output; add note to README about enabling debu…
Browse files Browse the repository at this point in the history
…gging output
  • Loading branch information
Rod committed Feb 16, 2018
1 parent fc7bbf7 commit 181992d
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 41 deletions.
22 changes: 15 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@

*amqp-util* depends heavily on [*node-amqp*](https://github.com/postwait/node-amqp) (which see). This module provides utilities and base-classes which wrap *node-amqp* in support of common use-cases.

## VERSION NOTE:

Version 2.x of `amqp-util` changes some parts of the API but this README file currently describes the amqp-util v1.x.

In the meantime please see the comments in `amqp-consumer.coffee`, `amqp-producer.coffee` and `amqp-base.coffee` for details and examples for the new API.

---

NOTES:

* Include `all`, `amqp-util`, `amqp-base`, `amqp-consumer` or `amqp-producer` in the `NODE_DEBUG` environment variable to enable various categories of debugging output. (Case insensitive; the `-` is optional; NODE_DEBUG is treated as a comma-delimited string)

---


<!-- toc -->

- [AMQP-UTIL](#amqp-util)
Expand All @@ -24,13 +39,6 @@

<!-- tocstop -->

## PLEASE NOTE:

Version 2.x of `amqp-util` changes some parts of the API but this README file currently describes the amqp-util v1.x.

In the meantime please see the comments in `amqp-consumer.coffee`, `amqp-producer.coffee` and `amqp-base.coffee` for details and examples for the new API.


## Installing

### Via npm
Expand Down
19 changes: 15 additions & 4 deletions lib/amqp-base.coffee
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
amqp = require 'amqp'
LogUtil = require('inote-util').LogUtil
DEBUG = /(^|,)((all)|(amqp-?util)|(amqp-?base))(,|$)/i.test process.env.NODE_DEBUG # add `amqp-util` or `amqp-base` to NODE_DEBUG to enable debugging output
LogUtil = LogUtil.init({debug:DEBUG, prefix: "AmqpBase:"})

class AmqpBase

constructor:(@connection)->
if @connection?
LogUtil.tpdebug "Shared connection passed to AmqpBase constructor."
@connection_shared = true
@_on_connect()
else
Expand Down Expand Up @@ -41,7 +45,7 @@ class AmqpBase
if broker_url?
connection_options.url = broker_url
error_handler ?= (err)->
console.error "ERROR:", err
LogUtil.tperr "amqp-connection emitted an error:", err
# check input parameters
unless connection_options.url?
callback? new Error("Expected a broker URL value.")
Expand All @@ -50,12 +54,14 @@ class AmqpBase
if @connection?
callback? new Error("Already connected; please disconnect first.")
else
LogUtil.tpdebug "Connecting to #{broker_url}..."
# create the connection
called_back = false
@connection = amqp.createConnection connection_options, impl_options
@connection.once 'error', (err)=>
unless called_back
called_back = true
LogUtil.tpdebug "...encountered error while connecting:", err
callback? err, undefined
callback = undefined
if error_handler?
Expand All @@ -64,6 +70,7 @@ class AmqpBase
@_on_connect ()->
unless called_back
called_back = true
LogUtil.tpdebug "...successfully connected."
callback? undefined, @connection
callback = undefined
return @connection
Expand All @@ -73,19 +80,24 @@ class AmqpBase
callback = force
force = undefined
if @connection_shared and not force
callback(new Error("This class did not create the current connection and hence will not disconnect from it unless `true` is passed as the `force` parameter."))
err = new Error("This class did not create the current connection and hence will not disconnect from it unless `true` is passed as the `force` parameter.")
LogUtil.tpdebug "Asked to disconnect an amqp-connection that this class did not create.", err
callback(err)
else
LogUtil.tpdebug "Disconnecting..."
if @connection?.disconnect?
@_on_disconnect ()=>
@connection.disconnect()
@connection_shared = false
@connection = undefined
LogUtil.tpdebug "...disconnected."
callback?(undefined, true)
return true
else
@_on_disconnect ()=>
@connection_shared = false
@connection = undefined
LogUtil.tpdebug "...not connected in the first place."
callback?(undefined, false)
return false

Expand All @@ -99,7 +111,6 @@ class AmqpBase
return obj?.constructor?.name is 'Queue'

_object_is_exchange:(obj)->
return obj?.constructor?.name is 'Excha'

return obj?.constructor?.name is 'Exchange'

exports.AMQPBase = exports.AmqpBase = AmqpBase
52 changes: 45 additions & 7 deletions lib/amqp-consumer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ LIB_DIR = if fs.existsSync(LIB_COV) then LIB_COV else LIB
amqp = require 'amqp'
RandomUtil = require('inote-util').RandomUtil
AsyncUtil = require('inote-util').AsyncUtil
LogUtil = require('inote-util').LogUtil
process = require 'process'
################################################################################
AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase
################################################################################
DEBUG = /(^|,)((all)|(amqp-?util)|(amqp-?consumer))(,|$)/i.test process.env.NODE_DEBUG # add `amqp-util` or `amqp-consumer` to NODE_DEBUG to enable debugging output
LogUtil = LogUtil.init({debug:DEBUG, prefix: "AmqpConsumer:"})
################################################################################

# An AMQP message consumer.
#
Expand Down Expand Up @@ -114,12 +118,19 @@ class AmqpConsumer extends AmqpBase
callback? new Error("Not connected."), undefined, undefined, undefined, undefined
return undefined
else
LogUtil.tpdebug "Creating (or fetching) the queue named `#{queue_name}`..."
@_make_or_get_queue queue_name, queue_options, (err, queue, queue_was_cached)=>
if err?
LogUtil.tpdebug "...encountered error:", err
callback? err, queue, queue_name, undefined, undefined
else unless queue?
LogUtil.tpdebug "...no queue was generated for some reason, but also no error was generated."
callback?(new Error("Unable to create queue for unknown reasons"), queue, queue_name)
else
if queue_was_cached
LogUtil.tpdebug "...pre-existing queue was found in cache."
else
LogUtil.tpdebug "...queue was created or fetched from AMQP server."
if (not queue_was_cached) and (bind_pattern? or exchange_name?)
@bind_queue_to_exchange queue, exchange_name, bind_pattern, (err, queue, exchange_name, bind_pattern)->
callback? err, queue, queue_name, exchange_name, bind_pattern
Expand Down Expand Up @@ -163,15 +174,18 @@ class AmqpConsumer extends AmqpBase
unless queue? and bind_pattern?
callback?(new Error("Expected queue and bind-pattern"))
else
LogUtil.tpdebug "Binding queue named `#{queue_name}` to the exchange named `#{exchange_name}` with bind-pattern `#{bind_pattern}`..."
called_back = false
queue.once 'error', (err)=>
unless called_back
called_back = true
LogUtil.tpdebug "...encountered error:", err
callback?( err, queue, exchange_name, bind_pattern )
callback = undefined
queue.once 'queueBindOk', ()=>
unless called_back
called_back = true
LogUtil.tpdebug "...queueBindOk."
callback?( undefined, queue, exchange_name, bind_pattern )
callback = undefined
queue.bind exchange_name, bind_pattern
Expand All @@ -188,15 +202,18 @@ class AmqpConsumer extends AmqpBase
unless queue? and bind_pattern?
callback?(new Error("Expected queue and bind-pattern"))
else
LogUtil.tpdebug "Unbinding queue named `#{queue_name}` from the exchange named `#{exchange_name}` with bind-pattern `#{bind_pattern}`..."
called_back = false
queue.once 'error', (err)=>
unless called_back
called_back = true
LogUtil.tpdebug "...encountered error:", err
callback?( err, queue, exchange_name, bind_pattern )
callback = undefined
queue.once 'queueUnbindOk', ()=>
unless called_back
called_back = true
LogUtil.tpdebug "...queueUnbindOk."
callback?( undefined, queue, exchange_name, bind_pattern )
callback = undefined
queue.unbind exchange_name, bind_pattern
Expand All @@ -222,13 +239,15 @@ class AmqpConsumer extends AmqpBase
unless queue? and message_handler?
callback?(new Error("Expected queue and message-handler"), undefined, undefined, undefined)
else
LogUtil.tpdebug "Subscribing to queue named #{queue_name}..."
called_back = false
basic_consume_ok = false
timer_one = null
timer_two = null
queue.once 'error', (err)=>
unless called_back
called_back = true
LogUtil.tpdebug "...encountered error:", err
callback?( err, queue, queue_name, undefined )
callback = undefined
queue.once 'basicConsumeOk', ()=>
Expand All @@ -237,14 +256,15 @@ class AmqpConsumer extends AmqpBase
DELAY_TWO = DELAY_ONE*2
timer_one = AsyncUtil.wait DELAY_ONE, ()->
unless called_back
console.log "WARNING: basicConsumeOk event emitted but the consumerTag callback was not called within #{DELAY_ONE} milliseconds."
LogUtil.tpdebug "WARNING: basicConsumeOk event emitted but the consumerTag callback was not called within #{DELAY_ONE} milliseconds."
AsyncUtil.cancel_wait timer_one
timer_two = AsyncUtil.wait DELAY_TWO, ()->
unless called_back
console.log "WARNING: basicConsumeOk event emitted but the consumerTag callback was not called within #{DELAY_TWO} milliseconds. Calling-back regardless."
LogUtil.tpwarn "WARNING: basicConsumeOk event emitted but the consumerTag callback was not called within #{DELAY_TWO} milliseconds. Calling-back regardless."
AsyncUtil.cancel_wait timer_one
AsyncUtil.cancel_wait timer_two
called_back = true
LogUtil.tpdebug "...subscribed but no consumerTag was returned in #{DELAY_TWO} milliseconds, giving up waiting on consumerTag."
callback?( err, queue, queue_name, undefined )
callback = undefined
# console.log queue.subscribe(subscription_options, ((message,tail...)=>message_handler(@message_converter(message), tail...)))
Expand All @@ -255,18 +275,25 @@ class AmqpConsumer extends AmqpBase
called_back = true
if ok?.consumerTag? and queue_name?
@queue_names_by_subscription_tag[ok?.consumerTag] = queue_name
LogUtil.tpdebug "...subscribed with consumerTag #{ok?.consumerTag}."
callback?( undefined, queue, queue_name, ok?.consumerTag )
callback = undefined

unsubscribe_from_queue:(subscription_tag, callback)=>
subscription_tag = @_resolve_subscription_tag_alias subscription_tag
LogUtil.tpdebug "Unsubscribing from consumerTag `#{subscription_tag}`..."
[subscription_tag, chain] = @_resolve_subscription_tag_alias subscription_tag
if chain?.length > 1
LogUtil.tpdebug "...after de-aliasing, consumerTag resolved to `#{subscription_tag}` via chain #{JSON.stringify(chain)}..."
queue = @get_queue_for_subscription_tag subscription_tag
unless queue?
callback? new Error("No queue found for subscription_tag #{subscription_tag}.")
LogUtil.tpdebug "...no associated queue found for `#{subscription_tag}`, calling back with error."
callback? new Error("No queue found for subscription_tag #{subscription_tag}."), subscription_tag, chain
else
queue.unsubscribe(subscription_tag)
delete @queue_names_by_subscription_tag[subscription_tag]
delete @subscription_tag_aliases[subscription_tag]
for tag in chain
delete @queue_names_by_subscription_tag[tag]
delete @subscription_tag_aliases[tag]
LogUtil.tpdebug "...successfully unsubscribed."
callback? undefined

# Subscribes the given `message_handler` to the specified queue, creating a
Expand Down Expand Up @@ -319,13 +346,20 @@ class AmqpConsumer extends AmqpBase
else
throw err
else
#
if typeof queue_or_queue_name is 'string'
LogUtil.tpdebug "Subscribing to the queue named #{queue_or_queue_name} via the queue+bind+subscribe convenience method..."
else
LogUtil.tpdebug "Subscribing to a queue object (name=#{queue_or_queue_name?.__amqp_util_queue_name}?) via the queue+bind+subscribe convenience method..."
@_maybe_create_queue queue_or_queue_name, queue_options, exchange_name, bind_pattern, (err, queue)=>
if err?
LogUtil.tpdebug "...encountered error:", err
if callback?
callback err
else
throw err
else
LogUtil.tpdebug "...queue created (and optionally bound) or fetched from cache. Subscribing."
@subscribe_to_queue (queue ? queue_name), subscription_options, message_handler, callback


Expand Down Expand Up @@ -409,13 +443,17 @@ class AmqpConsumer extends AmqpBase


_resolve_subscription_tag_alias:(tag)=>
chain = []
while @subscription_tag_aliases?[tag]?
chain.push tag
tag = @subscription_tag_aliases[tag]
return tag
chain.push tag
return [tag, chain]

_handle_tag_change:(event)=>
# if the given event is valid and referneces an oldConsumerTag we're tracking
if event?.oldConsumerTag? and event?.consumerTag? and (@subscription_tag_aliases?[event.oldConsumerTag]? or @queue_names_by_subscription_tag?[event.oldConsumerTag]?)
LogUtil.tpdebug "handling tag.change event #{JSON.stringify(event)}."
if @subscription_tag_aliases?
@subscription_tag_aliases[event.oldConsumerTag] = event.consumerTag
if @queue_names_by_subscription_tag?
Expand Down
9 changes: 8 additions & 1 deletion lib/amqp-producer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ LIB_DIR = if fs.existsSync(LIB_COV) then LIB_COV else LIB
amqp = require 'amqp'
RandomUtil = require('inote-util').RandomUtil
AsyncUtil = require('inote-util').AsyncUtil
LogUtil = require('inote-util').LogUtil
process = require 'process'
################################################################################
AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase
################################################################################
DEBUG = /(^|,)((all)|(amqp-?util)|(amqp-?producer))(,|$)/i.test process.env.NODE_DEBUG # add `amqp-util` or `amqp-producer` to NODE_DEBUG to enable debugging output
LogUtil = LogUtil.init({debug:DEBUG, prefix: "AmqpProducer:"})

class AmqpProducer extends AmqpBase

Expand Down Expand Up @@ -61,11 +64,14 @@ class AmqpProducer extends AmqpBase
callback? new Error("Not connected.")
return undefined
else
LogUtil.tpdebug "Creating (or fetching) the exchange named `#{exchange_name}`..."
if @exchanges_by_name[exchange_name]?
LogUtil.tpdebug "...found in cache."
callback?(undefined,@exchanges_by_name[exchange_name],exchange_name,true)
else
called_back = false
LogUtil.tpdebug "...not found in cache, creating or fetching from AMQP server..."
exchange = @connection.exchange exchange_name, exchange_options, (x...)=>
LogUtil.tpdebug "...created or fetched."
exchange.__amqp_util_exchange_name = exchange_name
@exchanges_by_name[exchange_name] = exchange
callback?(undefined,exchange,exchange_name,false)
Expand Down Expand Up @@ -98,6 +104,7 @@ class AmqpProducer extends AmqpBase
unless publish_options?
publish_options = @default_publish_options
payload = @payload_converter(payload)
LogUtil.tpdebug "Publishing a payload of type `#{typeof payload}` to the exchange named `#{exchange_name}` with routing key `#{routing_key}`."
exchange.publish routing_key, payload, publish_options, (error_occured)=>
if error_occured
callback?(error_occured)
Expand Down
Loading

0 comments on commit 181992d

Please sign in to comment.