From 341faec3acccb249e0ecb6188b54107e3528c68f Mon Sep 17 00:00:00 2001 From: Alexey Zapparov Date: Sun, 19 Nov 2023 01:36:38 +0100 Subject: [PATCH] wip: Implement cooldown controller --- README.adoc | 19 +++++ lib/sidekiq/throttled.rb | 36 +++++++- lib/sidekiq/throttled/config.rb | 44 ++++++++++ lib/sidekiq/throttled/cooldown.rb | 55 ++++++++++++ lib/sidekiq/throttled/expirable_set.rb | 2 + lib/sidekiq/throttled/patches/basic_fetch.rb | 12 +++ spec/lib/sidekiq/throttled/config_spec.rb | 58 +++++++++++++ spec/lib/sidekiq/throttled/cooldown_spec.rb | 83 +++++++++++++++++++ .../throttled/patches/basic_fetch_spec.rb | 16 +++- 9 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 lib/sidekiq/throttled/config.rb create mode 100644 lib/sidekiq/throttled/cooldown.rb create mode 100644 spec/lib/sidekiq/throttled/config_spec.rb create mode 100644 spec/lib/sidekiq/throttled/cooldown_spec.rb diff --git a/README.adoc b/README.adoc index e704c168..22cb1c71 100644 --- a/README.adoc +++ b/README.adoc @@ -89,6 +89,25 @@ end ---- +=== Configuration + +[source,ruby] +---- +Sidekiq::Throttled.configure do |config| + # Period in seconds to exclude queue from polling in case it returned + # {config.cooldown_threshold} amount of throttled jobs in a row. Set + # this value to `nil` to disable cooldown manager completely. + # Default: 2.0 + config.coolldown_period = 2.0 + + # Amount of throttled jobs returned from the queue subsequently after which + # queue will be excluded from polling. + # Default: 1 (cooldown after first throttled job) + config.cooldown_threshold = 1 +end +---- + + === Observer You can specify an observer that will be called on throttling. To do so pass an diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 5bac71d8..e3e1c76d 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -2,11 +2,13 @@ require "sidekiq" -require_relative "./throttled/version" -require_relative "./throttled/patches/basic_fetch" -require_relative "./throttled/registry" +require_relative "./throttled/config" +require_relative "./throttled/cooldown" require_relative "./throttled/job" require_relative "./throttled/middleware" +require_relative "./throttled/patches/basic_fetch" +require_relative "./throttled/registry" +require_relative "./throttled/version" require_relative "./throttled/worker" # @see https://github.com/mperham/sidekiq/ @@ -39,7 +41,35 @@ module Sidekiq # end # end module Throttled + MUTEX = Mutex.new + private_constant :MUTEX + + @config = Config.new.freeze + @cooldown = Cooldown[@config] + class << self + # @api internal + # + # @return [Cooldown, nil] + attr_reader :cooldown + + # @example + # Sidekiq::Throttled.configure do |config| + # config.cooldown_period = nil # Disable queues cooldown manager + # end + # + # @yieldparam config [Config] + def configure + MUTEX.synchronize do + config = @config.dup + + yield config + + @config = config.freeze + @cooldown = Cooldown[@config] + end + end + def setup! Sidekiq.configure_server do |config| config.server_middleware do |chain| diff --git a/lib/sidekiq/throttled/config.rb b/lib/sidekiq/throttled/config.rb new file mode 100644 index 00000000..faf75d2c --- /dev/null +++ b/lib/sidekiq/throttled/config.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + # Configuration object. + class Config + # Period in seconds to exclude queue from polling in case it returned + # {#cooldown_threshold} amount of throttled jobs in a row. + # + # Set this to `nil` to disable cooldown completely. + # + # @return [Float, nil] + attr_reader :cooldown_period + + # Amount of throttled jobs returned from the same queue subsequently after + # which queue will be excluded from polling for the durations of + # {#cooldown_period}. + # + # @return [Integer] + attr_reader :cooldown_threshold + + def initialize + @cooldown_period = 2.0 + @cooldown_threshold = 1 + end + + # @!attribute [w] cooldown_period + def cooldown_period=(value) + raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float) + raise ArgumentError, "period must be positive" unless value.nil? || value.positive? + + @cooldown_period = value + end + + # @!attribute [w] cooldown_threshold + def cooldown_threshold=(value) + raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer) + raise ArgumentError, "threshold must be positive" unless value.positive? + + @cooldown_threshold = value + end + end + end +end diff --git a/lib/sidekiq/throttled/cooldown.rb b/lib/sidekiq/throttled/cooldown.rb new file mode 100644 index 00000000..11f38bda --- /dev/null +++ b/lib/sidekiq/throttled/cooldown.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require "concurrent" + +require_relative "./expirable_set" + +module Sidekiq + module Throttled + # @api internal + # + # Queues cooldown manager. Tracks list of queues that should be temporarily + # (for the duration of {Config#cooldown_period}) excluded from polling. + class Cooldown + class << self + # Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`. + # + # @param config [Config] + # @return [Cooldown, nil] + def [](config) + new(config) if config.cooldown_period + end + end + + # @param config [Config] + def initialize(config) + @queues = ExpirableSet.new(config.cooldown_period) + @threshold = config.cooldown_threshold + @tracker = Concurrent::Map.new + end + + # Notify that given queue returned job that was throttled. + # + # @param queue [String] + # @return [void] + def notify_throttled(queue) + @queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ) + end + + # Notify that given queue returned job that was not throttled. + # + # @param queue [String] + # @return [void] + def notify_admitted(queue) + @tracker.delete(queue) + end + + # List of queues that should not be polled + # + # @return [Array] + def queues + @queues.to_a + end + end + end +end diff --git a/lib/sidekiq/throttled/expirable_set.rb b/lib/sidekiq/throttled/expirable_set.rb index b8ebae46..9d992f68 100644 --- a/lib/sidekiq/throttled/expirable_set.rb +++ b/lib/sidekiq/throttled/expirable_set.rb @@ -4,6 +4,8 @@ module Sidekiq module Throttled + # @api internal + # # Set of elements with expirations. # # @example diff --git a/lib/sidekiq/throttled/patches/basic_fetch.rb b/lib/sidekiq/throttled/patches/basic_fetch.rb index 603db222..0f933dc8 100644 --- a/lib/sidekiq/throttled/patches/basic_fetch.rb +++ b/lib/sidekiq/throttled/patches/basic_fetch.rb @@ -14,10 +14,13 @@ def retrieve_work work = super if work && Throttled.throttled?(work.job) + Throttled.cooldown&.notify_throttled(work.queue) requeue_throttled(work) return nil end + Throttled.cooldown&.notify_admitted(work.queue) if work + work end @@ -33,6 +36,15 @@ def retrieve_work def requeue_throttled(work) redis { |conn| conn.lpush(work.queue, work.job) } end + + # Returns list of queues to try to fetch jobs from. + # + # @note It may return an empty array. + # @param [Array] queues + # @return [Array] + def queues_cmd + super - (Throttled.cooldown&.queues || []) + end end end end diff --git a/spec/lib/sidekiq/throttled/config_spec.rb b/spec/lib/sidekiq/throttled/config_spec.rb new file mode 100644 index 00000000..47662faa --- /dev/null +++ b/spec/lib/sidekiq/throttled/config_spec.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/config" + +RSpec.describe Sidekiq::Throttled::Config do + subject(:config) { described_class.new } + + describe "#cooldown_period" do + subject { config.cooldown_period } + + it { is_expected.to eq 2.0 } + end + + describe "#cooldown_period=" do + it "updates #cooldown_period" do + expect { config.cooldown_period = 42.0 } + .to change { config.cooldown_period }.to(42.0) + end + + it "allows setting value to `nil`" do + expect { config.cooldown_period = nil } + .to change { config.cooldown_period }.to(nil) + end + + it "fails if given value is neither `NilClass` nor `Float`" do + expect { config.cooldown_period = 42 } + .to raise_error(TypeError, /unexpected type/) + end + + it "fails if given value is not positive" do + expect { config.cooldown_period = 0.0 } + .to raise_error(ArgumentError, /must be positive/) + end + end + + describe "#cooldown_threshold" do + subject { config.cooldown_threshold } + + it { is_expected.to eq 1 } + end + + describe "#cooldown_threshold=" do + it "updates #cooldown_threshold" do + expect { config.cooldown_threshold = 42 } + .to change { config.cooldown_threshold }.to(42) + end + + it "fails if given value is not `Integer`" do + expect { config.cooldown_threshold = 42.0 } + .to raise_error(TypeError, /unexpected type/) + end + + it "fails if given value is not positive" do + expect { config.cooldown_threshold = 0 } + .to raise_error(ArgumentError, /must be positive/) + end + end +end diff --git a/spec/lib/sidekiq/throttled/cooldown_spec.rb b/spec/lib/sidekiq/throttled/cooldown_spec.rb new file mode 100644 index 00000000..9472259b --- /dev/null +++ b/spec/lib/sidekiq/throttled/cooldown_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/cooldown" + +RSpec.describe Sidekiq::Throttled::Cooldown do + subject(:cooldown) { described_class.new(config) } + + let(:config) { Sidekiq::Throttled::Config.new } + + describe ".[]" do + subject { described_class[config] } + + it { is_expected.to be_an_instance_of described_class } + + context "when `cooldown_period` is nil" do + before { config.cooldown_period = nil } + + it { is_expected.to be_nil } + end + end + + describe "#notify_throttled" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "marks queue for exclusion once threshold is met" do + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#notify_admitted" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "resets threshold counter" do + cooldown.notify_admitted("queue:at_the_end_of") + + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#queues" do + before do + config.cooldown_period = 1.0 + config.cooldown_threshold = 1 + end + + it "keeps queue in the exclusion list for the duration of cooldown_period" do + monotonic_time = 0.0 + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + cooldown.notify_throttled("queue:at_the_end_of") + + monotonic_time += 0.9 + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:at_the_end_of", "queue:the_longest_line") + + monotonic_time += 0.1 + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + + monotonic_time += 1.0 + expect(cooldown.queues).to be_empty + end + end +end diff --git a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb index a814ecfc..e35c12ef 100644 --- a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb @@ -12,16 +12,24 @@ def perform(*); end RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do subject(:fetch) do config = Sidekiq::Config.new - config.queues = %w[default] + config.queues = %w[default underground network] Sidekiq::BasicFetch.new(config.default_capsule) end + before do + Sidekiq::Throttled.configure { |config| config.cooldown_threshold = 2 } + end + + after do + Sidekiq::Throttled.configure { |config| config.cooldown_threshold = 1 } + end + describe "#retrieve_work" do def enqueued_jobs(queue) Sidekiq.redis do |conn| conn.lrange("queue:#{queue}", 0, -1).map do |job| JSON.parse(job).then do |payload| - [payload["class"], payload["args"]] + [payload["class"], *payload["args"]] end end end @@ -52,7 +60,7 @@ def enqueued_jobs(queue) it "pushes job back to the head of the queue" do expect { fetch.retrieve_work } .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + .to eq([["ThrottledTestJob", 2], ["ThrottledTestJob", 3]]) end end @@ -69,7 +77,7 @@ def enqueued_jobs(queue) it "pushes job back to the head of the queue" do expect { fetch.retrieve_work } .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + .to eq([["ThrottledTestJob", 2], ["ThrottledTestJob", 3]]) end end end