Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluralize the :topic initialization option #17

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ consumer = Rafka::Consumer.new(
)
```

Consumers can subscribe to multiple topics:

```ruby
consumer = Rafka::Consumer.new(topic: "topic_1")
consumer = Rafka::Consumer.new(topic: ["topic_1", "topic_2"])
consumer = Rafka::Consumer.new(topic: "topic_1,topic_2")
```

Refer to the [Consumer API documentation](http://www.rubydoc.info/github/skroutz/rafka-rb/Rafka/Consumer)
for more information.

Expand Down
12 changes: 9 additions & 3 deletions lib/rafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require "securerandom"

module Rafka
# A Rafka-backed Kafka consumer that consumes messages from a specific topic
# A Rafka-backed Kafka consumer that consumes messages from specific topics
# and belongs to a specific consumer group. Offsets may be committed
# automatically or manually.
#
Expand All @@ -23,7 +23,7 @@ class Consumer
# @param [Hash] opts
# @option opts [String] :host ("localhost") server hostname
# @option opts [Fixnum] :port (6380) server port
# @option opts [String] :topic Kafka topic to consume (required)
# @option opts [Array<String>|String] :topic Kafka topics to consume (required)
# @option opts [String] :group Kafka consumer group name (required)
# @option opts [String] :id (random) Kafka consumer id
# @option opts [Boolean] :auto_commit (true) automatically commit
Expand Down Expand Up @@ -199,11 +199,17 @@ def commit(*msgs)
# @return [Array<Hash, Hash>] rafka opts, redis opts
def parse_opts(opts)
REQUIRED_OPTS.each do |opt|
raise "#{opt.inspect} option not provided" if opts[opt].nil?
raise "#{opt.inspect} option not provided" if opts[opt].nil? || opts[opt].empty?
end

rafka_opts = opts.reject { |k| k == :redis || k == :librdkafka }

# Support array of strings but make sure you turn this into a
# comma-separated string of topics if that's the case.
if rafka_opts[:topic].is_a? Array
rafka_opts[:topic] = rafka_opts[:topic].join(",")
end

redis_opts = REDIS_DEFAULTS.dup.merge(opts[:redis] || {})
redis_opts.merge!(
rafka_opts.select { |k| [:host, :port, :id].include?(k) }
Expand Down
5 changes: 5 additions & 0 deletions test/consumer_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ def test_blpop_arg
cons = Rafka::Consumer.new(group: "foo", topic: "bar")
assert_equal cons.blpop_arg, "topics:bar"
end

def test_multiple_topics
cons = Rafka::Consumer.new(group: "foo", topic: %w[foo bar])
assert_equal cons.blpop_arg, "topics:foo,bar"
end
end