Skip to content

Commit

Permalink
ENG-2031: Add more logging to the producer stage
Browse files Browse the repository at this point in the history
I can now see that only one message is being pulled off each time
rate_limit_and_buffer_messages is called. That's surprising. The buffer
length is only 1 in the scenarios I'm debugging. It does emit 10 messages
when the buffer length is longer though.
  • Loading branch information
CuriousCurmudgeon committed Oct 16, 2024
1 parent ff534c0 commit 5304c57
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
23 changes: 20 additions & 3 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ defmodule Broadway.Topology.ProducerStage do
end

def handle_call(message, from, state) do
dbg({message, from, state})
%{module: module, module_state: module_state} = state

message
Expand Down Expand Up @@ -307,7 +306,6 @@ defmodule Broadway.Topology.ProducerStage do

defp rate_limit_and_buffer_messages(%{rate_limiting: rate_limiting} = state) do
%{message_buffer: buffer, rate_limiter: rate_limiter, draining?: draining?} = rate_limiting
# dbg(state)

{rate_limiting, messages_to_emit} =
case RateLimiter.get_currently_allowed(rate_limiter) do
Expand All @@ -320,13 +318,32 @@ defmodule Broadway.Topology.ProducerStage do
allowed ->
{allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, [])

log_map = %{
initial_allowed: allowed,
allowed_left: allowed_left,
probably_emittable_count: length(probably_emittable),
buffer_length: :queue.len(buffer)
}

Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state)

# This seems suspicious. Are we confirming that the rate limit is still valid?
# That nothing has changed?
{rate_limiting_state, messages_to_emit, messages_to_buffer} =
rate_limit_messages(
rate_limiter,
probably_emittable,
_probably_emittable_weight = allowed - allowed_left
)

rate_limiting_map = %{
rate_limiting_stage: rate_limiting_state,
emit_count: length(messages_to_emit),
to_buffer_count: length(messages_to_buffer)
}

Utility.maybe_log("rate_limit_messages result: #{inspect(rate_limiting_map)}", state)

new_buffer = enqueue_batch_r(buffer, messages_to_buffer)

rate_limiting = %{
Expand All @@ -346,7 +363,7 @@ defmodule Broadway.Topology.ProducerStage do
end

# queue is an Erlang queue
# demand is how many messages rate limiting is allowing
# demand is how many segments rate limiting is allowing
# acc is the messages pulled off the queue
#
# returns {allowed_left, probably_emittable, buffer}
Expand Down
4 changes: 2 additions & 2 deletions lib/broadway/utility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Broadway.Utility do
require Logger

@queues [
"text.carrier.tmobile.tcr_campaign.1.sms.messages",
"text.provider.bandwidth.sms.messages"
"text.carrier.tmobile.tcr_campaign.1.sms.messages"
# "text.provider.bandwidth.sms.messages"
]

def maybe_log(message, %{context: %{config: %{queue: queue}}})
Expand Down

0 comments on commit 5304c57

Please sign in to comment.