diff --git a/README.md b/README.md index 0e3efe4..a870995 100644 --- a/README.md +++ b/README.md @@ -75,12 +75,21 @@ a suite file defines a series of tests to run. #### suite file format ```ruby +# format description: # each test can be executed by either target duration using :time => N secs # or by number of events with :events => N # +# you can specify the number of filter worker threads for each test with :workers => N +# if you don't specify workers, it defaults to 1 +# +# Note: Logstash 2.0 has new default worker thread behavior: It defaults to half the number +# of cpus for the number of worker threads. So a 4 CPU machine would get 2 threads by default. +# This is different than logstash 1.x where the default was always 1 worker thread. Lsperfm +# will *always* default to 1 worker regardless of logstash version. +# #[ # {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30}, -# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, +# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, #] # [ @@ -88,7 +97,7 @@ a suite file defines a series of tests to run. {:name => "simple line out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 60}, {:name => "json codec", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 60}, {:name => "json filter", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 60}, - {:name => "complex syslog", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 60}, + {:workers => 2, :name => "complex syslog", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 60}, ] ``` diff --git a/examples/suite/basic_performance_long.rb b/examples/suite/basic_performance_long.rb index 651f4ca..3d555a6 100644 --- a/examples/suite/basic_performance_long.rb +++ b/examples/suite/basic_performance_long.rb @@ -2,17 +2,21 @@ # each test can be executed by either target duration using :time => N secs # or by number of events with :events => N # +# you can specify the number of filter worker threads for each test with :workers => N +# if you don't specify workers, it defaults to 1 +# #[ # {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30}, -# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, +# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, #] # +# We use 2 workers here as an example, since most machines can handle that, but machines with more CPUs can handle more threads. [ {:name => "simple line in/out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 120}, - {:name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 120}, + {:workers => 2, :name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 120}, {:name => "json codec in/out", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 120}, - {:name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 120}, + {:workers => 2, :name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 120}, {:name => "apache in/json out", :config => "config/simple.conf", :input => "input/apache_log.txt", :time => 120}, - {:name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 120}, + {:workers => 2, :name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 120}, {:name => "syslog in/json out", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 120}, ] diff --git a/examples/suite/basic_performance_quick.rb b/examples/suite/basic_performance_quick.rb index afaf948..ccdde8b 100644 --- a/examples/suite/basic_performance_quick.rb +++ b/examples/suite/basic_performance_quick.rb @@ -2,17 +2,21 @@ # each test can be executed by either target duration using :time => N secs # or by number of events with :events => N # +# you can specify the number of filter worker threads for each test with :workers => N +# if you don't specify workers, it defaults to 1 +# #[ # {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30}, -# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, +# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000}, #] # +# We use 2 workers here as an example since most machines can handle that, but machines with more CPUs can handle more threads. [ {:name => "simple line in/out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 30}, - {:name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30}, + {:workers => 2, :name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30}, {:name => "json codec in/out", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 30}, - {:name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 30}, + {:workers => 2, :name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 30}, {:name => "apache in/json out", :config => "config/simple.conf", :input => "input/apache_log.txt", :time => 30}, - {:name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 30}, + {:workers => 2, :name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 30}, {:name => "syslog in/json out", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 30}, ] diff --git a/lib/lsperfm/core.rb b/lib/lsperfm/core.rb index 75985b2..563154d 100644 --- a/lib/lsperfm/core.rb +++ b/lib/lsperfm/core.rb @@ -23,8 +23,9 @@ def run(debug=false, headers=false) tests.each do |test| events = test[:events].to_i time = test[:time].to_i + workers = test[:workers] ? test[:workers] : 1 - manager = runner.new(find_test_config(test[:config]), debug, install_path) + manager = runner.new(find_test_config(test[:config]), workers, debug, install_path) metrics = manager.run(events, time, runner.read_input_file(find_test_input(test[:input]))) lines << formatter(test[:name], metrics) end diff --git a/lib/lsperfm/core/run.rb b/lib/lsperfm/core/run.rb index e000088..db65a4e 100644 --- a/lib/lsperfm/core/run.rb +++ b/lib/lsperfm/core/run.rb @@ -18,9 +18,9 @@ class Runner attr_reader :command - def initialize(config, debug = false, logstash_home = Dir.pwd) + def initialize(config, workers = 1, debug = false, logstash_home = Dir.pwd) @debug = debug - @command = [File.join(logstash_home, LOGSTASH_BIN), "-f", config] + @command = [File.join(logstash_home, LOGSTASH_BIN), "-f", config, "-w", workers.to_s] end def run(required_events_count, required_run_time, input_lines) diff --git a/spec/fixtures/worker_basic_suite.rb b/spec/fixtures/worker_basic_suite.rb new file mode 100644 index 0000000..67502d3 --- /dev/null +++ b/spec/fixtures/worker_basic_suite.rb @@ -0,0 +1,4 @@ +[ + {:workers => 2, :name => "simple 1", :config => "simple.conf", :input => "simple_10.txt", :time => 5}, + {:workers => 2, :name => "simple 2", :config => "simple.conf", :input => "simple_10.txt", :time => 10}, +] diff --git a/spec/lib/runner_spec.rb b/spec/lib/runner_spec.rb index 5000d4a..6aae3e0 100644 --- a/spec/lib/runner_spec.rb +++ b/spec/lib/runner_spec.rb @@ -10,7 +10,7 @@ subject (:runner) { LogStash::PerformanceMeter::Runner.new(config) } - let(:command) { [File.join(Dir.pwd, LogStash::PerformanceMeter::Runner::LOGSTASH_BIN), "-f", "spec/fixtures/simple.conf"]} + let(:command) { [File.join(Dir.pwd, LogStash::PerformanceMeter::Runner::LOGSTASH_BIN), "-f", "spec/fixtures/simple.conf", "-w", "1"]} it "invokes the logstash command" do Open3.should_receive(:popen3).with(*command).and_return(true) diff --git a/spec/lib/suite_spec.rb b/spec/lib/suite_spec.rb index 5f9c781..5e5974c 100644 --- a/spec/lib/suite_spec.rb +++ b/spec/lib/suite_spec.rb @@ -19,7 +19,7 @@ context "using a file" do it "run each test case in a serial maner" do - expect(runner).to receive(:new).with("spec/fixtures/simple.conf", false, logstash_home).twice { serial_runner } + expect(runner).to receive(:new).with("spec/fixtures/simple.conf", 1, false, logstash_home).twice { serial_runner } manager.run end @@ -31,7 +31,7 @@ it "run each test case as expected" do expect(runner).to receive(:read_input_file).with('simple_10.txt').twice { [] } - expect(runner).to receive(:new).with("simple.conf", false, logstash_home).twice { serial_runner } + expect(runner).to receive(:new).with("simple.conf", 1, false, logstash_home).twice { serial_runner } manager.run end @@ -43,7 +43,7 @@ let(:config) { 'spec/fixtures/wrong_config.yml' } it "run each test case as expected" do - expect(runner).to receive(:new).with("spec/wrong_fixture/simple.conf", false, logstash_home).once { serial_runner } + expect(runner).to receive(:new).with("spec/wrong_fixture/simple.conf", 1, false, logstash_home).once { serial_runner } expect { manager.run }.to raise_error(LogStash::PerformanceMeter::ConfigException) end end diff --git a/spec/lib/worker_suite_spec.rb b/spec/lib/worker_suite_spec.rb new file mode 100644 index 0000000..8aeca9c --- /dev/null +++ b/spec/lib/worker_suite_spec.rb @@ -0,0 +1,41 @@ +require 'spec_helper' + +describe LogStash::PerformanceMeter::Core do + + let(:config) { 'spec/fixtures/config.yml' } + let(:logstash_home) { '.' } + let(:suite_def) { 'spec/fixtures/worker_basic_suite.rb' } + let(:serial_runner) { double('DummySerialRunner') } + let(:runner) { LogStash::PerformanceMeter::Runner } + let(:workers) { 2 } + + let(:run_outcome) { { :percentile => [2000] , :elapsed => 100, :events_count => 3000, :start_time => 12 } } + subject(:manager) { LogStash::PerformanceMeter::Core.new(suite_def, logstash_home, config, runner) } + + context "with a valid configuration and worker threads set to 2" do + before(:each) do + expect(serial_runner).to receive(:run).with(0, 5, anything()).ordered { run_outcome } + expect(serial_runner).to receive(:run).with(0, 10, anything()).ordered { run_outcome } + end + context "using a file" do + + it "run each test case in a serial maner" do + expect(runner).to receive(:new).with("spec/fixtures/simple.conf", workers, false, logstash_home).twice { serial_runner } + manager.run + end + + end + + context "without a file" do + + let(:config) { '' } + + it "run each test case as expected" do + expect(runner).to receive(:read_input_file).with('simple_10.txt').twice { [] } + expect(runner).to receive(:new).with("simple.conf", workers, false, logstash_home).twice { serial_runner } + manager.run + end + + end + end +end