diff --git a/.gitignore b/.gitignore index 80036d4..4633e88 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ spec/reports test/tmp test/version_tmp tmp +.lefthook-local.yml \ No newline at end of file diff --git a/Appraisals b/Appraisals index 4b87a34..fb5b7e3 100644 --- a/Appraisals +++ b/Appraisals @@ -21,3 +21,11 @@ end appraise 'sidekiq-6.5' do gem 'sidekiq', '~> 6.5.0' end + +appraise 'sidekiq-7.0' do + gem 'sidekiq', '~> 7.0.0' +end + +appraise 'sidekiq-master' do + gem 'sidekiq', github: 'mperham/sidekiq' +end diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..dde3a1d --- /dev/null +++ b/bin/console @@ -0,0 +1,8 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "bundler/setup" +require "sidekiq/grouping" + +require "pry" +Pry.start diff --git a/gemfiles/sidekiq_7.0.gemfile b/gemfiles/sidekiq_7.0.gemfile new file mode 100644 index 0000000..381b307 --- /dev/null +++ b/gemfiles/sidekiq_7.0.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "sidekiq", "~> 7.0.0" + +gemspec path: "../" diff --git a/lefthook.yml b/lefthook.yml index 08d70c4..79bf09c 100644 --- a/lefthook.yml +++ b/lefthook.yml @@ -4,12 +4,17 @@ # See: github.com/evilmartians/lefthook pre-commit: + parallel: true commands: + appraisal: + glob: "{Appraisals,*.gemfile}" + run: echo {staged_files} > /dev/null; bundle exec appraisal install && git add gemfiles/*.gemfile rubocop: - glob: "*.rb" + glob: "{*.rb,*.gemspec,Gemfile,Rakefile}" run: bundle exec rubocop -A {staged_files} && git add {staged_files} pre-push: commands: rspec: - run: bundle exec appraisal rspec + glob: "*.rb" + run: echo {push_files} > /dev/null; bundle exec appraisal rspec diff --git a/lib/sidekiq/grouping.rb b/lib/sidekiq/grouping.rb index 5f9da1a..3e6b908 100644 --- a/lib/sidekiq/grouping.rb +++ b/lib/sidekiq/grouping.rb @@ -4,6 +4,7 @@ require "active_support/core_ext/string" require "active_support/configurable" require "active_support/core_ext/numeric/time" +require "sidekiq" require "sidekiq/grouping/version" require "concurrent" diff --git a/lib/sidekiq/grouping/config.rb b/lib/sidekiq/grouping/config.rb index 4a0aded..6bde5b6 100644 --- a/lib/sidekiq/grouping/config.rb +++ b/lib/sidekiq/grouping/config.rb @@ -6,10 +6,12 @@ module Config include ActiveSupport::Configurable def self.options - if Sidekiq.respond_to?(:[]) - Sidekiq[:grouping] || Sidekiq["grouping"] || {} - else + if Sidekiq.respond_to?(:[]) # Sidekiq 6.x + Sidekiq[:grouping] || {} + elsif Sidekiq.respond_to?(:options) # Sidekiq <= 5.x Sidekiq.options[:grouping] || Sidekiq.options["grouping"] || {} + else # Sidekiq 7.x + Sidekiq.default_configuration[:grouping] || {} end end diff --git a/lib/sidekiq/grouping/redis.rb b/lib/sidekiq/grouping/redis.rb index f79041d..5ace9d6 100644 --- a/lib/sidekiq/grouping/redis.rb +++ b/lib/sidekiq/grouping/redis.rb @@ -1,8 +1,12 @@ # frozen_string_literal: true +require_relative "./redis_dispatcher" + module Sidekiq module Grouping class Redis + include RedisDispatcher + PLUCK_SCRIPT = <<-SCRIPT local pluck_values = redis.call('lpop', KEYS[1], ARGV[1]) or {} if #pluck_values > 0 then @@ -15,13 +19,12 @@ def push_msg(name, msg, remember_unique: false) redis do |conn| conn.multi do |pipeline| sadd = pipeline.respond_to?(:sadd?) ? :sadd? : :sadd - pipeline.public_send(sadd, ns("batches"), name) - pipeline.rpush(ns(name), msg) + redis_connection_call(pipeline, sadd, ns("batches"), name) + redis_connection_call(pipeline, :rpush, ns(name), msg) + if remember_unique - pipeline.public_send( - sadd, - unique_messages_key(name), - msg + redis_connection_call( + pipeline, sadd, unique_messages_key(name), msg ) end end @@ -29,47 +32,62 @@ def push_msg(name, msg, remember_unique: false) end def enqueued?(name, msg) - redis do |conn| - conn.sismember(unique_messages_key(name), msg) - end + member = redis_call(:sismember, unique_messages_key(name), msg) + return member if member.is_a?(TrueClass) || member.is_a?(FalseClass) + + member != 0 end def batch_size(name) - redis { |conn| conn.llen(ns(name)) } + redis_call(:llen, ns(name)) end def batches - redis { |conn| conn.smembers(ns("batches")) } + redis_call(:smembers, ns("batches")) end def pluck(name, limit) - keys = [ns(name), unique_messages_key(name)] - args = [limit] - redis { |conn| conn.eval PLUCK_SCRIPT, keys, args } + if new_redis_client? + redis_call( + :eval, + PLUCK_SCRIPT, + 2, + ns(name), + unique_messages_key(name), + limit + ) + else + keys = [ns(name), unique_messages_key(name)] + args = [limit] + redis_call(:eval, PLUCK_SCRIPT, keys, args) + end end def get_last_execution_time(name) - redis { |conn| conn.get(ns("last_execution_time:#{name}")) } + redis_call(:get, ns("last_execution_time:#{name}")) end def set_last_execution_time(name, time) - redis do |conn| - conn.set(ns("last_execution_time:#{name}"), time.to_json) - end + redis_call( + :set, ns("last_execution_time:#{name}"), time.to_json + ) end def lock(name) - redis do |conn| - id = ns("lock:#{name}") - conn.set(id, true, nx: true, ex: Sidekiq::Grouping::Config.lock_ttl) - end + redis_call( + :set, + ns("lock:#{name}"), + "true", + nx: true, + ex: Sidekiq::Grouping::Config.lock_ttl + ) end def delete(name) redis do |conn| - conn.del(ns("last_execution_time:#{name}")) - conn.del(ns(name)) - conn.srem(ns("batches"), name) + redis_connection_call(conn, :del, ns("last_execution_time:#{name}")) + redis_connection_call(conn, :del, ns(name)) + redis_connection_call(conn, :srem, ns("batches"), name) end end @@ -82,10 +100,6 @@ def unique_messages_key(name) def ns(key = nil) "batching:#{key}" end - - def redis(&block) - Sidekiq.redis(&block) - end end end end diff --git a/lib/sidekiq/grouping/redis_dispatcher.rb b/lib/sidekiq/grouping/redis_dispatcher.rb new file mode 100644 index 0000000..bdf2d25 --- /dev/null +++ b/lib/sidekiq/grouping/redis_dispatcher.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + module RedisDispatcher + def redis_call(command, *args, **kwargs) + redis do |connection| + redis_connection_call(connection, command, *args, **kwargs) + end + end + + def redis_connection_call(connection, command, *args, **kwargs) + if new_redis_client? # redis-client + connection.call(command.to_s.upcase, *args, **kwargs) + else # redis + connection.public_send(command, *args, **kwargs) + end + end + + def new_redis_client? + Sidekiq::VERSION[0].to_i >= 7 + end + + def redis(&block) + Sidekiq.redis(&block) + end + end + end +end diff --git a/lib/sidekiq/grouping/version.rb b/lib/sidekiq/grouping/version.rb index f5ca797..8006e30 100644 --- a/lib/sidekiq/grouping/version.rb +++ b/lib/sidekiq/grouping/version.rb @@ -2,6 +2,6 @@ module Sidekiq module Grouping - VERSION = "1.2.0" + VERSION = "1.3.0" end end diff --git a/sidekiq-grouping.gemspec b/sidekiq-grouping.gemspec index 8abf373..69215e0 100644 --- a/sidekiq-grouping.gemspec +++ b/sidekiq-grouping.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "appraisal" spec.add_development_dependency "bundler", "> 1.5" + spec.add_development_dependency "pry" spec.add_development_dependency "rake" spec.add_development_dependency "rspec" spec.add_development_dependency "rspec-sidekiq" @@ -33,6 +34,6 @@ Gem::Specification.new do |spec| spec.add_dependency "activesupport" spec.add_dependency "concurrent-ruby" - spec.add_dependency "sidekiq", ">= 3.4.2", "< 7" + spec.add_dependency "sidekiq", ">= 3.4.2" spec.metadata["rubygems_mfa_required"] = "true" end diff --git a/spec/modules/redis_spec.rb b/spec/modules/redis_spec.rb index 9a4905b..2a121e4 100644 --- a/spec/modules/redis_spec.rb +++ b/spec/modules/redis_spec.rb @@ -3,6 +3,8 @@ require "spec_helper" describe Sidekiq::Grouping::Redis do + include Sidekiq::Grouping::RedisDispatcher + subject(:redis_service) { described_class.new } let(:queue_name) { "my_queue" } @@ -12,14 +14,14 @@ describe "#push_msg" do it "adds message to queue", :aggregate_failures do redis_service.push_msg(queue_name, "My message") - expect(redis { |c| c.llen key }).to eq 1 - expect(redis { |c| c.lrange key, 0, 1 }).to eq ["My message"] - expect(redis { |c| c.smembers unique_key }).to eq [] + expect(redis_call(:llen, key)).to eq 1 + expect(redis_call(:lrange, key, 0, 1)).to eq ["My message"] + expect(redis_call(:smembers, unique_key)).to eq [] end it "remembers unique message if specified" do redis_service.push_msg(queue_name, "My message", remember_unique: true) - expect(redis { |c| c.smembers unique_key }).to eq ["My message"] + expect(redis_call(:smembers, unique_key)).to eq ["My message"] end end @@ -28,21 +30,15 @@ redis_service.push_msg(queue_name, "Message 1") redis_service.push_msg(queue_name, "Message 2") redis_service.pluck(queue_name, 2) - expect(redis { |c| c.llen key }).to eq 0 + expect(redis_call(:llen, key)).to eq 0 end it "forgets unique messages", :aggregate_failures do redis_service.push_msg(queue_name, "Message 1", remember_unique: true) redis_service.push_msg(queue_name, "Message 2", remember_unique: true) - expect(redis { |c| c.scard unique_key }).to eq 2 + expect(redis_call(:scard, unique_key)).to eq 2 redis_service.pluck(queue_name, 2) - expect(redis { |c| c.smembers unique_key }).to eq [] + expect(redis_call(:smembers, unique_key)).to eq [] end end - - private - - def redis(&block) - Sidekiq.redis(&block) - end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6a55ca6..21339bc 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -9,6 +9,7 @@ require "sidekiq" require "rspec-sidekiq" require "support/test_workers" +require "pry" SimpleCov.start do add_filter "spec" @@ -17,8 +18,10 @@ require "sidekiq/grouping" Sidekiq::Grouping.logger = nil -Sidekiq.redis = { db: ENV.fetch("db", 1) } -Sidekiq.logger = nil +Sidekiq.configure_client do |config| + config.redis = { db: 1 } + config.logger = nil +end RSpec::Sidekiq.configure do |config| config.clear_all_enqueued_jobs = true @@ -32,8 +35,13 @@ config.before do Sidekiq.redis do |conn| - keys = conn.keys "*batching*" - keys.each { |key| conn.del key } + if Sidekiq::VERSION[0].to_i >= 7 + keys = conn.call("KEYS", "*batching*") + keys.each { |key| conn.call("DEL", key) } + else + keys = conn.keys "*batching*" + keys.each { |key| conn.del key } + end end end