diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 88fe8cf..05ee070 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,9 +16,13 @@ jobs: fail-fast: false matrix: include: - - pair: - elixir: 1.15.7 - otp: 26.1.2 + # Earliest-supported versions. + - elixir: "1.7.4" + otp: "21.3.8.17" + + # Latest versions. + - elixir: "1.17" + otp: "27.0" lint: lint test: test steps: @@ -28,8 +32,8 @@ jobs: - name: Set up Erlang and Elixir uses: erlef/setup-beam@v1 with: - otp-version: ${{matrix.pair.otp}} - elixir-version: ${{matrix.pair.elixir}} + otp-version: ${{matrix.otp}} + elixir-version: ${{matrix.elixir}} - name: Cache Mix dependencies uses: actions/cache@v3 @@ -39,9 +43,9 @@ jobs: deps _build key: | - mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-${{ hashFiles('**/mix.lock') }} + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }} restore-keys: | - mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}- + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}- - run: mix do deps.get --check-locked, deps.compile if: steps.cache-deps.outputs.cache-hit != 'true' diff --git a/CHANGELOG.md b/CHANGELOG.md index 05ba305..5ae4ac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## v1.1.0 (2024-06-21) + +### Bug fix + + * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example + +### Enhancements + + * Log leaked trapped exits + ## v1.0.7 (2023-04-22) ### Enhancements diff --git a/guides/examples/introduction.md b/guides/examples/introduction.md index 72c5abd..5a35086 100644 --- a/guides/examples/introduction.md +++ b/guides/examples/introduction.md @@ -32,6 +32,7 @@ The following Off-Broadway libraries are available (feel free to send a PR addin * [off_broadway_amqp10](https://github.com/highmobility/off_broadway_amqp10): [Guide](https://hexdocs.pm/off_broadway_amqp10/) * [off_broadway_kafka](https://github.com/bbalser/off_broadway_kafka): [Guide](https://hexdocs.pm/off_broadway_kafka/) + * [off_broadway_memory](https://github.com/elliotekj/off_broadway_memory): [Guide](https://hexdocs.pm/off_broadway_memory/) * [off_broadway_redis](https://github.com/amokan/off_broadway_redis): [Guide](https://hexdocs.pm/off_broadway_redis/) * [off_broadway_redis_stream](https://github.com/akash-akya/off_broadway_redis_stream): [Guide](https://hexdocs.pm/off_broadway_redis_stream/) * [off_broadway_splunk](https://github.com/Intility/off_broadway_splunk): [Guide](https://hexdocs.pm/off_broadway_splunk/) diff --git a/lib/broadway/topology/processor_stage.ex b/lib/broadway/topology/processor_stage.ex index e0e1014..2a32b8f 100644 --- a/lib/broadway/topology/processor_stage.ex +++ b/lib/broadway/topology/processor_stage.ex @@ -3,7 +3,7 @@ defmodule Broadway.Topology.ProcessorStage do use GenStage require Logger - alias Broadway.{Message, Acknowledger} + alias Broadway.{Message, Acknowledger, Utility} @spec start_link(term, GenServer.options()) :: GenServer.on_start() def start_link(args, stage_options) do @@ -70,6 +70,7 @@ defmodule Broadway.Topology.ProcessorStage do producer: state.producer }, fn -> + Utility.maybe_log("Preparing to process #{length(messages)} message(s)", state) {prepared_messages, prepared_failed_messages} = maybe_prepare_messages(messages, state) {successful_messages, failed_messages} = handle_messages(prepared_messages, [], [], state) failed_messages = prepared_failed_messages ++ failed_messages @@ -83,6 +84,15 @@ defmodule Broadway.Topology.ProcessorStage do {successful_messages, []} end + Utility.maybe_log( + inspect(%{ + to_ack_count: length(successful_messages_to_ack), + to_forward_count: length(successful_messages_to_forward), + failed_count: length(failed_messages) + }), + state + ) + failed_messages = Acknowledger.maybe_handle_failed_messages( failed_messages, @@ -92,6 +102,7 @@ defmodule Broadway.Topology.ProcessorStage do try do Acknowledger.ack_messages(successful_messages_to_ack, failed_messages) + Utility.maybe_log("Acked all messages", state) catch kind, reason -> Logger.error(Exception.format(kind, reason, __STACKTRACE__), diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index cb2cf83..8dab036 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -4,6 +4,7 @@ defmodule Broadway.Topology.ProducerStage do alias Broadway.Message alias Broadway.Topology.RateLimiter + alias Broadway.Utility require Logger @@ -19,12 +20,6 @@ defmodule Broadway.Topology.ProducerStage do @spec drain(GenServer.server()) :: :ok def drain(producer) do - # First we set the demand to accumulate. This is to avoid - # polling implementations from re-entering the polling loop - # once they flush any timers during draining. Push implementations - # will still empty out their queues as long as they put them - # in the GenStage buffer. - GenStage.demand(producer, :accumulate) GenStage.cast(producer, {__MODULE__, :prepare_for_draining}) GenStage.async_info(producer, {__MODULE__, :cancel_consumers}) end @@ -317,18 +312,48 @@ defmodule Broadway.Topology.ProducerStage do # No point in trying to emit messages if no messages are allowed. In that case, # we close the rate limiting and don't emit anything. allowed when allowed <= 0 -> + Utility.maybe_log("Rate limiting closed", state) {%{rate_limiting | state: :closed}, []} allowed -> - {allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, []) + {allowed_left, probably_emittable, buffer, next_message_weight} = + dequeue_many(buffer, allowed, []) + + log_map = %{ + initial_allowed: allowed, + allowed_left: allowed_left, + probably_emittable_count: length(probably_emittable), + buffer_length: :queue.len(buffer), + next_message_weight: next_message_weight + } + + Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state) + + # If nothing was emittable, but the buffer has messages in it, + # then we want to rate limit allowed_left. This will take the limit + # down to 0. + demand = + case {length(probably_emittable), :queue.len(buffer)} do + {0, buffer_length} when buffer_length > 0 -> allowed_left + _ -> allowed - allowed_left + end {rate_limiting_state, messages_to_emit, messages_to_buffer} = rate_limit_messages( rate_limiter, probably_emittable, - _probably_emittable_weight = allowed - allowed_left + demand, + state ) + rate_limiting_map = %{ + rate_limiting_state: rate_limiting_state, + emit_count: length(messages_to_emit), + to_buffer_count: length(messages_to_buffer) + } + + Utility.maybe_log("rate_limit_messages result: #{inspect(rate_limiting_map)}", state) + new_buffer = enqueue_batch_r(buffer, messages_to_buffer) rate_limiting = %{ @@ -338,6 +363,7 @@ defmodule Broadway.Topology.ProducerStage do } if draining? and :queue.is_empty(new_buffer) do + Utility.maybe_log("Cancelling consumers", state) cancel_consumers(state) end @@ -347,7 +373,7 @@ defmodule Broadway.Topology.ProducerStage do {%{state | rate_limiting: rate_limiting}, messages_to_emit} end - defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue} + defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0} defp dequeue_many(queue, demand, acc) do case :queue.out(queue) do @@ -356,14 +382,14 @@ defmodule Broadway.Topology.ProducerStage do # requeue first message and ignore remaining demand since # the next message would put us over the allowed weight new_queue = :queue.in_r(message, queue) - {0, Enum.reverse(acc), new_queue} + {demand, Enum.reverse(acc), new_queue, message.weight} else new_demand = demand - message.weight dequeue_many(queue, new_demand, [message | acc]) end {:empty, queue} -> - {demand, Enum.reverse(acc), queue} + {demand, Enum.reverse(acc), queue, 0} end end @@ -379,12 +405,11 @@ defmodule Broadway.Topology.ProducerStage do :queue.join(:queue.from_list(list), queue) end - defp rate_limit_messages(_state, [], _count) do - {:open, [], []} - end + defp rate_limit_messages(rate_limiter, messages, demand, state) do + left = RateLimiter.rate_limit(rate_limiter, demand) + Utility.maybe_log("Remaining rate limit: #{left}", state) - defp rate_limit_messages(rate_limiter, messages, demand) do - case RateLimiter.rate_limit(rate_limiter, demand) do + case left do # If no more messages are allowed, we're rate limited but we're able # to emit all messages that we have. 0 -> diff --git a/lib/broadway/topology/rate_limiter.ex b/lib/broadway/topology/rate_limiter.ex index c155057..34ad1e9 100644 --- a/lib/broadway/topology/rate_limiter.ex +++ b/lib/broadway/topology/rate_limiter.ex @@ -22,7 +22,7 @@ defmodule Broadway.Topology.RateLimiter do end def rate_limit(counter, amount) - when is_reference(counter) and is_integer(amount) and amount > 0 do + when is_reference(counter) and is_integer(amount) and amount >= 0 do :atomics.sub_get(counter, @atomics_index, amount) end diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex new file mode 100644 index 0000000..7351a44 --- /dev/null +++ b/lib/broadway/utility.ex @@ -0,0 +1,32 @@ +defmodule Broadway.Utility do + @moduledoc """ + This entire module is temporary while working on the rate limit bug in the + Broadway fork. + + Once we're comfortable with the fix, this module and all usages + of it should be removed. This includes removing a bunch of code around those + log statements to setup data for logging. + """ + require Logger + + # ABOMINATION: This helps us only log information about specific + # consumers by looking at the name of the queue. This leaks information + # about our app into our Broadway fork. + @queues [ + # "text.carrier.tmobile.tcr_campaign.1.sms.messages", + # "text.carrier.us_cellular.tcr_campaign.1.sms.messages", + "text.provider.bandwidth.sms.messages" + ] + + def maybe_log(message, %{context: %{config: %{queue: queue}}}) + when queue in @queues do + Logger.debug("Queue: #{queue} #{message}") + end + + def maybe_log(message, %{module_state: %{config: %{queue: queue}}}) + when queue in @queues do + Logger.debug("Queue: #{queue} #{message}") + end + + def maybe_log(_message, _state), do: :ok +end diff --git a/mix.exs b/mix.exs index bc4b355..e191e94 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Broadway.MixProject do use Mix.Project - @version "1.0.7" + @version "1.1.0" @description "Build concurrent and multi-stage data ingestion and data processing pipelines" def project do @@ -32,8 +32,9 @@ defmodule Broadway.MixProject do {:telemetry, "~> 0.4.3 or ~> 1.0"}, # Dev/test dependencies. + {:castore, "~> 1.0", only: :test}, {:ex_doc, ">= 0.19.0", only: :docs}, - {:excoveralls, "~> 0.17.0", only: :test} + {:excoveralls, "~> 0.18.0", only: :test} ] end diff --git a/mix.lock b/mix.lock index 144be22..506a130 100644 --- a/mix.lock +++ b/mix.lock @@ -1,13 +1,14 @@ %{ - "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"}, - "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, - "excoveralls": {:hex, :excoveralls, "0.17.0", "279f124dba347903bb654bc40745c493ae265d45040001b4899ea1edf88078c7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "08b638d114387a888f9cb8d65f2a0021ec04c3e447b793efa7c1e734aba93004"}, - "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, + "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"}, + "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"}, - "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, + "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, }