From aefed74339e73b8cc48f37c5ecffd3da2a21952c Mon Sep 17 00:00:00 2001 From: Everett Griffiths Date: Sat, 5 Oct 2024 14:19:05 -0400 Subject: [PATCH 1/5] Documentation change: refactors dead-letter exchange example and its description --- lib/broadway_rabbitmq/producer.ex | 120 +++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 28 deletions(-) diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index ec03748..3e021b5 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -376,56 +376,120 @@ defmodule BroadwayRabbitMQ.Producer do ## Dead-letter Exchanges - Here's an example of how to use a dead-letter exchange setup with broadway_rabbitmq: + Dead-letter exchanges are normal RabbitMQ exchanges designated for receiving messages + that have been rejected elsewhere. You can reference a dead-letter exchange when + defining a queue by supplying an `"x-dead-letter-exchange"` argument and optionally an + `"x-dead-letter-routing-key"`. When a queue has a dead-letter exchange defined, then + failing a message with `Broadway.failed/2` or raising an exception in `Broadway.handle_message/3` + causes the message to be republished to the exchange named in the `"x-dead-letter-exchange"` + argument. The message's original routing key is kept unless the `"x-dead-letter-routing-key"` + argument specifies an override. + + A bit more care is needed during setup when using dead-letter exchanges because the dead-letter + exchange must be declared and exist _before_ you attempt to reference it when declaring a queue. + For this reason, you may need to take additional steps when setting up your RabbitMQ instance + beyond what is available to you in the `:after_connect` option. + + You can declare your exchanges and queues in a `Mix` task or before your application starts: + + {:ok, connection} = AMQP.Connection.open() + {:ok, channel} = AMQP.Channel.open(connection) + + # Declare exchanges + :ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true) + :ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true) + + # Define and bind queues within the exchange depending on your needs + {:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true) + :ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", []) + + {:ok, _} = + AMQP.Queue.declare(channel, "my_queue", + durable: true, + arguments: [ + {"x-dead-letter-exchange", :longstr, "my_exchange.dlx"}, + {"x-dead-letter-routing-key", :longstr, "my_queue.dlx"} + ] + ) + + :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) + + + Once you have your exchanges and queues established, you can start Broadway pipelines + to consume messages in the queues. For a thorough example, we need one pipeline which + will fail messages and another pipeline to consume messages from the dead-letter exchange. + + In this example, `MyPipeline` represents the primary pipeline which may reject/fail + messages: defmodule MyPipeline do use Broadway - + require Logger @queue "my_queue" - @exchange "my_exchange" - @queue_dlx "my_queue.dlx" - @exchange_dlx "my_exchange.dlx" def start_link(_opts) do Broadway.start_link(__MODULE__, name: __MODULE__, producer: [ - module: { - BroadwayRabbitMQ.Producer, - on_failure: :reject, - after_connect: &declare_rabbitmq_topology/1, - queue: @queue, - declare: [ - durable: true, - arguments: [ - {"x-dead-letter-exchange", :longstr, @exchange_dlx}, - {"x-dead-letter-routing-key", :longstr, @queue_dlx} - ] - ], - bindings: [{@exchange, []}], - }, + module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue}, concurrency: 2 ], processors: [default: [concurrency: 4]] ) end - defp declare_rabbitmq_topology(amqp_channel) do - with :ok <- AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true), - :ok <- AMQP.Exchange.declare(amqp_channel, @exchange_dlx, :fanout, durable: true), - {:ok, _} <- AMQP.Queue.declare(amqp_channel, @queue_dlx, durable: true), - :ok <- AMQP.Queue.bind(amqp_channel, @queue_dlx, @exchange_dlx) do - :ok - end + @impl true + def handle_message(_processor, message, _context) do + # Raising errors or returning a "failed" message here sends the message to the + # dead-letter queue, e.g. + Logger.debug("Failing message; this should republish to the dead-letter exchange") + + Broadway.Message.failed( + message, + "Failing a message triggers republication to the dead-letter exchange" + ) + end + end + + In order to see the message forwarding in action, we can spin up another pipeline that + consumes messages from the `my_queue.dlx` queue: + + defmodule DeadPipeline do + use Broadway + require Logger + + @queue_dlx "my_queue.dlx" + + def start_link(_opts) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [ + module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue_dlx}, + concurrency: 2 + ], + processors: [default: [concurrency: 4]] + ) end @impl true def handle_message(_processor, message, _context) do - # Raising errors or returning a "failed" message here sends the message to the - # dead-letter queue. + Logger.debug("Dead letter message received!") + message end end + + + To test out the dead-letter republishing behavior, try publishing a message into your + primary exchange and observe that it gets republished and consumed by the queue in the + dead-letter exchange, e.g. + + iex> {:ok, connection} = AMQP.Connection.open() + iex> {:ok, channel} = AMQP.Channel.open(connection) + iex> AMQP.Basic.publish(channel, "my_exchange", "my_queue", "Am I dead?") + :ok + [debug] Failing message; this should republish to the dead-letter exchange + [debug] Dead letter message received! """ use GenStage From 955ab22e4b148c5d51d1153e9e112d6cac12eeca Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Sun, 6 Oct 2024 18:16:24 +0200 Subject: [PATCH 2/5] Apply suggestions from code review --- lib/broadway_rabbitmq/producer.ex | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index 3e021b5..04a0f08 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -378,15 +378,15 @@ defmodule BroadwayRabbitMQ.Producer do Dead-letter exchanges are normal RabbitMQ exchanges designated for receiving messages that have been rejected elsewhere. You can reference a dead-letter exchange when - defining a queue by supplying an `"x-dead-letter-exchange"` argument and optionally an + defining a queue by supplying a `"x-dead-letter-exchange"` argument and optionally a `"x-dead-letter-routing-key"`. When a queue has a dead-letter exchange defined, then - failing a message with `Broadway.failed/2` or raising an exception in `Broadway.handle_message/3` + failing a message with `Broadway.Message.failed/2` or raising an exception in `c:Broadway.handle_message/3` causes the message to be republished to the exchange named in the `"x-dead-letter-exchange"` argument. The message's original routing key is kept unless the `"x-dead-letter-routing-key"` argument specifies an override. A bit more care is needed during setup when using dead-letter exchanges because the dead-letter - exchange must be declared and exist _before_ you attempt to reference it when declaring a queue. + exchange must be declared and exist *before* you attempt to reference it when declaring a queue. For this reason, you may need to take additional steps when setting up your RabbitMQ instance beyond what is available to you in the `:after_connect` option. @@ -414,7 +414,6 @@ defmodule BroadwayRabbitMQ.Producer do :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) - Once you have your exchanges and queues established, you can start Broadway pipelines to consume messages in the queues. For a thorough example, we need one pipeline which will fail messages and another pipeline to consume messages from the dead-letter exchange. @@ -479,10 +478,9 @@ defmodule BroadwayRabbitMQ.Producer do end - To test out the dead-letter republishing behavior, try publishing a message into your primary exchange and observe that it gets republished and consumed by the queue in the - dead-letter exchange, e.g. + dead-letter exchange: iex> {:ok, connection} = AMQP.Connection.open() iex> {:ok, channel} = AMQP.Channel.open(connection) @@ -490,6 +488,7 @@ defmodule BroadwayRabbitMQ.Producer do :ok [debug] Failing message; this should republish to the dead-letter exchange [debug] Dead letter message received! + """ use GenStage From 446a3015cc16eb5818e7832ec547e01244bc0a24 Mon Sep 17 00:00:00 2001 From: Everett Griffiths Date: Mon, 7 Oct 2024 07:58:46 -0400 Subject: [PATCH 3/5] Fixes indenting on code in @doc from 2 spaces -> 4 spaces for proper syntax --- lib/broadway_rabbitmq/producer.ex | 40 +++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index 04a0f08..31506b3 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -392,27 +392,27 @@ defmodule BroadwayRabbitMQ.Producer do You can declare your exchanges and queues in a `Mix` task or before your application starts: - {:ok, connection} = AMQP.Connection.open() - {:ok, channel} = AMQP.Channel.open(connection) - - # Declare exchanges - :ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true) - :ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true) - - # Define and bind queues within the exchange depending on your needs - {:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true) - :ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", []) - - {:ok, _} = - AMQP.Queue.declare(channel, "my_queue", - durable: true, - arguments: [ - {"x-dead-letter-exchange", :longstr, "my_exchange.dlx"}, - {"x-dead-letter-routing-key", :longstr, "my_queue.dlx"} - ] - ) + {:ok, connection} = AMQP.Connection.open() + {:ok, channel} = AMQP.Channel.open(connection) + + # Declare exchanges + :ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true) + :ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true) + + # Define and bind queues within the exchange depending on your needs + {:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true) + :ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", []) + + {:ok, _} = + AMQP.Queue.declare(channel, "my_queue", + durable: true, + arguments: [ + {"x-dead-letter-exchange", :longstr, "my_exchange.dlx"}, + {"x-dead-letter-routing-key", :longstr, "my_queue.dlx"} + ] + ) - :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) + :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) Once you have your exchanges and queues established, you can start Broadway pipelines to consume messages in the queues. For a thorough example, we need one pipeline which From c87e6d3a59e4176f59ae88dd73bc906f1c70cc69 Mon Sep 17 00:00:00 2001 From: Everett Griffiths <303735+fireproofsocks@users.noreply.github.com> Date: Mon, 7 Oct 2024 15:20:59 -0600 Subject: [PATCH 4/5] Update lib/broadway_rabbitmq/producer.ex Co-authored-by: Andrea Leopardi --- lib/broadway_rabbitmq/producer.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index 31506b3..fcc2f89 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -414,7 +414,7 @@ defmodule BroadwayRabbitMQ.Producer do :ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", []) - Once you have your exchanges and queues established, you can start Broadway pipelines + Once you have your exchanges and queues declared, you can start Broadway pipelines to consume messages in the queues. For a thorough example, we need one pipeline which will fail messages and another pipeline to consume messages from the dead-letter exchange. From 67457f17bc7ec1c5b0fec4ad0081ca4ae25f9748 Mon Sep 17 00:00:00 2001 From: Everett Griffiths <303735+fireproofsocks@users.noreply.github.com> Date: Mon, 7 Oct 2024 15:21:07 -0600 Subject: [PATCH 5/5] Update lib/broadway_rabbitmq/producer.ex Co-authored-by: Andrea Leopardi --- lib/broadway_rabbitmq/producer.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/broadway_rabbitmq/producer.ex b/lib/broadway_rabbitmq/producer.ex index fcc2f89..7d4369c 100644 --- a/lib/broadway_rabbitmq/producer.ex +++ b/lib/broadway_rabbitmq/producer.ex @@ -423,7 +423,9 @@ defmodule BroadwayRabbitMQ.Producer do defmodule MyPipeline do use Broadway + require Logger + @queue "my_queue" def start_link(_opts) do