Merge pull request #5 from GetThru/ENG-2031-stuck-messages-fix
Fixes a bug in our Broadway weighted rate limiting. If the next message in the queue was too large to process with the remaining rate limit in the interval, but the rate limit was not 0 yet, Broadway would stop processing messages. The code was assuming that the remaining rate limit would always get to 0. That was true before weights were added.

The fix was to keep track of the buffer length when no messages were emittable. That means there is a message in the buffer, but it's too large to process right now. In that case, we zero out the remaining rate limit. This causes Broadway to close the rate limit like normal and messages continue to process.
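A minimal sketch of the decision described above, assuming messages are plain maps with a `weight` key. The module `WeightedLimitSketch` and its functions are hypothetical illustrations and are not part of Broadway; the actual change is in `lib/broadway/topology/producer_stage.ex`. The counter is a bare `:atomics` reference standing in for the rate limiter.

```elixir
defmodule WeightedLimitSketch do
  # Hypothetical stand-in for the producer's buffer handling; not Broadway's API.

  # Take messages from the front of the queue while their weight fits in `allowed`.
  # Returns {allowed_left, taken_messages, remaining_queue}.
  def dequeue_many(queue, allowed, acc \\ []) do
    case :queue.out(queue) do
      {{:value, %{weight: weight} = message}, rest} when weight <= allowed ->
        dequeue_many(rest, allowed - weight, [message | acc])

      _too_heavy_or_empty ->
        # :queue.out/1 does not mutate, so `queue` still has the heavy head message.
        {allowed, Enum.reverse(acc), queue}
    end
  end

  # Amount to charge against the rate limiter for this pass.
  def demand(allowed, allowed_left, taken, queue) do
    if taken == [] and not :queue.is_empty(queue) do
      # The head of the buffer is too heavy for what is left: charge the
      # remainder so the counter reaches 0 and the interval closes.
      allowed_left
    else
      # Normal case: charge the weight that was actually dequeued.
      allowed - allowed_left
    end
  end
end

# One message of weight 5 is waiting, but only 3 units remain in the interval.
queue = :queue.from_list([%{id: 1, weight: 5}])
allowed = 3

{allowed_left, taken, queue} = WeightedLimitSketch.dequeue_many(queue, allowed)
charge = WeightedLimitSketch.demand(allowed, allowed_left, taken, queue)

# Stand-in for the rate limiter's counter.
counter = :atomics.new(1, [])
:atomics.put(counter, 1, allowed)
remaining = :atomics.sub_get(counter, 1, charge)

IO.inspect({taken, charge, remaining})
# prints {[], 3, 0}
```

Charging `allowed - allowed_left` in this scenario would subtract 0 and leave the counter at 3, which is the stuck state the commit message describes.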

There is a lot of extra logging in here. I'm leaving it for now just in case we see more issues in prod. Once we're comfortable that this is working properly, we can circle back and remove the logging.
CuriousCurmudgeon authored Oct 18, 2024
2 parents 1be5ba2 + 0c97ece commit 13b2730
Showing 9 changed files with 122 additions and 37 deletions.
18 changes: 11 additions & 7 deletions .github/workflows/ci.yml
@@ -16,9 +16,13 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - pair:
-              elixir: 1.15.7
-              otp: 26.1.2
+          # Earliest-supported versions.
+          - elixir: "1.7.4"
+            otp: "21.3.8.17"
+
+          # Latest versions.
+          - elixir: "1.17"
+            otp: "27.0"
             lint: lint
             test: test
     steps:
steps:
@@ -28,8 +32,8 @@ jobs:
       - name: Set up Erlang and Elixir
         uses: erlef/setup-beam@v1
         with:
-          otp-version: ${{matrix.pair.otp}}
-          elixir-version: ${{matrix.pair.elixir}}
+          otp-version: ${{matrix.otp}}
+          elixir-version: ${{matrix.elixir}}

       - name: Cache Mix dependencies
         uses: actions/cache@v3
@@ -39,9 +43,9 @@ jobs:
             deps
             _build
           key: |
-            mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-${{ hashFiles('**/mix.lock') }}
+            mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }}
           restore-keys: |
-            mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-
+            mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-
       - run: mix do deps.get --check-locked, deps.compile
         if: steps.cache-deps.outputs.cache-hit != 'true'
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
 # Changelog

+## v1.1.0 (2024-06-21)
+
+### Bug fix
+
+  * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example
+
+### Enhancements
+
+  * Log leaked trapped exits
+
 ## v1.0.7 (2023-04-22)

 ### Enhancements
1 change: 1 addition & 0 deletions guides/examples/introduction.md
@@ -32,6 +32,7 @@ The following Off-Broadway libraries are available (feel free to send a PR addin

   * [off_broadway_amqp10](https://github.com/highmobility/off_broadway_amqp10): [Guide](https://hexdocs.pm/off_broadway_amqp10/)
   * [off_broadway_kafka](https://github.com/bbalser/off_broadway_kafka): [Guide](https://hexdocs.pm/off_broadway_kafka/)
+  * [off_broadway_memory](https://github.com/elliotekj/off_broadway_memory): [Guide](https://hexdocs.pm/off_broadway_memory/)
   * [off_broadway_redis](https://github.com/amokan/off_broadway_redis): [Guide](https://hexdocs.pm/off_broadway_redis/)
   * [off_broadway_redis_stream](https://github.com/akash-akya/off_broadway_redis_stream): [Guide](https://hexdocs.pm/off_broadway_redis_stream/)
   * [off_broadway_splunk](https://github.com/Intility/off_broadway_splunk): [Guide](https://hexdocs.pm/off_broadway_splunk/)
13 changes: 12 additions & 1 deletion lib/broadway/topology/processor_stage.ex
@@ -3,7 +3,7 @@ defmodule Broadway.Topology.ProcessorStage do
   use GenStage

   require Logger
-  alias Broadway.{Message, Acknowledger}
+  alias Broadway.{Message, Acknowledger, Utility}

   @spec start_link(term, GenServer.options()) :: GenServer.on_start()
   def start_link(args, stage_options) do
@@ -70,6 +70,7 @@ defmodule Broadway.Topology.ProcessorStage do
         producer: state.producer
       },
       fn ->
+        Utility.maybe_log("Preparing to process #{length(messages)} message(s)", state)
         {prepared_messages, prepared_failed_messages} = maybe_prepare_messages(messages, state)
         {successful_messages, failed_messages} = handle_messages(prepared_messages, [], [], state)
         failed_messages = prepared_failed_messages ++ failed_messages
@@ -83,6 +84,15 @@ defmodule Broadway.Topology.ProcessorStage do
             {successful_messages, []}
           end

+        Utility.maybe_log(
+          inspect(%{
+            to_ack_count: length(successful_messages_to_ack),
+            to_forward_count: length(successful_messages_to_forward),
+            failed_count: length(failed_messages)
+          }),
+          state
+        )
+
         failed_messages =
           Acknowledger.maybe_handle_failed_messages(
             failed_messages,
@@ -92,6 +102,7 @@ defmodule Broadway.Topology.ProcessorStage do

         try do
           Acknowledger.ack_messages(successful_messages_to_ack, failed_messages)
+          Utility.maybe_log("Acked all messages", state)
         catch
           kind, reason ->
             Logger.error(Exception.format(kind, reason, __STACKTRACE__),
57 changes: 41 additions & 16 deletions lib/broadway/topology/producer_stage.ex
@@ -4,6 +4,7 @@ defmodule Broadway.Topology.ProducerStage do

   alias Broadway.Message
   alias Broadway.Topology.RateLimiter
+  alias Broadway.Utility

   require Logger

@@ -19,12 +20,6 @@ defmodule Broadway.Topology.ProducerStage do

   @spec drain(GenServer.server()) :: :ok
   def drain(producer) do
-    # First we set the demand to accumulate. This is to avoid
-    # polling implementations from re-entering the polling loop
-    # once they flush any timers during draining. Push implementations
-    # will still empty out their queues as long as they put them
-    # in the GenStage buffer.
-    GenStage.demand(producer, :accumulate)
     GenStage.cast(producer, {__MODULE__, :prepare_for_draining})
     GenStage.async_info(producer, {__MODULE__, :cancel_consumers})
   end
@@ -317,18 +312,48 @@ defmodule Broadway.Topology.ProducerStage do
         # No point in trying to emit messages if no messages are allowed. In that case,
         # we close the rate limiting and don't emit anything.
         allowed when allowed <= 0 ->
+          Utility.maybe_log("Rate limiting closed", state)
           {%{rate_limiting | state: :closed}, []}

         allowed ->
-          {allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, [])
+          {allowed_left, probably_emittable, buffer, next_message_weight} =
+            dequeue_many(buffer, allowed, [])

+          log_map = %{
+            initial_allowed: allowed,
+            allowed_left: allowed_left,
+            probably_emittable_count: length(probably_emittable),
+            buffer_length: :queue.len(buffer),
+            next_message_weight: next_message_weight
+          }
+
+          Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state)
+
+          # If nothing was emittable, but the buffer has messages in it,
+          # then we want to rate limit allowed_left. This will take the limit
+          # down to 0.
+          demand =
+            case {length(probably_emittable), :queue.len(buffer)} do
+              {0, buffer_length} when buffer_length > 0 -> allowed_left
+              _ -> allowed - allowed_left
+            end
+
           {rate_limiting_state, messages_to_emit, messages_to_buffer} =
             rate_limit_messages(
               rate_limiter,
               probably_emittable,
-              _probably_emittable_weight = allowed - allowed_left
+              demand,
+              state
             )

+          rate_limiting_map = %{
+            rate_limiting_state: rate_limiting_state,
+            emit_count: length(messages_to_emit),
+            to_buffer_count: length(messages_to_buffer)
+          }
+
+          Utility.maybe_log("rate_limit_messages result: #{inspect(rate_limiting_map)}", state)
+
           new_buffer = enqueue_batch_r(buffer, messages_to_buffer)

           rate_limiting = %{
@@ -338,6 +363,7 @@ defmodule Broadway.Topology.ProducerStage do
           }

           if draining? and :queue.is_empty(new_buffer) do
+            Utility.maybe_log("Cancelling consumers", state)
             cancel_consumers(state)
           end

@@ -347,7 +373,7 @@ defmodule Broadway.Topology.ProducerStage do
     {%{state | rate_limiting: rate_limiting}, messages_to_emit}
   end

-  defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue}
+  defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0}

   defp dequeue_many(queue, demand, acc) do
     case :queue.out(queue) do
@@ -356,14 +382,14 @@ defmodule Broadway.Topology.ProducerStage do
           # requeue first message and ignore remaining demand since
           # the next message would put us over the allowed weight
           new_queue = :queue.in_r(message, queue)
-          {0, Enum.reverse(acc), new_queue}
+          {demand, Enum.reverse(acc), new_queue, message.weight}
         else
           new_demand = demand - message.weight
           dequeue_many(queue, new_demand, [message | acc])
         end

       {:empty, queue} ->
-        {demand, Enum.reverse(acc), queue}
+        {demand, Enum.reverse(acc), queue, 0}
     end
   end

@@ -379,12 +405,11 @@ defmodule Broadway.Topology.ProducerStage do
     :queue.join(:queue.from_list(list), queue)
   end

-  defp rate_limit_messages(_state, [], _count) do
-    {:open, [], []}
-  end
+  defp rate_limit_messages(rate_limiter, messages, demand, state) do
+    left = RateLimiter.rate_limit(rate_limiter, demand)
+    Utility.maybe_log("Remaining rate limit: #{left}", state)

-  defp rate_limit_messages(rate_limiter, messages, demand) do
-    case RateLimiter.rate_limit(rate_limiter, demand) do
+    case left do
       # If no more messages are allowed, we're rate limited but we're able
       # to emit all messages that we have.
       0 ->
2 changes: 1 addition & 1 deletion lib/broadway/topology/rate_limiter.ex
@@ -22,7 +22,7 @@ defmodule Broadway.Topology.RateLimiter do
   end

   def rate_limit(counter, amount)
-      when is_reference(counter) and is_integer(amount) and amount > 0 do
+      when is_reference(counter) and is_integer(amount) and amount >= 0 do
     :atomics.sub_get(counter, @atomics_index, amount)
   end
32 changes: 32 additions & 0 deletions lib/broadway/utility.ex
@@ -0,0 +1,32 @@
+defmodule Broadway.Utility do
+  @moduledoc """
+  This entire module is temporary while working on the rate limit bug in the
+  Broadway fork.
+  Once we're comfortable with the fix, this module and all usages
+  of it should be removed. This includes removing a bunch of code around those
+  log statements to setup data for logging.
+  """
+  require Logger
+
+  # ABOMINATION: This helps us only log information about specific
+  # consumers by looking at the name of the queue. This leaks information
+  # about our app into our Broadway fork.
+  @queues [
+    # "text.carrier.tmobile.tcr_campaign.1.sms.messages",
+    # "text.carrier.us_cellular.tcr_campaign.1.sms.messages",
+    "text.provider.bandwidth.sms.messages"
+  ]
+
+  def maybe_log(message, %{context: %{config: %{queue: queue}}})
+      when queue in @queues do
+    Logger.debug("Queue: #{queue} #{message}")
+  end
+
+  def maybe_log(message, %{module_state: %{config: %{queue: queue}}})
+      when queue in @queues do
+    Logger.debug("Queue: #{queue} #{message}")
+  end
+
+  def maybe_log(_message, _state), do: :ok
+end
5 changes: 3 additions & 2 deletions mix.exs
@@ -1,7 +1,7 @@
 defmodule Broadway.MixProject do
   use Mix.Project

-  @version "1.0.7"
+  @version "1.1.0"
   @description "Build concurrent and multi-stage data ingestion and data processing pipelines"

   def project do
@@ -32,8 +32,9 @@ defmodule Broadway.MixProject do
       {:telemetry, "~> 0.4.3 or ~> 1.0"},

       # Dev/test dependencies.
+      {:castore, "~> 1.0", only: :test},
       {:ex_doc, ">= 0.19.0", only: :docs},
-      {:excoveralls, "~> 0.17.0", only: :test}
+      {:excoveralls, "~> 0.18.0", only: :test}
     ]
   end
21 changes: 11 additions & 10 deletions mix.lock
@@ -1,13 +1,14 @@
 %{
-  "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
-  "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
-  "excoveralls": {:hex, :excoveralls, "0.17.0", "279f124dba347903bb654bc40745c493ae265d45040001b4899ea1edf88078c7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "08b638d114387a888f9cb8d65f2a0021ec04c3e447b793efa7c1e734aba93004"},
-  "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
+  "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
+  "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
+  "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
+  "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"},
+  "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
   "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
-  "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
-  "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
-  "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
-  "nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"},
-  "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
-  "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
+  "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
+  "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
+  "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},
+  "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
+  "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
+  "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
 }
