Skip to content

Commit

Permalink
rate limit by message weight instead of message count
Browse files Browse the repository at this point in the history
  • Loading branch information
mus0u committed Apr 15, 2024
1 parent f0bc227 commit 70358a9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 27 deletions.
4 changes: 3 additions & 1 deletion lib/broadway/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule Broadway.Message do
batcher: atom,
batch_key: term,
batch_mode: :bulk | :flush,
weight: non_neg_integer,
status:
:ok
| {:failed, reason :: term}
Expand All @@ -63,7 +64,8 @@ defmodule Broadway.Message do
batcher: :default,
batch_key: :default,
batch_mode: :bulk,
status: :ok
status: :ok,
weight: 1

@doc """
Updates the data in the message.
Expand Down
92 changes: 66 additions & 26 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Broadway.Topology.ProducerStage do
state: :open,
draining?: false,
rate_limiter: rate_limiter_ref,
# A queue of "batches" of messages that we buffered.
# A queue of INDIVIDUAL messages that we buffered.
message_buffer: :queue.new(),
# A queue of demands (integers) that we buffered.
demand_buffer: :queue.new()
Expand Down Expand Up @@ -312,7 +312,7 @@ defmodule Broadway.Topology.ProducerStage do
rate_limit_messages(
rate_limiter,
probably_emittable,
_probably_emittable_count = allowed - allowed_left
_probably_emittable_weight = allowed - allowed_left
)

new_buffer = enqueue_batch_r(buffer, messages_to_buffer)
Expand All @@ -333,30 +333,47 @@ defmodule Broadway.Topology.ProducerStage do
{%{state | rate_limiting: rate_limiting}, messages_to_emit}
end

defp reverse_split_demand(rest, 0, acc) do
{0, acc, rest}
end
# defp reverse_split_demand(rest, 0, acc) do
# {0, acc, rest}
# end

defp reverse_split_demand([], demand, acc) do
{demand, acc, []}
end
# defp reverse_split_demand([], demand, acc) do
# {demand, acc, []}
# end

defp reverse_split_demand([head | tail], demand, acc) do
reverse_split_demand(tail, demand - 1, [head | acc])
end
# defp reverse_split_demand([head | tail], demand, acc) do
# reverse_split_demand(tail, demand - 1, [head | acc])
# end

defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue}

defp dequeue_many(queue, demand, acc) do
# case :queue.out(queue) do
# {{:value, list}, queue} ->
# case reverse_split_demand(list, demand, acc) do
# {0, acc, []} ->
# {0, Enum.reverse(acc), queue}

# {0, acc, rest} ->
# {0, Enum.reverse(acc), :queue.in_r(rest, queue)}

# {demand, acc, []} ->
# dequeue_many(queue, demand, acc)
# end

# {:empty, queue} ->
# {demand, Enum.reverse(acc), queue}
# end
case :queue.out(queue) do
{{:value, list}, queue} ->
case reverse_split_demand(list, demand, acc) do
{0, acc, []} ->
{0, Enum.reverse(acc), queue}

{0, acc, rest} ->
{0, Enum.reverse(acc), :queue.in_r(rest, queue)}

{demand, acc, []} ->
dequeue_many(queue, demand, acc)
{{:value, message}, queue} ->
if message.weight > demand do
new_queue = :queue.in_r(message, queue)
# ignore remaining demand since message at head of queue is
# too chunky to send
{0, Enum.reverse(acc), new_queue}
else
new_demand = demand - message.weight
dequeue_many(queue, new_demand, [message | acc])
end

{:empty, queue} ->
Expand All @@ -365,17 +382,25 @@ defmodule Broadway.Topology.ProducerStage do
end

defp enqueue_batch(queue, _list = []), do: queue
defp enqueue_batch(queue, list), do: :queue.in(list, queue)

defp enqueue_batch(queue, list) do
# :queue.in(list, queue)
:queue.join(queue, :queue.from_list(list))
end

defp enqueue_batch_r(queue, _list = []), do: queue
defp enqueue_batch_r(queue, list), do: :queue.in_r(list, queue)

defp enqueue_batch_r(queue, list) do
# :queue.in_r(list, queue)
:queue.join(:queue.from_list(list), queue)
end

defp rate_limit_messages(_state, [], _count) do
{:open, [], []}
end

defp rate_limit_messages(rate_limiter, messages, message_count) do
case RateLimiter.rate_limit(rate_limiter, message_count) do
defp rate_limit_messages(rate_limiter, messages, demand) do
case RateLimiter.rate_limit(rate_limiter, demand) do
# If no more messages are allowed, we're rate limited but we're able
# to emit all messages that we have.
0 ->
Expand All @@ -388,8 +413,23 @@ defmodule Broadway.Topology.ProducerStage do

# We went over the rate limit, so we split (on negative index) the messages
# we were able to emit and close the rate limiting.

# NOTE: we think this will work for concurrent producers, but we
# ourselves don't actually have any.
overflow when overflow < 0 ->
{emittable, to_buffer} = Enum.split(messages, overflow)
# {emittable, to_buffer} = Enum.split(messages, overflow)
reversed = Enum.reverse(messages)

{emittable, to_buffer, _overflow} =
Enum.reduce_while(reversed, {[], reversed, overflow}, fn
message, {emittable, to_buffer, overflow} ->
if overflow > 0 do
{:halt, {emittable, overflow}}
else
{:cont, {[message | emittable], tl(to_buffer), overflow + message.weight}}
end
end)

{:closed, emittable, to_buffer}
end
end
Expand Down

0 comments on commit 70358a9

Please sign in to comment.