From 3ca2da92d11cfde842be3b49cdb22f6d713c6e1c Mon Sep 17 00:00:00 2001 From: Elliot Jackson Date: Fri, 24 May 2024 09:27:32 +0100 Subject: [PATCH 01/13] add `off_broadway_memory` (#338) --- guides/examples/introduction.md | 1 + 1 file changed, 1 insertion(+) diff --git a/guides/examples/introduction.md b/guides/examples/introduction.md index 72c5abd..5a35086 100644 --- a/guides/examples/introduction.md +++ b/guides/examples/introduction.md @@ -32,6 +32,7 @@ The following Off-Broadway libraries are available (feel free to send a PR addin * [off_broadway_amqp10](https://github.com/highmobility/off_broadway_amqp10): [Guide](https://hexdocs.pm/off_broadway_amqp10/) * [off_broadway_kafka](https://github.com/bbalser/off_broadway_kafka): [Guide](https://hexdocs.pm/off_broadway_kafka/) + * [off_broadway_memory](https://github.com/elliotekj/off_broadway_memory): [Guide](https://hexdocs.pm/off_broadway_memory/) * [off_broadway_redis](https://github.com/amokan/off_broadway_redis): [Guide](https://hexdocs.pm/off_broadway_redis/) * [off_broadway_redis_stream](https://github.com/akash-akya/off_broadway_redis_stream): [Guide](https://hexdocs.pm/off_broadway_redis_stream/) * [off_broadway_splunk](https://github.com/Intility/off_broadway_splunk): [Guide](https://hexdocs.pm/off_broadway_splunk/) From 90defc8ff274345ccaeb19212bab47fd08be9210 Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Thu, 20 Jun 2024 01:23:28 +0900 Subject: [PATCH 02/13] Update Erlang/Elixir in CI (#339) --- .github/workflows/ci.yml | 21 +++++++++++---------- mix.exs | 3 ++- mix.lock | 19 ++++++++++--------- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fddde69..3c5faba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,12 +16,13 @@ jobs: fail-fast: false matrix: include: - - pair: - elixir: 1.7.4 - otp: 21.3.8.17 - - pair: - elixir: 1.16.1 - otp: 26.2.2 + # Earliest-supported versions. + - elixir: "1.7.4" + otp: "21.3.8.17" + + # Latest versions. + - elixir: "1.17" + otp: "27.0" lint: lint coverage: coverage steps: @@ -31,8 +32,8 @@ jobs: - name: Set up Erlang and Elixir uses: erlef/setup-beam@v1 with: - otp-version: ${{matrix.pair.otp}} - elixir-version: ${{matrix.pair.elixir}} + otp-version: ${{matrix.otp}} + elixir-version: ${{matrix.elixir}} - name: Cache Mix dependencies uses: actions/cache@v3 @@ -42,9 +43,9 @@ jobs: deps _build key: | - mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}-${{ hashFiles('**/mix.lock') }} + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }} restore-keys: | - mix-${{ runner.os }}-${{matrix.pair.elixir}}-${{matrix.pair.otp}}- + mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}- - run: mix do deps.get --check-locked, deps.compile if: steps.cache-deps.outputs.cache-hit != 'true' diff --git a/mix.exs b/mix.exs index bc4b355..c3ecdb7 100644 --- a/mix.exs +++ b/mix.exs @@ -32,8 +32,9 @@ defmodule Broadway.MixProject do {:telemetry, "~> 0.4.3 or ~> 1.0"}, # Dev/test dependencies. + {:castore, "~> 1.0", only: :test}, {:ex_doc, ">= 0.19.0", only: :docs}, - {:excoveralls, "~> 0.17.0", only: :test} + {:excoveralls, "~> 0.18.0", only: :test} ] end diff --git a/mix.lock b/mix.lock index 144be22..e0416f0 100644 --- a/mix.lock +++ b/mix.lock @@ -1,13 +1,14 @@ %{ - "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"}, - "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, - "excoveralls": {:hex, :excoveralls, "0.17.0", "279f124dba347903bb654bc40745c493ae265d45040001b4899ea1edf88078c7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "08b638d114387a888f9cb8d65f2a0021ec04c3e447b793efa7c1e734aba93004"}, + "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, + "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "nimble_options": {:hex, :nimble_options, "0.5.2", "42703307b924880f8c08d97719da7472673391905f528259915782bb346e0a1b", [:mix], [], "hexpm", "4da7f904b915fd71db549bcdc25f8d56f378ef7ae07dc1d372cbe72ba950dce0"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"}, - "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, + "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } From 83a564bc7d61968cc8e7f4c1546da770bbb6891e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Fri, 21 Jun 2024 11:57:04 +0200 Subject: [PATCH 03/13] Support GenStage v1.2.0 --- lib/broadway/topology/producer_stage.ex | 6 ------ mix.lock | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 31ba059..d03f1f6 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -17,12 +17,6 @@ defmodule Broadway.Topology.ProducerStage do @spec drain(GenServer.server()) :: :ok def drain(producer) do - # First we set the demand to accumulate. This is to avoid - # polling implementations from re-entering the polling loop - # once they flush any timers during draining. Push implementations - # will still empty out their queues as long as they put them - # in the GenStage buffer. - GenStage.demand(producer, :accumulate) GenStage.cast(producer, {__MODULE__, :prepare_for_draining}) GenStage.async_info(producer, {__MODULE__, :cancel_consumers}) end diff --git a/mix.lock b/mix.lock index e0416f0..7035c82 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, - "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, From 3698e41171fe61d92819a65eb3ffbe6189ae9fe7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Fri, 21 Jun 2024 12:37:14 +0200 Subject: [PATCH 04/13] Release v1.1.0 --- CHANGELOG.md | 10 ++++++++++ mix.exs | 2 +- mix.lock | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05ba305..5ae4ac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## v1.1.0 (2024-06-21) + +### Bug fix + + * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example + +### Enhancements + + * Log leaked trapped exits + ## v1.0.7 (2023-04-22) ### Enhancements diff --git a/mix.exs b/mix.exs index c3ecdb7..e191e94 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Broadway.MixProject do use Mix.Project - @version "1.0.7" + @version "1.1.0" @description "Build concurrent and multi-stage data ingestion and data processing pipelines" def project do diff --git a/mix.lock b/mix.lock index 7035c82..506a130 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, + "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"}, "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, From 1eaa4ce6e3581386c949d6aa8427cd7dc1d4497d Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Tue, 15 Oct 2024 16:25:56 -0400 Subject: [PATCH 05/13] ENG-2031: Adding logging and dbg statements for debugging This has helped narrow down the problem. Logging confirms that the processor does process all messages. My current hunch is that rate limiting is getting close incorrectly in some cases when we still have messages. Those messages eventually timeout and the channel gets closed. The point against this is that I don't think it matches what I'm seeing in traces in Datadog. It could be that I'm misinterpreting the traces though. --- lib/broadway/topology/processor_stage.ex | 13 ++++++++++++- lib/broadway/topology/producer_stage.ex | 14 ++++++++++++++ lib/broadway/utility.ex | 13 +++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 lib/broadway/utility.ex diff --git a/lib/broadway/topology/processor_stage.ex b/lib/broadway/topology/processor_stage.ex index e0e1014..2a32b8f 100644 --- a/lib/broadway/topology/processor_stage.ex +++ b/lib/broadway/topology/processor_stage.ex @@ -3,7 +3,7 @@ defmodule Broadway.Topology.ProcessorStage do use GenStage require Logger - alias Broadway.{Message, Acknowledger} + alias Broadway.{Message, Acknowledger, Utility} @spec start_link(term, GenServer.options()) :: GenServer.on_start() def start_link(args, stage_options) do @@ -70,6 +70,7 @@ defmodule Broadway.Topology.ProcessorStage do producer: state.producer }, fn -> + Utility.maybe_log("Preparing to process #{length(messages)} message(s)", state) {prepared_messages, prepared_failed_messages} = maybe_prepare_messages(messages, state) {successful_messages, failed_messages} = handle_messages(prepared_messages, [], [], state) failed_messages = prepared_failed_messages ++ failed_messages @@ -83,6 +84,15 @@ defmodule Broadway.Topology.ProcessorStage do {successful_messages, []} end + Utility.maybe_log( + inspect(%{ + to_ack_count: length(successful_messages_to_ack), + to_forward_count: length(successful_messages_to_forward), + failed_count: length(failed_messages) + }), + state + ) + failed_messages = Acknowledger.maybe_handle_failed_messages( failed_messages, @@ -92,6 +102,7 @@ defmodule Broadway.Topology.ProcessorStage do try do Acknowledger.ack_messages(successful_messages_to_ack, failed_messages) + Utility.maybe_log("Acked all messages", state) catch kind, reason -> Logger.error(Exception.format(kind, reason, __STACKTRACE__), diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 83221cf..4841bae 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -4,6 +4,7 @@ defmodule Broadway.Topology.ProducerStage do alias Broadway.Message alias Broadway.Topology.RateLimiter + alias Broadway.Utility require Logger @@ -121,6 +122,7 @@ defmodule Broadway.Topology.ProducerStage do end def handle_call(message, from, state) do + dbg({message, from, state}) %{module: module, module_state: module_state} = state message @@ -305,6 +307,7 @@ defmodule Broadway.Topology.ProducerStage do defp rate_limit_and_buffer_messages(%{rate_limiting: rate_limiting} = state) do %{message_buffer: buffer, rate_limiter: rate_limiter, draining?: draining?} = rate_limiting + dbg(state) {rate_limiting, messages_to_emit} = case RateLimiter.get_currently_allowed(rate_limiter) do @@ -341,11 +344,22 @@ defmodule Broadway.Topology.ProducerStage do {%{state | rate_limiting: rate_limiting}, messages_to_emit} end + # queue is an Erlang queue + # demand is how many messages rate limiting is allowing + # acc is the messages pulled off the queue + # + # returns {allowed_left, probably_emittable, buffer} + # allowed_left is how much of the demand remands + # probably_emittable is the messages pulled off the queue that we can probably process + # buffer is the remaining queue defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue} defp dequeue_many(queue, demand, acc) do + # Take the first message off the queue case :queue.out(queue) do + # The queue had at least one message {{:value, message}, queue} -> + # There isn't enough rate limit left for this message if message.weight > demand do # requeue first message and ignore remaining demand since # the next message would put us over the allowed weight diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex new file mode 100644 index 0000000..72c00b8 --- /dev/null +++ b/lib/broadway/utility.ex @@ -0,0 +1,13 @@ +defmodule Broadway.Utility do + require Logger + + def maybe_log(message, %{context: %{config: %{queue: queue}}}) + when queue in [ + "text.carrier.tmobile.tcr_campaign.1.sms.messages", + "text.provider.bandwidth.sms.messages" + ] do + Logger.info("Queue: #{queue} #{message}") + end + + def maybe_log(_message, _state), do: :ok +end From ff534c0c022414e41fbb00eddf34e680adc53223 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Tue, 15 Oct 2024 16:57:42 -0400 Subject: [PATCH 06/13] ENG-2031: Attempting to add more logging to the producer stage This does not work as expected yet. I'm not seeing any logs. I probably have something wrong in the pattern match. The shape of the data is different in the producer stage. --- lib/broadway/topology/producer_stage.ex | 3 ++- lib/broadway/utility.ex | 15 +++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 4841bae..55d04c1 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -307,13 +307,14 @@ defmodule Broadway.Topology.ProducerStage do defp rate_limit_and_buffer_messages(%{rate_limiting: rate_limiting} = state) do %{message_buffer: buffer, rate_limiter: rate_limiter, draining?: draining?} = rate_limiting - dbg(state) + # dbg(state) {rate_limiting, messages_to_emit} = case RateLimiter.get_currently_allowed(rate_limiter) do # No point in trying to emit messages if no messages are allowed. In that case, # we close the rate limiting and don't emit anything. allowed when allowed <= 0 -> + Utility.maybe_log("Rate limiting closed", state) {%{rate_limiting | state: :closed}, []} allowed -> diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex index 72c00b8..806bae2 100644 --- a/lib/broadway/utility.ex +++ b/lib/broadway/utility.ex @@ -1,11 +1,18 @@ defmodule Broadway.Utility do require Logger + @queues [ + "text.carrier.tmobile.tcr_campaign.1.sms.messages", + "text.provider.bandwidth.sms.messages" + ] + def maybe_log(message, %{context: %{config: %{queue: queue}}}) - when queue in [ - "text.carrier.tmobile.tcr_campaign.1.sms.messages", - "text.provider.bandwidth.sms.messages" - ] do + when queue in @queues do + Logger.info("Queue: #{queue} #{message}") + end + + def maybe_log(message, %{module_state: %{config: %{queue: queue}}}) + when queue in @queues do Logger.info("Queue: #{queue} #{message}") end From 5304c57df06b2fafb807c0ab731521970c741e10 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 10:55:38 -0400 Subject: [PATCH 07/13] ENG-2031: Add more logging to the producer stage I can now see that only one message is being pulled off each time rate_limit_and_buffer_messages is called. That's surprising. The buffer length is only 1 in the scenarios I'm debugging. It does emit 10 messages when the buffer length is longer though. --- lib/broadway/topology/producer_stage.ex | 23 ++++++++++++++++++++--- lib/broadway/utility.ex | 4 ++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 55d04c1..da6be61 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -122,7 +122,6 @@ defmodule Broadway.Topology.ProducerStage do end def handle_call(message, from, state) do - dbg({message, from, state}) %{module: module, module_state: module_state} = state message @@ -307,7 +306,6 @@ defmodule Broadway.Topology.ProducerStage do defp rate_limit_and_buffer_messages(%{rate_limiting: rate_limiting} = state) do %{message_buffer: buffer, rate_limiter: rate_limiter, draining?: draining?} = rate_limiting - # dbg(state) {rate_limiting, messages_to_emit} = case RateLimiter.get_currently_allowed(rate_limiter) do @@ -320,6 +318,17 @@ defmodule Broadway.Topology.ProducerStage do allowed -> {allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, []) + log_map = %{ + initial_allowed: allowed, + allowed_left: allowed_left, + probably_emittable_count: length(probably_emittable), + buffer_length: :queue.len(buffer) + } + + Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state) + + # This seems suspicious. Are we confirming that the rate limit is still valid? + # That nothing has changed? {rate_limiting_state, messages_to_emit, messages_to_buffer} = rate_limit_messages( rate_limiter, @@ -327,6 +336,14 @@ defmodule Broadway.Topology.ProducerStage do _probably_emittable_weight = allowed - allowed_left ) + rate_limiting_map = %{ + rate_limiting_stage: rate_limiting_state, + emit_count: length(messages_to_emit), + to_buffer_count: length(messages_to_buffer) + } + + Utility.maybe_log("rate_limit_messages result: #{inspect(rate_limiting_map)}", state) + new_buffer = enqueue_batch_r(buffer, messages_to_buffer) rate_limiting = %{ @@ -346,7 +363,7 @@ defmodule Broadway.Topology.ProducerStage do end # queue is an Erlang queue - # demand is how many messages rate limiting is allowing + # demand is how many segments rate limiting is allowing # acc is the messages pulled off the queue # # returns {allowed_left, probably_emittable, buffer} diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex index 806bae2..0882dc5 100644 --- a/lib/broadway/utility.ex +++ b/lib/broadway/utility.ex @@ -2,8 +2,8 @@ defmodule Broadway.Utility do require Logger @queues [ - "text.carrier.tmobile.tcr_campaign.1.sms.messages", - "text.provider.bandwidth.sms.messages" + "text.carrier.tmobile.tcr_campaign.1.sms.messages" + # "text.provider.bandwidth.sms.messages" ] def maybe_log(message, %{context: %{config: %{queue: queue}}}) From 1e9dc83a3ca2288141c5901d83f4e6748f960f13 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 11:01:20 -0400 Subject: [PATCH 08/13] ENG-2031: Typo fix in new logging --- lib/broadway/topology/producer_stage.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index da6be61..d8d35ee 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -337,7 +337,7 @@ defmodule Broadway.Topology.ProducerStage do ) rate_limiting_map = %{ - rate_limiting_stage: rate_limiting_state, + rate_limiting_state: rate_limiting_state, emit_count: length(messages_to_emit), to_buffer_count: length(messages_to_buffer) } From a5b9576d06017ebac358a77770efad08fb5f82c3 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 13:04:13 -0400 Subject: [PATCH 09/13] ENG-2031: WIP to try and get rate limiting to close correctly --- lib/broadway/topology/producer_stage.ex | 48 +++++++++++++++++-------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index d8d35ee..c0d932f 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -316,13 +316,15 @@ defmodule Broadway.Topology.ProducerStage do {%{rate_limiting | state: :closed}, []} allowed -> - {allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, []) + {allowed_left, probably_emittable, buffer, next_message_weight} = + dequeue_many(buffer, allowed, []) log_map = %{ initial_allowed: allowed, allowed_left: allowed_left, probably_emittable_count: length(probably_emittable), - buffer_length: :queue.len(buffer) + buffer_length: :queue.len(buffer), + next_message_weight: next_message_weight } Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state) @@ -333,7 +335,9 @@ defmodule Broadway.Topology.ProducerStage do rate_limit_messages( rate_limiter, probably_emittable, - _probably_emittable_weight = allowed - allowed_left + _probably_emittable_weight = allowed - allowed_left, + next_message_weight, + state ) rate_limiting_map = %{ @@ -353,6 +357,7 @@ defmodule Broadway.Topology.ProducerStage do } if draining? and :queue.is_empty(new_buffer) do + Utility.maybe_log("Cancelling consumers", state) cancel_consumers(state) end @@ -370,7 +375,7 @@ defmodule Broadway.Topology.ProducerStage do # allowed_left is how much of the demand remands # probably_emittable is the messages pulled off the queue that we can probably process # buffer is the remaining queue - defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue} + defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0} defp dequeue_many(queue, demand, acc) do # Take the first message off the queue @@ -382,14 +387,16 @@ defmodule Broadway.Topology.ProducerStage do # requeue first message and ignore remaining demand since # the next message would put us over the allowed weight new_queue = :queue.in_r(message, queue) - {0, Enum.reverse(acc), new_queue} + # TODO: Should the first element be demand instead of 0? + # Changed it for now. + {demand, Enum.reverse(acc), new_queue, message.weight} else new_demand = demand - message.weight dequeue_many(queue, new_demand, [message | acc]) end {:empty, queue} -> - {demand, Enum.reverse(acc), queue} + {demand, Enum.reverse(acc), queue, 0} end end @@ -405,30 +412,41 @@ defmodule Broadway.Topology.ProducerStage do :queue.join(:queue.from_list(list), queue) end - defp rate_limit_messages(_state, [], _count) do - {:open, [], []} - end + # defp rate_limit_messages(_rate_limiter, [], _count, _next_message_weight, state) do + # Utility.maybe_log("No messages to rate limit", state) + # {:open, [], []} + # end + + defp rate_limit_messages(rate_limiter, messages, demand, next_message_weight, state) do + left = RateLimiter.rate_limit(rate_limiter, demand) + Utility.maybe_log("Remaining rate limit: #{left}", state) + + cond do + messages == [] -> + {:open, [], []} - defp rate_limit_messages(rate_limiter, messages, demand) do - case RateLimiter.rate_limit(rate_limiter, demand) do # If no more messages are allowed, we're rate limited but we're able # to emit all messages that we have. - 0 -> + left == 0 -> + {:closed, messages, _to_buffer = []} + + # TODO: Does this need changes to handle the negative case below? + left < next_message_weight and left >= 0 -> {:closed, messages, _to_buffer = []} # We were able to emit all messages and still more messages are allowed, # so the rate limiting is "open". - left when left > 0 -> + left > next_message_weight -> {:open, messages, _to_buffer = []} # We went over the rate limit, so we remove messages from the # back of the list of those we were able to emit until the # overflow is corrected and close the rate limiting. - overflow when overflow < 0 -> + left < 0 -> reversed = Enum.reverse(messages) {emittable, to_buffer, _overflow} = - Enum.reduce_while(reversed, {reversed, [], overflow}, fn + Enum.reduce_while(reversed, {reversed, [], left}, fn message, {emittable, to_buffer, overflow} -> if overflow >= 0 do {:halt, {Enum.reverse(emittable), to_buffer, overflow}} From 0a525bd2d36b9193e87d4c66b7b509685298aad8 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 14:10:53 -0400 Subject: [PATCH 10/13] ENG-2031: Fixed the bug! This is an ugly fix that I'm going to try and simplify. The blast radius for these changes is larger than necessary, so I want to try a simpler fix. --- lib/broadway/topology/producer_stage.ex | 5 ++++- lib/broadway/topology/rate_limiter.ex | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index c0d932f..fc23f3b 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -422,9 +422,12 @@ defmodule Broadway.Topology.ProducerStage do Utility.maybe_log("Remaining rate limit: #{left}", state) cond do - messages == [] -> + messages == [] and next_message_weight == 0 -> {:open, [], []} + messages == [] -> + {:closed, [], []} + # If no more messages are allowed, we're rate limited but we're able # to emit all messages that we have. left == 0 -> diff --git a/lib/broadway/topology/rate_limiter.ex b/lib/broadway/topology/rate_limiter.ex index c155057..c92e736 100644 --- a/lib/broadway/topology/rate_limiter.ex +++ b/lib/broadway/topology/rate_limiter.ex @@ -21,6 +21,10 @@ defmodule Broadway.Topology.RateLimiter do end end + def rate_limit(counter, 0) when is_reference(counter) do + get_currently_allowed(counter) + end + def rate_limit(counter, amount) when is_reference(counter) and is_integer(amount) and amount > 0 do :atomics.sub_get(counter, @atomics_index, amount) From 499a0a62870e405203a4f56d643cc705aee9a452 Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 14:48:17 -0400 Subject: [PATCH 11/13] ENG-2031: Simplified bug fix When the next message is too large to send, this version attempts to set subtract the remaining demand from the rate limit to set it to 0. This triggers the rate limit to close correctly. I'm still seeing the Bandwidth SMS queue stuck locally, but the T-Mobile queue is flowing correctly. --- lib/broadway/topology/producer_stage.ex | 56 +++++++++---------------- lib/broadway/topology/rate_limiter.ex | 4 -- lib/broadway/utility.ex | 5 ++- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index fc23f3b..18c818b 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -329,14 +329,20 @@ defmodule Broadway.Topology.ProducerStage do Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state) - # This seems suspicious. Are we confirming that the rate limit is still valid? - # That nothing has changed? + # If nothing was emittable, but the buffer has messages in it, + # then we want to rate limit allowed_left. This will take the limit + # down to 0. + demand = + case {length(probably_emittable), :queue.len(buffer)} do + {0, buffer_length} when buffer_length > 0 -> allowed_left + _ -> allowed - allowed_left + end + {rate_limiting_state, messages_to_emit, messages_to_buffer} = rate_limit_messages( rate_limiter, probably_emittable, - _probably_emittable_weight = allowed - allowed_left, - next_message_weight, + demand, state ) @@ -367,28 +373,15 @@ defmodule Broadway.Topology.ProducerStage do {%{state | rate_limiting: rate_limiting}, messages_to_emit} end - # queue is an Erlang queue - # demand is how many segments rate limiting is allowing - # acc is the messages pulled off the queue - # - # returns {allowed_left, probably_emittable, buffer} - # allowed_left is how much of the demand remands - # probably_emittable is the messages pulled off the queue that we can probably process - # buffer is the remaining queue defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0} defp dequeue_many(queue, demand, acc) do - # Take the first message off the queue case :queue.out(queue) do - # The queue had at least one message {{:value, message}, queue} -> - # There isn't enough rate limit left for this message if message.weight > demand do # requeue first message and ignore remaining demand since # the next message would put us over the allowed weight new_queue = :queue.in_r(message, queue) - # TODO: Should the first element be demand instead of 0? - # Changed it for now. {demand, Enum.reverse(acc), new_queue, message.weight} else new_demand = demand - message.weight @@ -412,44 +405,33 @@ defmodule Broadway.Topology.ProducerStage do :queue.join(:queue.from_list(list), queue) end - # defp rate_limit_messages(_rate_limiter, [], _count, _next_message_weight, state) do - # Utility.maybe_log("No messages to rate limit", state) - # {:open, [], []} - # end + defp rate_limit_messages(_rate_limiter, [], _count, _state) do + {:open, [], []} + end - defp rate_limit_messages(rate_limiter, messages, demand, next_message_weight, state) do + defp rate_limit_messages(rate_limiter, messages, demand, state) do left = RateLimiter.rate_limit(rate_limiter, demand) Utility.maybe_log("Remaining rate limit: #{left}", state) - cond do - messages == [] and next_message_weight == 0 -> - {:open, [], []} - - messages == [] -> - {:closed, [], []} - + case left do # If no more messages are allowed, we're rate limited but we're able # to emit all messages that we have. - left == 0 -> - {:closed, messages, _to_buffer = []} - - # TODO: Does this need changes to handle the negative case below? - left < next_message_weight and left >= 0 -> + 0 -> {:closed, messages, _to_buffer = []} # We were able to emit all messages and still more messages are allowed, # so the rate limiting is "open". - left > next_message_weight -> + left when left > 0 -> {:open, messages, _to_buffer = []} # We went over the rate limit, so we remove messages from the # back of the list of those we were able to emit until the # overflow is corrected and close the rate limiting. - left < 0 -> + overflow when overflow < 0 -> reversed = Enum.reverse(messages) {emittable, to_buffer, _overflow} = - Enum.reduce_while(reversed, {reversed, [], left}, fn + Enum.reduce_while(reversed, {reversed, [], overflow}, fn message, {emittable, to_buffer, overflow} -> if overflow >= 0 do {:halt, {Enum.reverse(emittable), to_buffer, overflow}} diff --git a/lib/broadway/topology/rate_limiter.ex b/lib/broadway/topology/rate_limiter.ex index c92e736..c155057 100644 --- a/lib/broadway/topology/rate_limiter.ex +++ b/lib/broadway/topology/rate_limiter.ex @@ -21,10 +21,6 @@ defmodule Broadway.Topology.RateLimiter do end end - def rate_limit(counter, 0) when is_reference(counter) do - get_currently_allowed(counter) - end - def rate_limit(counter, amount) when is_reference(counter) and is_integer(amount) and amount > 0 do :atomics.sub_get(counter, @atomics_index, amount) diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex index 0882dc5..6b69559 100644 --- a/lib/broadway/utility.ex +++ b/lib/broadway/utility.ex @@ -2,8 +2,9 @@ defmodule Broadway.Utility do require Logger @queues [ - "text.carrier.tmobile.tcr_campaign.1.sms.messages" - # "text.provider.bandwidth.sms.messages" + "text.carrier.tmobile.tcr_campaign.1.sms.messages", + "text.carrier.us_cellular.tcr_campaign.1.sms.messages", + "text.provider.bandwidth.sms.messages" ] def maybe_log(message, %{context: %{config: %{queue: queue}}}) From 3910af24252a05b8da2647070f0b8598caa1ef8f Mon Sep 17 00:00:00 2001 From: Brian Meeker Date: Wed, 16 Oct 2024 22:34:12 -0400 Subject: [PATCH 12/13] ENG-2031: Squashing a bug in the simplified fix Because there could be left over demand that we want to zero out even if no messages are being processed, the no-op function head needs removed. I also noted that we should get rid of the utility module and all of it's logging ASAP. We just need to be confident our fix is working first --- lib/broadway/topology/producer_stage.ex | 4 ---- lib/broadway/utility.ex | 19 +++++++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 18c818b..8dab036 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -405,10 +405,6 @@ defmodule Broadway.Topology.ProducerStage do :queue.join(:queue.from_list(list), queue) end - defp rate_limit_messages(_rate_limiter, [], _count, _state) do - {:open, [], []} - end - defp rate_limit_messages(rate_limiter, messages, demand, state) do left = RateLimiter.rate_limit(rate_limiter, demand) Utility.maybe_log("Remaining rate limit: #{left}", state) diff --git a/lib/broadway/utility.ex b/lib/broadway/utility.ex index 6b69559..7351a44 100644 --- a/lib/broadway/utility.ex +++ b/lib/broadway/utility.ex @@ -1,20 +1,31 @@ defmodule Broadway.Utility do + @moduledoc """ + This entire module is temporary while working on the rate limit bug in the + Broadway fork. + + Once we're comfortable with the fix, this module and all usages + of it should be removed. This includes removing a bunch of code around those + log statements to setup data for logging. + """ require Logger + # ABOMINATION: This helps us only log information about specific + # consumers by looking at the name of the queue. This leaks information + # about our app into our Broadway fork. @queues [ - "text.carrier.tmobile.tcr_campaign.1.sms.messages", - "text.carrier.us_cellular.tcr_campaign.1.sms.messages", + # "text.carrier.tmobile.tcr_campaign.1.sms.messages", + # "text.carrier.us_cellular.tcr_campaign.1.sms.messages", "text.provider.bandwidth.sms.messages" ] def maybe_log(message, %{context: %{config: %{queue: queue}}}) when queue in @queues do - Logger.info("Queue: #{queue} #{message}") + Logger.debug("Queue: #{queue} #{message}") end def maybe_log(message, %{module_state: %{config: %{queue: queue}}}) when queue in @queues do - Logger.info("Queue: #{queue} #{message}") + Logger.debug("Queue: #{queue} #{message}") end def maybe_log(_message, _state), do: :ok From 0c97eceb0075f67c3ea050298ad78dda155647dc Mon Sep 17 00:00:00 2001 From: Ryan Reynolds <832921+mus0u@users.noreply.github.com> Date: Thu, 17 Oct 2024 17:55:14 -0600 Subject: [PATCH 13/13] allow demand 0 in rate_limit/2 --- lib/broadway/topology/rate_limiter.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/broadway/topology/rate_limiter.ex b/lib/broadway/topology/rate_limiter.ex index c155057..34ad1e9 100644 --- a/lib/broadway/topology/rate_limiter.ex +++ b/lib/broadway/topology/rate_limiter.ex @@ -22,7 +22,7 @@ defmodule Broadway.Topology.RateLimiter do end def rate_limit(counter, amount) - when is_reference(counter) and is_integer(amount) and amount > 0 do + when is_reference(counter) and is_integer(amount) and amount >= 0 do :atomics.sub_get(counter, @atomics_index, amount) end