diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 8a549958..46a015fb 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -40,17 +40,17 @@ module Sidekiq # end # end module Throttled + @configuration = Configuration.new + class << self - # @return [Configuration] - def configuration - @configuration ||= Configuration.new - end + attr_reader :configuration - # Hooks throttler into sidekiq. - # - # @return [void] def setup! - Sidekiq::Throttled::Patches::BasicFetch.apply! + Sidekiq.configure_server do |config| + config.server_middleware do |chain| + chain.add Sidekiq::Throttled::Middleware + end + end end # Tells whenever job is throttled or not. @@ -58,11 +58,13 @@ def setup! # @param [String] message Job's JSON payload # @return [Boolean] def throttled?(message) - message = JSON.parse message - job = message.fetch("wrapped") { message.fetch("class") { return false } } - jid = message.fetch("jid") { return false } + message = Sidekiq.load_json(message) + job = message.fetch("wrapped") { message["class"] } + jid = message["jid"] - Registry.get job do |strategy| + return false unless job && jid + + Registry.get(job) do |strategy| return strategy.throttled?(jid, *message["args"]) end @@ -72,10 +74,4 @@ def throttled?(message) end end end - - configure_server do |config| - config.server_middleware do |chain| - chain.add Sidekiq::Throttled::Middleware - end - end end diff --git a/lib/sidekiq/throttled/patches/basic_fetch.rb b/lib/sidekiq/throttled/patches/basic_fetch.rb index c421520d..603db222 100644 --- a/lib/sidekiq/throttled/patches/basic_fetch.rb +++ b/lib/sidekiq/throttled/patches/basic_fetch.rb @@ -7,12 +7,6 @@ module Sidekiq module Throttled module Patches module BasicFetch - class << self - def apply! - Sidekiq::BasicFetch.prepend(self) unless Sidekiq::BasicFetch.include?(self) - end - end - # Retrieves job from redis. # # @return [Sidekiq::Throttled::UnitOfWork, nil] @@ -39,22 +33,9 @@ def retrieve_work def requeue_throttled(work) redis { |conn| conn.lpush(work.queue, work.job) } end - - # Returns list of queues to try to fetch jobs from. - # - # @note It may return an empty array. - # @param [Array] queues - # @return [Array] - def queues_cmd - queues = super - - # TODO: Refactor to be prepended as an integration mixin during configuration stage - # Or via configurable queues reducer - queues -= Sidekiq::Pauzer.paused_queues.map { |name| "queue:#{name}" } if defined?(Sidekiq::Pauzer) - - queues - end end end end end + +Sidekiq::BasicFetch.prepend(Sidekiq::Throttled::Patches::BasicFetch) diff --git a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb index 78c4de13..a814ecfc 100644 --- a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb @@ -11,19 +11,11 @@ def perform(*); end RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do subject(:fetch) do - if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") - Sidekiq.instance_variable_set(:@config, Sidekiq::DEFAULTS.dup) - Sidekiq.queues = %w[default] - Sidekiq::BasicFetch.new(Sidekiq) - else - config = Sidekiq::Config.new - config.queues = %w[default] - Sidekiq::BasicFetch.new(config.default_capsule) - end + config = Sidekiq::Config.new + config.queues = %w[default] + Sidekiq::BasicFetch.new(config.default_capsule) end - before { described_class.apply! } - describe "#retrieve_work" do def enqueued_jobs(queue) Sidekiq.redis do |conn|