Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid redundant metrics collection #189

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions judoscale-sidekiq/lib/judoscale/sidekiq/metrics_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
module Judoscale
module Sidekiq
class MetricsCollector < Judoscale::JobMetricsCollector
RECENT = 1 # second
RECENT_KEY = "judoscale:sidekiq:recent"
RECENT_VALUE = "1"

def self.adapter_config
Judoscale::Config.instance.sidekiq
end

def collect
return [] if collected_recently?

metrics = []
queues_by_name = ::Sidekiq::Queue.all.each_with_object({}) do |queue, obj|
obj[queue.name] = queue
Expand Down Expand Up @@ -45,6 +51,18 @@ def collect
log_collection(metrics)
metrics
end

def forget_recent_collection!
# We need this for testing
::Sidekiq.redis { |r| r.del RECENT_KEY }
end

private

def collected_recently?
# If another process has collected metrics recently, we don't need to.
!::Sidekiq.redis { |r| r.set RECENT_KEY, RECENT_VALUE, nx: true, ex: RECENT }
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

require "test_helper"
require "minitest/benchmark"
require "judoscale/sidekiq/metrics_collector"

class CollectWithManyReportersBenchmark < Minitest::Benchmark
BATCH_SIZE = 1_000
QUEUES = %w[one two three four five six seven eight nine ten]

# performance assertions will iterate over `bench_range`.
# The values here don't matter—we just want several iterations through the benchmark.
def self.bench_range
(0..4).to_a
end

def setup
# Enqueue jobs on several queues
sidekiq_args = BATCH_SIZE.times.map { [] }
QUEUES.each do |queue|
Sidekiq::Client.push_bulk "class" => "Foo", "args" => sidekiq_args, "queue" => queue
end

# Prepare a collector for each benchmark iteration
@collectors = {}
self.class.bench_range.each do |n|
@collectors[n] = Judoscale::Sidekiq::MetricsCollector.new
end
end

def bench_collect
validation = proc do |_, times|
# The first collector should take the longest, since the rest will be
# no-ops after checking for `collected_recently?`.
first, *rest = times
assert_operator first, :>, rest.max
end

assert_performance validation do |n|
@collectors.fetch(n).collect
end
end
end
23 changes: 20 additions & 3 deletions judoscale-sidekiq/test/metrics_collector_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ module Judoscale
subject { Sidekiq::MetricsCollector.new }

describe "#collect" do
after {
subject.clear_queues
}
before { ::Sidekiq.redis { |r| r.flushdb } }
after { subject.clear_queues }

it "collects latency for each queue" do
queues = [
Expand All @@ -39,6 +38,22 @@ module Judoscale
_(metrics[3].identifier).must_equal :qd
end

it "avoids redundant collections" do
queues = [SidekiqQueueStub.new(name: "default", latency: 11, size: 1)]

metrics = ::Sidekiq::Queue.stub(:all, queues) {
subject.collect
}

_(metrics.size).must_equal 2

metrics = ::Sidekiq::Queue.stub(:all, queues) {
subject.collect
}

_(metrics.size).must_equal 0
end

it "always collects for known queues" do
queues = []

Expand All @@ -51,6 +66,7 @@ module Judoscale
queues = [SidekiqQueueStub.new(name: "default", latency: 11, size: 1)]

metrics = ::Sidekiq::Queue.stub(:all, queues) {
subject.forget_recent_collection!
subject.collect
}

Expand All @@ -63,6 +79,7 @@ module Judoscale

metrics = ::Sidekiq::Queue.stub(:all, queues) {
::Sidekiq::Queue.stub(:new, ->(queue_name) { new_queues.fetch(queue_name) }) {
subject.forget_recent_collection!
subject.collect
}
}
Expand Down
Loading