Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-2031: Stuck messages fix #5

Merged
merged 14 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ jobs:
fail-fast: false
matrix:
include:
- pair:
elixir: 1.15.7
otp: 26.1.2
# Earliest-supported versions.
- elixir: "1.7.4"
otp: "21.3.8.17"

# Latest versions.
- elixir: "1.17"
otp: "27.0"
lint: lint
test: test
steps:
Expand All @@ -28,8 +32,8 @@ jobs:
- name: Set up Erlang and Elixir
uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.pair.otp}}
elixir-version: ${{matrix.pair.elixir}}
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- name: Cache Mix dependencies
uses: actions/cache@v3
Expand All @@ -39,9 +43,9 @@ jobs:
deps
_build
key: |
mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-${{ hashFiles('**/mix.lock') }}
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-

- run: mix do deps.get --check-locked, deps.compile
if: steps.cache-deps.outputs.cache-hit != 'true'
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## v1.1.0 (2024-06-21)

### Bug fix

* No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example.

### Enhancements

* Log leaked trapped exits

## v1.0.7 (2023-04-22)

### Enhancements
Expand Down
1 change: 1 addition & 0 deletions guides/examples/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The following Off-Broadway libraries are available (feel free to send a PR addin

* [off_broadway_amqp10](https://github.com/highmobility/off_broadway_amqp10): [Guide](https://hexdocs.pm/off_broadway_amqp10/)
* [off_broadway_kafka](https://github.com/bbalser/off_broadway_kafka): [Guide](https://hexdocs.pm/off_broadway_kafka/)
* [off_broadway_memory](https://github.com/elliotekj/off_broadway_memory): [Guide](https://hexdocs.pm/off_broadway_memory/)
* [off_broadway_redis](https://github.com/amokan/off_broadway_redis): [Guide](https://hexdocs.pm/off_broadway_redis/)
* [off_broadway_redis_stream](https://github.com/akash-akya/off_broadway_redis_stream): [Guide](https://hexdocs.pm/off_broadway_redis_stream/)
* [off_broadway_splunk](https://github.com/Intility/off_broadway_splunk): [Guide](https://hexdocs.pm/off_broadway_splunk/)
13 changes: 12 additions & 1 deletion lib/broadway/topology/processor_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Broadway.Topology.ProcessorStage do
use GenStage

require Logger
alias Broadway.{Message, Acknowledger}
alias Broadway.{Message, Acknowledger, Utility}

@spec start_link(term, GenServer.options()) :: GenServer.on_start()
def start_link(args, stage_options) do
Expand Down Expand Up @@ -70,6 +70,7 @@ defmodule Broadway.Topology.ProcessorStage do
producer: state.producer
},
fn ->
Utility.maybe_log("Preparing to process #{length(messages)} message(s)", state)
{prepared_messages, prepared_failed_messages} = maybe_prepare_messages(messages, state)
{successful_messages, failed_messages} = handle_messages(prepared_messages, [], [], state)
failed_messages = prepared_failed_messages ++ failed_messages
Expand All @@ -83,6 +84,15 @@ defmodule Broadway.Topology.ProcessorStage do
{successful_messages, []}
end

Utility.maybe_log(
inspect(%{
to_ack_count: length(successful_messages_to_ack),
to_forward_count: length(successful_messages_to_forward),
failed_count: length(failed_messages)
}),
state
)

failed_messages =
Acknowledger.maybe_handle_failed_messages(
failed_messages,
Expand All @@ -92,6 +102,7 @@ defmodule Broadway.Topology.ProcessorStage do

try do
Acknowledger.ack_messages(successful_messages_to_ack, failed_messages)
Utility.maybe_log("Acked all messages", state)
catch
kind, reason ->
Logger.error(Exception.format(kind, reason, __STACKTRACE__),
Expand Down
57 changes: 41 additions & 16 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Broadway.Topology.ProducerStage do

alias Broadway.Message
alias Broadway.Topology.RateLimiter
alias Broadway.Utility

require Logger

Expand All @@ -19,12 +20,6 @@ defmodule Broadway.Topology.ProducerStage do

@spec drain(GenServer.server()) :: :ok
def drain(producer) do
# First we set the demand to accumulate. This is to avoid
# polling implementations from re-entering the polling loop
# once they flush any timers during draining. Push implementations
# will still empty out their queues as long as they put them
# in the GenStage buffer.
GenStage.demand(producer, :accumulate)
GenStage.cast(producer, {__MODULE__, :prepare_for_draining})
GenStage.async_info(producer, {__MODULE__, :cancel_consumers})
end
Expand Down Expand Up @@ -317,18 +312,48 @@ defmodule Broadway.Topology.ProducerStage do
# No point in trying to emit messages if no messages are allowed. In that case,
# we close the rate limiting and don't emit anything.
allowed when allowed <= 0 ->
Utility.maybe_log("Rate limiting closed", state)
{%{rate_limiting | state: :closed}, []}

allowed ->
{allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, [])
{allowed_left, probably_emittable, buffer, next_message_weight} =
dequeue_many(buffer, allowed, [])

log_map = %{
initial_allowed: allowed,
allowed_left: allowed_left,
probably_emittable_count: length(probably_emittable),
buffer_length: :queue.len(buffer),
next_message_weight: next_message_weight
}

Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state)

# If nothing was emittable, but the buffer has messages in it,
# then we want to rate limit allowed_left. This will take the limit
# down to 0.
demand =
case {length(probably_emittable), :queue.len(buffer)} do
{0, buffer_length} when buffer_length > 0 -> allowed_left
_ -> allowed - allowed_left
end

{rate_limiting_state, messages_to_emit, messages_to_buffer} =
rate_limit_messages(
rate_limiter,
probably_emittable,
_probably_emittable_weight = allowed - allowed_left
demand,
state
)

rate_limiting_map = %{
rate_limiting_state: rate_limiting_state,
emit_count: length(messages_to_emit),
to_buffer_count: length(messages_to_buffer)
}

Utility.maybe_log("rate_limit_messages result: #{inspect(rate_limiting_map)}", state)

new_buffer = enqueue_batch_r(buffer, messages_to_buffer)

rate_limiting = %{
Expand All @@ -338,6 +363,7 @@ defmodule Broadway.Topology.ProducerStage do
}

if draining? and :queue.is_empty(new_buffer) do
Utility.maybe_log("Cancelling consumers", state)
cancel_consumers(state)
end

Expand All @@ -347,7 +373,7 @@ defmodule Broadway.Topology.ProducerStage do
{%{state | rate_limiting: rate_limiting}, messages_to_emit}
end

defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue}
defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0}

defp dequeue_many(queue, demand, acc) do
case :queue.out(queue) do
Expand All @@ -356,14 +382,14 @@ defmodule Broadway.Topology.ProducerStage do
# requeue first message and ignore remaining demand since
# the next message would put us over the allowed weight
new_queue = :queue.in_r(message, queue)
{0, Enum.reverse(acc), new_queue}
{demand, Enum.reverse(acc), new_queue, message.weight}
else
new_demand = demand - message.weight
dequeue_many(queue, new_demand, [message | acc])
end

{:empty, queue} ->
{demand, Enum.reverse(acc), queue}
{demand, Enum.reverse(acc), queue, 0}
end
end

Expand All @@ -379,12 +405,11 @@ defmodule Broadway.Topology.ProducerStage do
:queue.join(:queue.from_list(list), queue)
end

defp rate_limit_messages(_state, [], _count) do
{:open, [], []}
end
defp rate_limit_messages(rate_limiter, messages, demand, state) do
left = RateLimiter.rate_limit(rate_limiter, demand)
Utility.maybe_log("Remaining rate limit: #{left}", state)

defp rate_limit_messages(rate_limiter, messages, demand) do
case RateLimiter.rate_limit(rate_limiter, demand) do
case left do
# If no more messages are allowed, we're rate limited but we're able
# to emit all messages that we have.
0 ->
Expand Down
32 changes: 32 additions & 0 deletions lib/broadway/utility.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Broadway.Utility do
  @moduledoc """
  Temporary debugging aid used while investigating the rate limit bug in the
  Broadway fork.

  Delete this module, every call to it, and any code that exists only to
  assemble data for those log statements once the fix is trusted.
  """
  require Logger

  # ABOMINATION: debug output is restricted to specific consumers by matching
  # on the queue name found in the stage state. This leaks knowledge of our
  # application into the Broadway fork.
  @queues [
    # "text.carrier.tmobile.tcr_campaign.1.sms.messages",
    # "text.carrier.us_cellular.tcr_campaign.1.sms.messages",
    "text.provider.bandwidth.sms.messages"
  ]

  @doc """
  Emits `message` at debug level, prefixed with the queue name, when `state`
  carries a watched queue name.

  The queue name is looked up under `state.context.config.queue` first and
  under `state.module_state.config.queue` second. Any other `state` (including
  non-map terms) is ignored and `:ok` is returned.
  """
  @spec maybe_log(String.Chars.t(), term()) :: :ok
  def maybe_log(message, state) do
    case watched_queue(state) do
      nil -> :ok
      queue -> Logger.debug("Queue: #{queue} #{message}")
    end
  end

  # Returns the watched queue name found in `state`, or `nil` when there is
  # none. The `:context` shape is tried before the `:module_state` shape.
  defp watched_queue(%{context: %{config: %{queue: queue}}}) when queue in @queues, do: queue

  defp watched_queue(%{module_state: %{config: %{queue: queue}}}) when queue in @queues,
    do: queue

  defp watched_queue(_state), do: nil
end
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Broadway.MixProject do
use Mix.Project

@version "1.0.7"
@version "1.1.0"
@description "Build concurrent and multi-stage data ingestion and data processing pipelines"

def project do
Expand Down Expand Up @@ -32,8 +32,9 @@ defmodule Broadway.MixProject do
{:telemetry, "~> 0.4.3 or ~> 1.0"},

# Dev/test dependencies.
{:castore, "~> 1.0", only: :test},
{:ex_doc, ">= 0.19.0", only: :docs},
{:excoveralls, "~> 0.17.0", only: :test}
{:excoveralls, "~> 0.18.0", only: :test}
]
end

Expand Down
21 changes: 11 additions & 10 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
%{
"earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"excoveralls": {:hex, :excoveralls, "0.17.0", "279f124dba347903bb654bc40745c493ae265d45040001b4899ea1edf88078c7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "08b638d114387a888f9cb8d65f2a0021ec04c3e447b793efa7c1e734aba93004"},
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
"castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
"excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
}
Loading