This library provides a high-level API for consuming messages from a RabbitMQ exchange with Sidekiq-like behavior.
npm install -S @adwerx/rabbitmq-worker amqplib
Create a worker instance and use its configuration methods to specify the exchange to bind to, a perform function to process messages, an error handler, and a connection factory function.
Call start with an AbortSignal to start consuming messages, and use the abort signal to stop the worker.
import Worker from "@adwerx/rabbitmq-worker";
import amqplib from "amqplib";
Worker.toConnect(() => amqplib.connect(process.env.AMQP_URL));
const controller = new AbortController();
process.once("SIGINT", () => controller.abort());
await new Worker()
  .from("events")
  .perform(async (data, { fields, properties }) => {
    // ... do work
    // data is a *parsed* message
  })
  // pass a handler function; calling console.error(err) here would run immediately
  .on("error", (err) => console.error(err))
  .start(controller.signal);
A worker must be configured with an exchange to source messages from. Use worker.from(exchangeName) to set the exchange that the worker's queue will be bound to.
By default, starting a worker asserts a queue specifically for this worker. The default queue name is the same as the exchange name. To use a different queue name, call worker.bind(queueName).
A worker must be given a perform function, which will be called for each message retrieved. Use worker.perform(fn) to provide a perform function. The perform function can be sync or async.
A worker must have an AMQP connection factory function. You can provide this globally by assigning a connection factory function to the Worker.connect static property. It is also possible to provide the connection factory to a single worker instance with worker.toConnect(factoryFunc). The factory function must return (or resolve with) an amqplib connection instance. This function will be called any time the worker needs to reconnect to the server.
All messages are assumed to be JSON data and will be parsed prior to being passed as an argument to your perform function. You can override this behavior by using .parse("none") to disable parsing or by providing your own parsing function with .parse((buffer: Buffer) => any).
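The three parsing modes described above can be illustrated with a small standalone sketch. The resolveParser helper and the "json" keyword are hypothetical names used only for illustration; they are not taken from the library, which may resolve parsers differently.

```javascript
// Hypothetical helper (not the library's API): maps a parse setting to a
// function that turns a message body (a Buffer) into the value passed to perform.
function resolveParser(setting = "json") {
  if (typeof setting === "function") return setting; // caller-supplied parser
  if (setting === "none") return (buffer) => buffer; // parsing disabled: raw Buffer
  if (setting === "json") {
    return (buffer) => JSON.parse(buffer.toString("utf8")); // assumed default
  }
  throw new TypeError(`Unknown parser setting: ${String(setting)}`);
}

const body = Buffer.from('{"id":7,"type":"order.created"}');

const asJson = resolveParser()(body); // parsed object
const asRaw = resolveParser("none")(body); // the same Buffer, untouched
const asText = resolveParser((b) => b.toString("utf8").toUpperCase())(body);

console.log(asJson.type); // prints "order.created"
console.log(asRaw === body); // prints true
console.log(asText); // prints {"ID":7,"TYPE":"ORDER.CREATED"}
```

A custom parser is useful when producers publish something other than JSON, such as plain text or a binary encoding.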
You can configure other worker behavior at the class level or instance level. For global default options, use Worker.defaults = { ... } and the provided settings will be defaults for all instances. For options specific to one instance, use worker.with({ ... }) to set options for only that instance.
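One way to picture the relationship between class-level defaults and instance-level options is a shallow merge in which instance values win. The sketch below is standalone and does not use the library; resolveOptions and the concurrency default of 1 are assumptions made for illustration, while the retries default of 25 comes from this document.

```javascript
// Hypothetical stand-in for class-level defaults (Worker.defaults in the library).
const classDefaults = { concurrency: 1, retries: 25 }; // concurrency: 1 is assumed

// Hypothetical helper: instance options override defaults, defaults stay untouched.
function resolveOptions(defaults, instanceOptions = {}) {
  return { ...defaults, ...instanceOptions };
}

const busyWorker = resolveOptions(classDefaults, { concurrency: 5 });
const plainWorker = resolveOptions(classDefaults);

console.log(busyWorker); // prints { concurrency: 5, retries: 25 }
console.log(plainWorker); // prints { concurrency: 1, retries: 25 }
```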
WorkerOptions is a type that looks like this:
interface WorkerOptions {
  tag?: string; // the consumerTag the worker will use when connecting
  concurrency: number; // how many jobs may be in progress at any given moment
  retries: number; // how many times a job may retry before dying
  parser: ParserFunction; // a function that is used to parse a message before perform
  deadJobRetensionMs: number; // how long dead jobs should remain in the dead queue
}
This library prioritizes a simple convention over configuration. A worker will assert the exchange it intends to bind a queue to, a work queue to hold its messages, a binding between these two, a retry queue to hold messages awaiting retry, a dead queue to hold messages that have exhausted retries, a requeue exchange, and a binding between the requeue exchange and the work queue to automatically requeue messages that have waited their retry period.
await new Worker()
  .from("events")
  .perform(() => {})
  .start(signal);
Since no queue name was explicitly provided, the worker defined above assumes a queue name of events and creates a work queue with that name. A queue named events.retry is created to hold retryable messages, and a queue named events.dead is created to hold dead messages. An events.requeue exchange is created to re-queue messages that have waited out their retry period. Both the events exchange and the events.requeue exchange are bound to the events work queue.
await new Worker()
  .from("orders")
  .bind("provisioning")
  .perform(() => {})
  .start(signal);
The worker defined above will create a work queue named provisioning. A queue named provisioning.retry is created to hold retryable messages, and a queue named provisioning.dead is created to hold dead messages. A provisioning.requeue exchange is created to re-queue messages that have waited out their retry period. Both the orders exchange and the provisioning.requeue exchange are bound to the provisioning work queue.
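The naming convention in the two examples above can be summarized in a short standalone sketch. topologyFor is a hypothetical helper written for illustration; it only computes names and does not declare anything on a broker.

```javascript
// Hypothetical helper: derives the names described above from an exchange name
// and an optional queue name (which defaults to the exchange name).
function topologyFor(exchange, queue = exchange) {
  const requeueExchange = `${queue}.requeue`;
  return {
    exchange,
    workQueue: queue,
    retryQueue: `${queue}.retry`,
    deadQueue: `${queue}.dead`,
    requeueExchange,
    // both the source exchange and the requeue exchange feed the work queue
    bindings: [
      { exchange, queue },
      { exchange: requeueExchange, queue },
    ],
  };
}

const eventsTopology = topologyFor("events");
const ordersTopology = topologyFor("orders", "provisioning");

console.log(eventsTopology.retryQueue); // prints "events.retry"
console.log(ordersTopology.deadQueue); // prints "provisioning.dead"
console.log(ordersTopology.bindings[1].exchange); // prints "provisioning.requeue"
```

Note that the derived names follow the queue name rather than the exchange name, so two workers bound to the same exchange under different queue names get separate retry and dead queues.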
Jobs retry a default of 25 times with an exponential backoff. When all retries are exhausted, the job is sent to the dead queue.
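To give a feel for how an exponential backoff grows, here is a generic standalone sketch. The exact formula this library uses is not stated in this document, so backoffDelayMs, the 1 second base, and the 1 hour cap are all assumed values chosen for illustration.

```javascript
// Illustrative only: a generic doubling backoff with a cap.
// baseMs and maxMs are assumptions, not the library's settings.
function backoffDelayMs(attempt, { baseMs = 1000, maxMs = 60 * 60 * 1000 } = {}) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Delays for the first five retries (attempt numbers 0 through 4).
const schedule = Array.from({ length: 5 }, (_, attempt) => backoffDelayMs(attempt));

console.log(schedule); // prints [ 1000, 2000, 4000, 8000, 16000 ]
console.log(backoffDelayMs(25)); // prints 3600000 (capped at one hour)
```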
When a message is delivered to the worker, and the perform function is successful, the message is acknowledged.
When a message is delivered to the worker, and an unexpected error occurs, and the message is unable to be sent to the retry or dead queue, the message is not acknowledged.
When a message is delivered to the worker, and an unexpected error occurs, and the message is sent to the retry or dead queue, the message is acknowledged.
When a message is sent to the retry queue, the message will have an expiration set.
When a message in the retry queue expires, the message is dead-lettered to the requeue exchange.
When a message's deaths meet or exceed the retry limit, the message is sent to the dead queue with an expiration.
When a message in the dead queue expires, the message is discarded.
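The routing and acknowledgement rules above can be condensed into two small decision functions. This is a standalone sketch of the rules as written here, not the library's implementation; routeFailedMessage and shouldAck are hypothetical names.

```javascript
// Hypothetical helper: where a failed message goes, based on how many times
// it has already died and the configured retry limit.
function routeFailedMessage(deaths, retries) {
  return deaths >= retries ? "dead" : "retry";
}

// Hypothetical helper: a message is acknowledged when perform succeeded, or
// when the failed message was successfully forwarded to the retry or dead queue.
function shouldAck({ performOk, forwarded }) {
  return performOk || forwarded;
}

console.log(routeFailedMessage(0, 25)); // prints "retry"
console.log(routeFailedMessage(25, 25)); // prints "dead"
console.log(shouldAck({ performOk: true, forwarded: false })); // prints true
console.log(shouldAck({ performOk: false, forwarded: true })); // prints true
console.log(shouldAck({ performOk: false, forwarded: false })); // prints false
```

The last case is the one that leaves a message unacknowledged, which lets the broker redeliver it instead of losing it.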
Run tests with:
npm test
Run build with:
npm run build