Skip to content

Commit

Permalink
ENG-2031: WIP to try and get rate limiting to close correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
CuriousCurmudgeon committed Oct 16, 2024
1 parent 1e9dc83 commit a5b9576
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,15 @@ defmodule Broadway.Topology.ProducerStage do
{%{rate_limiting | state: :closed}, []}

allowed ->
{allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, [])
{allowed_left, probably_emittable, buffer, next_message_weight} =
dequeue_many(buffer, allowed, [])

log_map = %{
initial_allowed: allowed,
allowed_left: allowed_left,
probably_emittable_count: length(probably_emittable),
buffer_length: :queue.len(buffer)
buffer_length: :queue.len(buffer),
next_message_weight: next_message_weight
}

Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state)
Expand All @@ -333,7 +335,9 @@ defmodule Broadway.Topology.ProducerStage do
rate_limit_messages(
rate_limiter,
probably_emittable,
_probably_emittable_weight = allowed - allowed_left
_probably_emittable_weight = allowed - allowed_left,
next_message_weight,
state
)

rate_limiting_map = %{
Expand All @@ -353,6 +357,7 @@ defmodule Broadway.Topology.ProducerStage do
}

if draining? and :queue.is_empty(new_buffer) do
Utility.maybe_log("Cancelling consumers", state)
cancel_consumers(state)
end

Expand All @@ -370,7 +375,7 @@ defmodule Broadway.Topology.ProducerStage do
# allowed_left is how much of the demand remands
# probably_emittable is the messages pulled off the queue that we can probably process
# buffer is the remaining queue
defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue}
defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0}

defp dequeue_many(queue, demand, acc) do
# Take the first message off the queue
Expand All @@ -382,14 +387,16 @@ defmodule Broadway.Topology.ProducerStage do
# requeue first message and ignore remaining demand since
# the next message would put us over the allowed weight
new_queue = :queue.in_r(message, queue)
{0, Enum.reverse(acc), new_queue}
# TODO: Should the first element be demand instead of 0?
# Changed it for now.
{demand, Enum.reverse(acc), new_queue, message.weight}
else
new_demand = demand - message.weight
dequeue_many(queue, new_demand, [message | acc])
end

{:empty, queue} ->
{demand, Enum.reverse(acc), queue}
{demand, Enum.reverse(acc), queue, 0}
end
end

Expand All @@ -405,30 +412,41 @@ defmodule Broadway.Topology.ProducerStage do
:queue.join(:queue.from_list(list), queue)
end

defp rate_limit_messages(_state, [], _count) do
{:open, [], []}
end
# defp rate_limit_messages(_rate_limiter, [], _count, _next_message_weight, state) do
# Utility.maybe_log("No messages to rate limit", state)
# {:open, [], []}
# end

defp rate_limit_messages(rate_limiter, messages, demand, next_message_weight, state) do
left = RateLimiter.rate_limit(rate_limiter, demand)
Utility.maybe_log("Remaining rate limit: #{left}", state)

cond do
messages == [] ->
{:open, [], []}

defp rate_limit_messages(rate_limiter, messages, demand) do
case RateLimiter.rate_limit(rate_limiter, demand) do
# If no more messages are allowed, we're rate limited but we're able
# to emit all messages that we have.
0 ->
left == 0 ->
{:closed, messages, _to_buffer = []}

# TODO: Does this need changes to handle the negative case below?
left < next_message_weight and left >= 0 ->
{:closed, messages, _to_buffer = []}

# We were able to emit all messages and still more messages are allowed,
# so the rate limiting is "open".
left when left > 0 ->
left > next_message_weight ->
{:open, messages, _to_buffer = []}

# We went over the rate limit, so we remove messages from the
# back of the list of those we were able to emit until the
# overflow is corrected and close the rate limiting.
overflow when overflow < 0 ->
left < 0 ->
reversed = Enum.reverse(messages)

{emittable, to_buffer, _overflow} =
Enum.reduce_while(reversed, {reversed, [], overflow}, fn
Enum.reduce_while(reversed, {reversed, [], left}, fn
message, {emittable, to_buffer, overflow} ->
if overflow >= 0 do
{:halt, {Enum.reverse(emittable), to_buffer, overflow}}
Expand Down

0 comments on commit a5b9576

Please sign in to comment.