-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathamqp-producer.coffee
168 lines (150 loc) · 6.68 KB
/
amqp-producer.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
fs = require 'fs'
path = require 'path'
HOME_DIR = path.join __dirname, '..'
LIB_COV = path.join HOME_DIR, 'lib-cov'
LIB = path.join HOME_DIR, 'lib'
LIB_DIR = if fs.existsSync(LIB_COV) then LIB_COV else LIB
################################################################################
amqp = require 'amqp'
RandomUtil = require('inote-util').RandomUtil
AsyncUtil = require('inote-util').AsyncUtil
LogUtil = require('inote-util').LogUtil
process = require 'process'
################################################################################
AmqpBase = require(path.join(LIB_DIR, 'amqp-base')).AmqpBase
################################################################################
DEBUG = /(^|,)((all)|(amqp-?util)|(amqp-?producer))(,|$)/i.test process.env.NODE_DEBUG # add `amqp-util` or `amqp-producer` to NODE_DEBUG to enable debugging output
LogUtil = LogUtil.init({debug:DEBUG, prefix: "AmqpProducer:"})
class AmqpProducer extends AmqpBase
constructor:(args...)->
super(args...)
# **default_routing_key** - *the default key value to use in `publish`.*
default_routing_key: null
# **default_publish_options** - *map of the default publishing options for `publish`.*
default_publish_options: null
# **set_default_publish_option** - *sets one of the default publishing options.*
set_default_publish_option:(name,value)=>
@default_publish_options ?= {}
@default_publish_options[name] = value
# **set_default_publish_header** - *sets one of the default publishing headers.*
set_default_publish_header:(name,value)=>
@default_publish_options ?= {}
@default_publish_options.headers ?= {}
@default_publish_options.headers[name] = value
# **payload_converter** - *a utility method used to convert the payload before publishing.*
#
# (The default method is the identity function, no conversion occurs.)
payload_converter:(payload)=>payload
_on_connect:(callback)=>
@exchanges_by_name ?= { }
callback?()
_on_disconnect:(callback)=>
@exchanges_by_name = undefined
callback?()
# args:
# - exchange_name
# - exchange_options
# - callback
create_exchange:(exchange_name, exchange_options, callback)=>
if typeof exchange_options is 'function' and not callback?
callback = exchange_options
exchange_options = undefined#
unless @connection?
callback? new Error("Not connected.")
return undefined
else
LogUtil.tpdebug "Creating (or fetching) the exchange named `#{exchange_name}`..."
if @exchanges_by_name[exchange_name]?
LogUtil.tpdebug "...found in cache."
callback?(undefined,@exchanges_by_name[exchange_name],exchange_name,true)
else
LogUtil.tpdebug "...not found in cache, creating or fetching from AMQP server..."
exchange = @connection.exchange exchange_name, exchange_options, (x...)=>
LogUtil.tpdebug "...created or fetched."
exchange.__amqp_util_exchange_name = exchange_name
@exchanges_by_name[exchange_name] = exchange
callback?(undefined,exchange,exchange_name,false)
return exchange_name
get_exchange:(exchange_name, exchange_options, callback)=>
@create_exchange exchange_name, exchange_options, callback
# args: exchange_or_exchange_name, payload, routing_key, publish_options, callback
publish:(args...)=>
if args?.length > 0 and (typeof args[0] is 'string' or not args[0]?)
exchange_or_exchange_name = args.shift()
else if args?.length > 0 and typeof args[0] is 'object' and @_object_is_exchange(args[0])
exchange_or_exchange_name = args.shift()
if args?.length > 0
payload = args.shift()
if args?.length > 0 and (typeof args[0] is 'string' or not args[0]?)
routing_key = args.shift()
if args?.length > 0 and (typeof args[0] is 'object' or not args[0]?)
publish_options = args.shift()
if args?.length > 0 and (typeof args[0] is 'function' or not args[0]?)
callback = args.shift()
#
@_maybe_create_exchange exchange_or_exchange_name, (err, exchange, exchange_name)=>
if err?
callback?(err)
else
unless routing_key?
routing_key = @default_routing_key
unless publish_options?
publish_options = @default_publish_options
payload = @payload_converter(payload)
LogUtil.tpdebug "Publishing a payload of type `#{typeof payload}` to the exchange named `#{exchange_name}` with routing key `#{routing_key}`."
exchange.publish routing_key, payload, publish_options, (error_occured)=>
if error_occured
callback?(error_occured)
else
callback?(null)
get_exchange_by_name:(exchange_name)=>
return @exchanges_by_name[exchange_name]
_maybe_create_exchange:(exchange_or_exchange_name, args..., callback)=>
[exchange, exchange_name] = @_to_exchange_exchange_name_pair exchange_or_exchange_name
if exchange?
callback? undefined, exchange, exchange_name, true
else
@create_exchange exchange_name, args..., callback
_to_exchange_exchange_name_pair:(exchange_or_exchange_name)=>
if typeof exchange_or_exchange_name is 'string'
exchange_name = exchange_or_exchange_name
exchange = @exchanges_by_name[exchange_or_exchange_name] ? undefined
else if @_object_is_exchange exchange_or_exchange_name
exchange = exchange_or_exchange_name
exchange_name = exchange?.__amqp_util_exchange_name ? undefined
else
exchange_name = undefined
exchange = undefined
return [exchange, exchange_name]
# Exported as `AMQPProducer`.
exports.AMQPProducer = exports.AmqpProducer = AmqpProducer
# When loaded directly, use `AMQPProducer` to publish a simple message.
#
# Accepts up to 4 command line parameters:
# - the payload (message contents)
# - the routing key
# - the exchange name
# - the broker URI
#
if require.main is module
payload = (process.argv?[2]) ? 'Example Payload'
key = (process.argv?[3]) ? 'amqp-demo-queue'
exchange_name = (process.argv?[4]) ? 'foobar' #'amq.topic'
broker_url = (process.argv?[5]) ? 'amqp://guest:guest@localhost:5672'
producer = new AmqpProducer()
# producer.set_default_publish_option("confirm", true)
producer.connect broker_url, (err)->
if err?
console.error err
process.exit 1
else
console.log "AmqpProducer connected to broker at \"#{broker_url}\"."
producer.create_exchange exchange_name, {confirm:true}, (err)->
if err?
console.error err
process.exit 1
else
console.log "AmqpProducer fetched or created exchange \"#{exchange_name}\"."
producer.publish exchange_name, payload, key, {}, ()=>
console.log "Confirmed."
process.exit()