Skip to content

Commit

Permalink
fix: Better patch BasicFetch
Browse files Browse the repository at this point in the history
This brings support of both Sidekiq::BasicFetch and
Sidekiq::Pro::BasicFetch
  • Loading branch information
ixti committed Nov 17, 2023
1 parent fa2d701 commit d263ba9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 50 deletions.
32 changes: 14 additions & 18 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,31 @@ module Sidekiq
# end
# end
module Throttled
@configuration = Configuration.new

class << self
# @return [Configuration]
def configuration
@configuration ||= Configuration.new
end
attr_reader :configuration

# Hooks throttler into sidekiq.
#
# @return [void]
def setup!
Sidekiq::Throttled::Patches::BasicFetch.apply!
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add Sidekiq::Throttled::Middleware
end
end
end

# Tells whenever job is throttled or not.
#
# @param [String] message Job's JSON payload
# @return [Boolean]
def throttled?(message)
message = JSON.parse message
job = message.fetch("wrapped") { message.fetch("class") { return false } }
jid = message.fetch("jid") { return false }
message = Sidekiq.load_json(message)
job = message.fetch("wrapped") { message["class"] }
jid = message["jid"]

Registry.get job do |strategy|
return false unless job && jid

Registry.get(job) do |strategy|
return strategy.throttled?(jid, *message["args"])
end

Expand All @@ -72,10 +74,4 @@ def throttled?(message)
end
end
end

configure_server do |config|
config.server_middleware do |chain|
chain.add Sidekiq::Throttled::Middleware
end
end
end
23 changes: 2 additions & 21 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ module Sidekiq
module Throttled
module Patches
module BasicFetch
class << self
def apply!
Sidekiq::BasicFetch.prepend(self) unless Sidekiq::BasicFetch.include?(self)
end
end

# Retrieves job from redis.
#
# @return [Sidekiq::Throttled::UnitOfWork, nil]
Expand All @@ -39,22 +33,9 @@ def retrieve_work
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def queues_cmd
queues = super

# TODO: Refactor to be prepended as an integration mixin during configuration stage
# Or via configurable queues reducer
queues -= Sidekiq::Pauzer.paused_queues.map { |name| "queue:#{name}" } if defined?(Sidekiq::Pauzer)

queues
end
end
end
end
end

Sidekiq::BasicFetch.prepend(Sidekiq::Throttled::Patches::BasicFetch)
14 changes: 3 additions & 11 deletions spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@ def perform(*); end

RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do
subject(:fetch) do
if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0")
Sidekiq.instance_variable_set(:@config, Sidekiq::DEFAULTS.dup)
Sidekiq.queues = %w[default]
Sidekiq::BasicFetch.new(Sidekiq)
else
config = Sidekiq::Config.new
config.queues = %w[default]
Sidekiq::BasicFetch.new(config.default_capsule)
end
config = Sidekiq::Config.new
config.queues = %w[default]
Sidekiq::BasicFetch.new(config.default_capsule)
end

before { described_class.apply! }

describe "#retrieve_work" do
def enqueued_jobs(queue)
Sidekiq.redis do |conn|
Expand Down

0 comments on commit d263ba9

Please sign in to comment.