simple rabbitmq listener in the style of an express app
Start a server that consumes messages from a rabbitmq server, listening to one or more topics coming from one queue, powered by amqplib.
Defines the topics you want to listen to as express routes and add middlewares to them, just like you would to an express server.
It uses middlewary
const rabbitExpress = require('rabbitmq-express');
const server = rabbitExpress();
server.use((req, res, next) => {
console.log('Shared middleware');
next();
});
server.use('my-test-topic', (req, res, next) => {
console.log('my-test-topic middleware');
res.end();
});
server.use('another-test-topic', (req, res, next) => {
console.log('another-test-topic first middleware');
next();
}, (req, res, next) => {
console.log('another-test-topic second middleware');
res.end();
});
server.use('test.topic.*', (req, res, next) => {
console.log('all topics that start by test.topic middleware');
res.status(201).end();
});
server.use((err, req, res, next) => {
console.log('Global error middleware');
res.status(500).end(err);
});
server.listen({
rabbitURI: 'amqp://myuser:mypassword@localhost:5672',
exchange: 'myexchange',
queue: 'myqueue',
consumerOptions: { noAck: true },
});
If you are unfamiliar with how express middlewares work, I would suggest you read the Express doc, here is just a reminder that you need to call either "next()" or "res.end()" in each of your middleware.
For obvious reasons the request and reponse objects you receive in your middlewares are different than for a HTTP request:
const {
app, // The rabbitmq-express running application
raw, // raw rabbitmq message (rabbit message)
properties, // message properties if present (object)
fields, // message fields if present (object)
topic, // name of the topic (routing key) sending the message (string)
path, // name of the topic (routing key) sending the message (string)
value, // message content (string)
body, // message content converted to JSON if possible (object)
params, // converted params from the topic name if present (object)
res, // the response object
next, // the next callback
} = req;
const {
rabbitChannel, // The amqplib channel object in case you to need to interact with it in your handlers (channel object)
acknowledge, // flag to trigger the message acknowledgement at the end of the cycle, coming from the consumer options (boolean)
req, // The request object
statusCode, // Status code for the response (default to 200)
end(), // function to call to end the request-response cycle
status(), // Sets the status for the response. It is chainable.
} = res;
The response object is an event dispatcher and emits the 'finish' event when the request cycle ends.
The "Topic" object replaces the Router object. Note that you can mount a topic to another topic to create chained topics but only those that have a mounted middleware will be subscribed to the rabbit exchange.
const rabbitExpress = require('rabbitmq-express');
const { Topic } = rabbitExpress;
const server = rabbitExpress();
const testTopic = new Topic('test');
const outTopic = new Topic(':myparam');
const noMiddlewareTopic = new Topic('no');
testTopic.use((req, res) => {
console.log('hello test');
next();
});
outTopic.use((req, res) => {
console.log('hello out', req.params.myparam);
res.end();
});
outTopic.use(noMiddlewareTopic);
testTopic.use(outTopic);
server.use(testTopic);
// defined topics here are ['test', 'test.:myparam']
console.log(server.topics);
// defined patterns here are ['test', 'test.*']
console.log(server.patterns);
You can subscribe to topics using a Regexp and including parameters (i.e 'topic.name.:param'), these parameters will be added to the request object.
Note that the routing on the rabbit server side is dependent on the type of exchange used.
The "patterns" property of the application is a translation of the middleware paths to the amqp pattern used when subscribing to "topic" exchanges. Here are some examples:
topics (regexp) patterns
:userId -> *
:userId.* -> #
*.type -> #.type
test.:type -> test.*
topic.:type.test -> topic.*.test
topic.:type.:userId.test -> topic.#.test
The "topics" property of the application are the middleware paths, it is used when subscribing to "direct" exchanges since the routing is done on exact match. In that case your topics shouldn't declare parameters.
The listen
function is called with a configuration object that can declare 5 properties:
{
rabbitURI,
exchange,
exchangeType,
queue,
consumerOptions
}
this field is mandatory.
It is either an amqp URI in string format or an amqplib (opened) connection object, in case you want to share a connection between rabbitmq-express servers.
This field is optional.
It is either the name of an exchange in string format or an exchange options object with a name added, here are the default values:
{
name: '',
durable: true,
internal: false,
autoDelete: false,
alternateExchange: undefined,
arguments: undefined,
}
All fields are optional.
If there is no name for the exchange the server will use rabbitmq default exchange.
This field is optional.
One of the 4 type of exchange in string format (i.e. 'fanout', 'direct', 'topic', 'header').
It defaults to 'topic'.
This field is optional.
It is either the name of queue in string format or a queue options object with a name added, here are the default values:
{
name: '',
durable: true,
exclusive: false,
autoDelete: false,
arguments: undefined,
}
All fields are optional.
If there is no name for the queue the server will assign a new one.
This field is optional.
A consumer options object here are the default values:
{
consumerTag: '',
noAck: false,
exclusive: false,
priority: 0,
arguments: undefined,
}
All fields are optional.
Note that noAck
value is used by the response to automatically send, or not, the acknowledgement at the end of the cycle.