diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index ec03748..7d4369c 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -376,56 +376,121 @@ defmodule BroadwayRabbitMQ.Producer do ## Dead-letter Exchanges - Here's an example of how to use a dead-letter exchange setup with broadway_rabbitmq: + Dead-letter exchanges are normal RabbitMQ exchanges designated for receiving messages + that have been rejected elsewhere. You can reference a dead-letter exchange when + defining a queue by supplying a `"x-dead-letter-exchange"` argument and optionally a + `"x-dead-letter-routing-key"`. When a queue has a dead-letter exchange defined, then + failing a message with `Broadway.Message.failed/2` or raising an exception in `c:Broadway.handle_message/3` + causes the message to be republished to the exchange named in the `"x-dead-letter-exchange"` + argument. The message's original routing key is kept unless the `"x-dead-letter-routing-key"` + argument specifies an override. + + A bit more care is needed during setup when using dead-letter exchanges because the dead-letter + exchange must be declared and exist *before* you attempt to reference it when declaring a queue. + For this reason, you may need to take additional steps when setting up your RabbitMQ instance + beyond what is available to you in the `:after_connect` option. + + You can declare your exchanges and queues in a `Mix` task or before your application starts: + + {:ok, connection} = AMQP.Connection.open() + {:ok, channel} = AMQP.Channel.open(connection) + + # Declare exchanges + :ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true) + :ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true) + + # Define and bind queues within the exchange depending on your needs + {:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true) + :ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", []) + + {:ok, _} = + AMQP.Queue.declare(channel, "my_queue", + durable: true, + arguments: [ + {"x-dead-letter-exchange", :longstr, "my_exchange.dlx"}, + {"x-dead-letter-routing-key", :longstr, "my_queue.dlx"} + ] + ) + + :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) + + Once you have your exchanges and queues declared, you can start Broadway pipelines + to consume messages in the queues. For a thorough example, we need one pipeline which + will fail messages and another pipeline to consume messages from the dead-letter exchange. + + In this example, `MyPipeline` represents the primary pipeline which may reject/fail + messages: defmodule MyPipeline do use Broadway + require Logger + @queue "my_queue" - @exchange "my_exchange" - @queue_dlx "my_queue.dlx" - @exchange_dlx "my_exchange.dlx" def start_link(_opts) do Broadway.start_link(__MODULE__, name: __MODULE__, producer: [ - module: { - BroadwayRabbitMQ.Producer, - on_failure: :reject, - after_connect: &declare_rabbitmq_topology/1, - queue: @queue, - declare: [ - durable: true, - arguments: [ - {"x-dead-letter-exchange", :longstr, @exchange_dlx}, - {"x-dead-letter-routing-key", :longstr, @queue_dlx} - ] - ], - bindings: [{@exchange, []}], - }, + module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue}, concurrency: 2 ], processors: [default: [concurrency: 4]] ) end - defp declare_rabbitmq_topology(amqp_channel) do - with :ok <- AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true), - :ok <- AMQP.Exchange.declare(amqp_channel, @exchange_dlx, :fanout, durable: true), - {:ok, _} <- AMQP.Queue.declare(amqp_channel, @queue_dlx, durable: true), - :ok <- AMQP.Queue.bind(amqp_channel, @queue_dlx, @exchange_dlx) do - :ok - end + @impl true + def handle_message(_processor, message, _context) do + # Raising errors or returning a "failed" message here sends the message to the + # dead-letter queue, e.g. + Logger.debug("Failing message; this should republish to the dead-letter exchange") + + Broadway.Message.failed( + message, + "Failing a message triggers republication to the dead-letter exchange" + ) + end + end + + In order to see the message forwarding in action, we can spin up another pipeline that + consumes messages from the `my_queue.dlx` queue: + + defmodule DeadPipeline do + use Broadway + require Logger + + @queue_dlx "my_queue.dlx" + + def start_link(_opts) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [ + module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue_dlx}, + concurrency: 2 + ], + processors: [default: [concurrency: 4]] + ) end @impl true def handle_message(_processor, message, _context) do - # Raising errors or returning a "failed" message here sends the message to the - # dead-letter queue. + Logger.debug("Dead letter message received!") + message end end + + To test out the dead-letter republishing behavior, try publishing a message into your + primary exchange and observe that it gets republished and consumed by the queue in the + dead-letter exchange: + + iex> {:ok, connection} = AMQP.Connection.open() + iex> {:ok, channel} = AMQP.Channel.open(connection) + iex> AMQP.Basic.publish(channel, "my_exchange", "my_queue", "Am I dead?") + :ok + [debug] Failing message; this should republish to the dead-letter exchange + [debug] Dead letter message received! + """ use GenStage