Added ability to specify filter worker threads #25

Open
wants to merge 6 commits into
base: master
13 changes: 11 additions & 2 deletions README.md
@@ -75,20 +75,29 @@ a suite file defines a series of tests to run.
#### suite file format

```ruby
# format description:
# each test can be executed by either target duration using :time => N secs
# or by number of events with :events => N
#
# you can specify the number of filter worker threads for each test with :workers => N
# if you don't specify workers, it defaults to 1
Contributor

We should add a note here about the Logstash 2.0 change. Now that 2.0 is released and GA, the worker count defaults to half the number of CPUs, but for benchmarks I prefer a default of 1 because it makes the results easier to understand. What do you think?

Author

I didn't realize 2.0 changed that behavior. Honestly, I would expect not specifying workers to do whatever logstash does by default when using it directly. So for 1.x that was just defaulting to 1 thread, but if 2.0 defaults to half the amount of cpu cores, I would expect that to happen when running lsperfm.

But that's just me, if you prefer having it always default to one, then I'm cool with that. If so, then yeah, we should note the behavior in a comment.

If we do decide to default to whatever logstash does, then I guess the way to go about it is to construct the command sent to popen without the -w flag when workers is never specified in the test.

Contributor

I prefer letting the tool's user decide how many workers they want; that makes the expectations clearer and leaves less room for misunderstanding.

#
# Note: Logstash 2.0 has new default worker thread behavior: It defaults to half the number
# of cpus for the number of worker threads. So a 4 CPU machine would get 2 threads by default.
# This is different than logstash 1.x where the default was always 1 worker thread. Lsperfm
# will *always* default to 1 worker regardless of logstash version.
#
#[
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
#]
#
[
{:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 60},
{:name => "simple line out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 60},
{:name => "json codec", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 60},
{:name => "json filter", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 60},
{:name => "complex syslog", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 60},
{:workers => 2, :name => "complex syslog", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 60},
]
```
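As a rough illustration of the format described above, the sketch below resolves suite entries into the values a runner would need. `resolve_test` and `DEFAULT_WORKERS` are hypothetical names, not part of lsperfm; the only behavior taken from this PR is that a missing `:workers` falls back to 1 and that `:time` and `:events` are read with `to_i`.

```ruby
# Hypothetical helper for illustration only; not part of lsperfm.
DEFAULT_WORKERS = 1

def resolve_test(test)
  {
    :name    => test[:name],
    :events  => test[:events].to_i,              # nil.to_i == 0, i.e. "not set"
    :time    => test[:time].to_i,                # nil.to_i == 0, i.e. "not set"
    :workers => test[:workers] || DEFAULT_WORKERS
  }
end

suite = [
  {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
  {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
]

resolved = suite.map { |test| resolve_test(test) }
resolved.each do |test|
  puts "#{test[:name]}: workers=#{test[:workers]} time=#{test[:time]} events=#{test[:events]}"
end
```

The first entry runs for a target duration with the default single worker; the second runs until an event count is reached with two workers.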

12 changes: 8 additions & 4 deletions examples/suite/basic_performance_long.rb
@@ -2,17 +2,21 @@
# each test can be executed by either target duration using :time => N secs
# or by number of events with :events => N
#
# you can specify the number of filter worker threads for each test with :workers => N
# if you don't specify workers, it defaults to 1
#
#[
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
#]
#
# We use 2 workers here as an example, since most machines can handle that, but machines with more CPUs can handle more threads.
[
{:name => "simple line in/out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 120},
{:name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 120},
{:workers => 2, :name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 120},
{:name => "json codec in/out", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 120},
{:name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 120},
{:workers => 2, :name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 120},
{:name => "apache in/json out", :config => "config/simple.conf", :input => "input/apache_log.txt", :time => 120},
{:name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 120},
{:workers => 2, :name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 120},
{:name => "syslog in/json out", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 120},
]
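The example suite hard-codes 2 workers. On machines with more CPUs, the worker variants could instead be generated; the following is a minimal sketch under stated assumptions. `with_worker_variants` is a hypothetical helper that is not part of lsperfm, and `Etc.nprocessors` requires Ruby 2.2 or later, which may not match the Ruby version this project targets.

```ruby
require 'etc'

# Hypothetical helper: for each base test, emit the test unchanged
# (lsperfm default of 1 worker) plus one copy per extra worker count.
def with_worker_variants(tests, worker_counts)
  tests.flat_map do |test|
    [test] + worker_counts.map { |n| test.merge(:workers => n) }
  end
end

base = [
  {:name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 120},
]

# Try 2 workers, plus one worker per CPU when the machine has more than 2.
counts = [2, Etc.nprocessors].uniq.select { |n| n > 1 }
suite = with_worker_variants(base, counts)
suite.each { |test| puts test.inspect }
```

Because `merge` returns a new hash, the base entries stay untouched and still run with the default worker count.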
12 changes: 8 additions & 4 deletions examples/suite/basic_performance_quick.rb
@@ -2,17 +2,21 @@
# each test can be executed by either target duration using :time => N secs
# or by number of events with :events => N
#
# you can specify the number of filter worker threads for each test with :workers => N
# if you don't specify workers, it defaults to 1
#
#[
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
# {:name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
# {:workers => 2, :name => "simple json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :events => 50000},
#]
#
# We use 2 workers here as an example since most machines can handle that, but machines with more CPUs can handle more threads.
[
{:name => "simple line in/out", :config => "config/simple.conf", :input => "input/simple_10.txt", :time => 30},
{:name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
{:workers => 2, :name => "simple line in/json out", :config => "config/simple_json_out.conf", :input => "input/simple_10.txt", :time => 30},
{:name => "json codec in/out", :config => "config/json_inout_codec.conf", :input => "input/json_medium.txt", :time => 30},
{:name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 30},
{:workers => 2, :name => "line in/json filter/json out", :config => "config/json_inout_filter.conf", :input => "input/json_medium.txt", :time => 30},
{:name => "apache in/json out", :config => "config/simple.conf", :input => "input/apache_log.txt", :time => 30},
{:name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 30},
{:workers => 2, :name => "apache in/grok codec/json out", :config => "config/simple_grok.conf", :input => "input/apache_log.txt", :time => 30},
{:name => "syslog in/json out", :config => "config/complex_syslog.conf", :input => "input/syslog_acl_10.txt", :time => 30},
]
3 changes: 2 additions & 1 deletion lib/lsperfm/core.rb
@@ -23,8 +23,9 @@ def run(debug=false, headers=false)
tests.each do |test|
events = test[:events].to_i
time = test[:time].to_i
workers = test[:workers] ? test[:workers] : 1

-manager = runner.new(find_test_config(test[:config]), debug, install_path)
+manager = runner.new(find_test_config(test[:config]), workers, debug, install_path)
metrics = manager.run(events, time, runner.read_input_file(find_test_input(test[:input])))
lines << formatter(test[:name], metrics)
end
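The hunk above passes `test[:workers]` to the runner without checking it. A minimal sketch of how the value could be validated first; `worker_count` is a hypothetical helper that is not part of this PR, and rejecting counts below 1 is an assumption about what a sensible suite should contain.

```ruby
# Hypothetical helper for illustration only; not part of lsperfm.
# Returns the default when :workers is absent, otherwise a positive Integer.
def worker_count(test, default = 1)
  raw = test[:workers]
  return default if raw.nil?

  count = Integer(raw)  # raises ArgumentError for strings such as "two"
  raise ArgumentError, "workers must be >= 1, got #{count}" if count < 1
  count
end

puts worker_count({:name => "simple json out"})
puts worker_count({:name => "simple json out", :workers => "4"})
```

Failing early here would surface a bad suite entry before a Logstash process is started.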
4 changes: 2 additions & 2 deletions lib/lsperfm/core/run.rb
@@ -18,9 +18,9 @@ class Runner

attr_reader :command

-def initialize(config, debug = false, logstash_home = Dir.pwd)
+def initialize(config, workers = 1, debug = false, logstash_home = Dir.pwd)
@debug = debug
-@command = [File.join(logstash_home, LOGSTASH_BIN), "-f", config]
+@command = [File.join(logstash_home, LOGSTASH_BIN), "-f", config, "-w", workers.to_s]
end

def run(required_events_count, required_run_time, input_lines)
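The review thread on the README also floated an alternative that this PR does not implement: leave `-w` off the command when a test does not set `:workers`, so that Logstash applies its own default. A minimal sketch of that idea, assuming a standalone `build_command` function (hypothetical) and an assumed value for `LOGSTASH_BIN`, whose real definition is not shown in this diff:

```ruby
# Assumed value for illustration; the real constant is defined in Runner.
LOGSTASH_BIN = File.join("bin", "logstash")

# Hypothetical variant of the command construction in Runner#initialize.
# workers = nil means "let Logstash pick its own default".
def build_command(config, workers = nil, logstash_home = Dir.pwd)
  command = [File.join(logstash_home, LOGSTASH_BIN), "-f", config]
  command += ["-w", workers.to_s] unless workers.nil?
  command
end

puts build_command("spec/fixtures/simple.conf", 2, "/opt/logstash").join(" ")
puts build_command("spec/fixtures/simple.conf", nil, "/opt/logstash").join(" ")
```

The PR as submitted always passes `-w` and defaults it to 1, which keeps benchmark results comparable across Logstash versions.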
4 changes: 4 additions & 0 deletions spec/fixtures/worker_basic_suite.rb
@@ -0,0 +1,4 @@
[
{:workers => 2, :name => "simple 1", :config => "simple.conf", :input => "simple_10.txt", :time => 5},
{:workers => 2, :name => "simple 2", :config => "simple.conf", :input => "simple_10.txt", :time => 10},
]
2 changes: 1 addition & 1 deletion spec/lib/runner_spec.rb
@@ -10,7 +10,7 @@

subject (:runner) { LogStash::PerformanceMeter::Runner.new(config) }

-let(:command) { [File.join(Dir.pwd, LogStash::PerformanceMeter::Runner::LOGSTASH_BIN), "-f", "spec/fixtures/simple.conf"]}
+let(:command) { [File.join(Dir.pwd, LogStash::PerformanceMeter::Runner::LOGSTASH_BIN), "-f", "spec/fixtures/simple.conf", "-w", "1"]}

it "invokes the logstash command" do
Open3.should_receive(:popen3).with(*command).and_return(true)
6 changes: 3 additions & 3 deletions spec/lib/suite_spec.rb
@@ -19,7 +19,7 @@
context "using a file" do

it "run each test case in a serial manner" do
-expect(runner).to receive(:new).with("spec/fixtures/simple.conf", false, logstash_home).twice { serial_runner }
+expect(runner).to receive(:new).with("spec/fixtures/simple.conf", 1, false, logstash_home).twice { serial_runner }
manager.run
end

@@ -31,7 +31,7 @@

it "run each test case as expected" do
expect(runner).to receive(:read_input_file).with('simple_10.txt').twice { [] }
-expect(runner).to receive(:new).with("simple.conf", false, logstash_home).twice { serial_runner }
+expect(runner).to receive(:new).with("simple.conf", 1, false, logstash_home).twice { serial_runner }
manager.run
end

@@ -43,7 +43,7 @@
let(:config) { 'spec/fixtures/wrong_config.yml' }

it "run each test case as expected" do
-expect(runner).to receive(:new).with("spec/wrong_fixture/simple.conf", false, logstash_home).once { serial_runner }
+expect(runner).to receive(:new).with("spec/wrong_fixture/simple.conf", 1, false, logstash_home).once { serial_runner }
expect { manager.run }.to raise_error(LogStash::PerformanceMeter::ConfigException)
end
end
41 changes: 41 additions & 0 deletions spec/lib/worker_suite_spec.rb
@@ -0,0 +1,41 @@
require 'spec_helper'

describe LogStash::PerformanceMeter::Core do

let(:config) { 'spec/fixtures/config.yml' }
let(:logstash_home) { '.' }
let(:suite_def) { 'spec/fixtures/worker_basic_suite.rb' }
let(:serial_runner) { double('DummySerialRunner') }
let(:runner) { LogStash::PerformanceMeter::Runner }
let(:workers) { 2 }

let(:run_outcome) { { :percentile => [2000] , :elapsed => 100, :events_count => 3000, :start_time => 12 } }
subject(:manager) { LogStash::PerformanceMeter::Core.new(suite_def, logstash_home, config, runner) }

context "with a valid configuration and worker threads set to 2" do
Contributor

Can we make the number of workers a variable here? With a fixture, the number of workers in use is hidden; I just want to make sure people understand it.

Author

Yes, that makes good sense. Sorry it wasn't like that before.

before(:each) do
expect(serial_runner).to receive(:run).with(0, 5, anything()).ordered { run_outcome }
expect(serial_runner).to receive(:run).with(0, 10, anything()).ordered { run_outcome }
end
context "using a file" do

it "run each test case in a serial manner" do
expect(runner).to receive(:new).with("spec/fixtures/simple.conf", workers, false, logstash_home).twice { serial_runner }
manager.run
end

end

context "without a file" do

let(:config) { '' }

it "run each test case as expected" do
expect(runner).to receive(:read_input_file).with('simple_10.txt').twice { [] }
expect(runner).to receive(:new).with("simple.conf", workers, false, logstash_home).twice { serial_runner }
manager.run
end

end
end
end
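The spec above sets `workers` with `let`, but the value 2 is still repeated inside the fixture file. One way to keep a single source of truth is to generate the suite from the variable; the following is a minimal sketch. It assumes a suite file is a plain Ruby array literal that can be evaluated, which matches the fixture's shape but is not confirmed by this diff, and the temp-file approach is an illustration rather than what the PR does.

```ruby
require 'tempfile'

workers = 2

# Build the suite source from the variable so the worker count
# appears in exactly one place.
suite_source = <<-RUBY
[
  {:workers => #{workers}, :name => "simple 1", :config => "simple.conf", :input => "simple_10.txt", :time => 5},
  {:workers => #{workers}, :name => "simple 2", :config => "simple.conf", :input => "simple_10.txt", :time => 10},
]
RUBY

suite_file = Tempfile.new(['worker_suite', '.rb'])
suite_file.write(suite_source)
suite_file.close

# Assumption: suites are Ruby array literals, so evaluating the file
# content yields the list of test hashes.
tests = eval(File.read(suite_file.path))
suite_file.unlink

tests.each { |test| puts "#{test[:name]}: workers=#{test[:workers]}" }
```

In a spec, `suite_file.path` could stand in for `suite_def`, so that changing `workers` updates both the suite and the expectation.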