diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index d8d35ee..c0d932f 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -316,13 +316,15 @@ defmodule Broadway.Topology.ProducerStage do {%{rate_limiting | state: :closed}, []} allowed -> - {allowed_left, probably_emittable, buffer} = dequeue_many(buffer, allowed, []) + {allowed_left, probably_emittable, buffer, next_message_weight} = + dequeue_many(buffer, allowed, []) log_map = %{ initial_allowed: allowed, allowed_left: allowed_left, probably_emittable_count: length(probably_emittable), - buffer_length: :queue.len(buffer) + buffer_length: :queue.len(buffer), + next_message_weight: next_message_weight } Utility.maybe_log("dequeue_many result: #{inspect(log_map)}", state) @@ -333,7 +335,9 @@ defmodule Broadway.Topology.ProducerStage do rate_limit_messages( rate_limiter, probably_emittable, - _probably_emittable_weight = allowed - allowed_left + _probably_emittable_weight = allowed - allowed_left, + next_message_weight, + state ) rate_limiting_map = %{ @@ -353,6 +357,7 @@ defmodule Broadway.Topology.ProducerStage do } if draining? and :queue.is_empty(new_buffer) do + Utility.maybe_log("Cancelling consumers", state) cancel_consumers(state) end @@ -370,7 +375,7 @@ defmodule Broadway.Topology.ProducerStage do # allowed_left is how much of the demand remands # probably_emittable is the messages pulled off the queue that we can probably process # buffer is the remaining queue - defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue} + defp dequeue_many(queue, 0, acc), do: {0, Enum.reverse(acc), queue, 0} defp dequeue_many(queue, demand, acc) do # Take the first message off the queue @@ -382,14 +387,16 @@ defmodule Broadway.Topology.ProducerStage do # requeue first message and ignore remaining demand since # the next message would put us over the allowed weight new_queue = :queue.in_r(message, queue) - {0, Enum.reverse(acc), new_queue} + # TODO: Should the first element be demand instead of 0? + # Changed it for now. + {demand, Enum.reverse(acc), new_queue, message.weight} else new_demand = demand - message.weight dequeue_many(queue, new_demand, [message | acc]) end {:empty, queue} -> - {demand, Enum.reverse(acc), queue} + {demand, Enum.reverse(acc), queue, 0} end end @@ -405,30 +412,41 @@ defmodule Broadway.Topology.ProducerStage do :queue.join(:queue.from_list(list), queue) end - defp rate_limit_messages(_state, [], _count) do - {:open, [], []} - end + # defp rate_limit_messages(_rate_limiter, [], _count, _next_message_weight, state) do + # Utility.maybe_log("No messages to rate limit", state) + # {:open, [], []} + # end + + defp rate_limit_messages(rate_limiter, messages, demand, next_message_weight, state) do + left = RateLimiter.rate_limit(rate_limiter, demand) + Utility.maybe_log("Remaining rate limit: #{left}", state) + + cond do + messages == [] -> + {:open, [], []} - defp rate_limit_messages(rate_limiter, messages, demand) do - case RateLimiter.rate_limit(rate_limiter, demand) do # If no more messages are allowed, we're rate limited but we're able # to emit all messages that we have. - 0 -> + left == 0 -> + {:closed, messages, _to_buffer = []} + + # TODO: Does this need changes to handle the negative case below? + left < next_message_weight and left >= 0 -> {:closed, messages, _to_buffer = []} # We were able to emit all messages and still more messages are allowed, # so the rate limiting is "open". - left when left > 0 -> + left > next_message_weight -> {:open, messages, _to_buffer = []} # We went over the rate limit, so we remove messages from the # back of the list of those we were able to emit until the # overflow is corrected and close the rate limiting. - overflow when overflow < 0 -> + left < 0 -> reversed = Enum.reverse(messages) {emittable, to_buffer, _overflow} = - Enum.reduce_while(reversed, {reversed, [], overflow}, fn + Enum.reduce_while(reversed, {reversed, [], left}, fn message, {emittable, to_buffer, overflow} -> if overflow >= 0 do {:halt, {Enum.reverse(emittable), to_buffer, overflow}}