From 78ae4288dd507c691d235ab7fe76a6872240ce83 Mon Sep 17 00:00:00 2001 From: John Fragoulis Date: Thu, 18 Apr 2019 14:01:43 +0300 Subject: [PATCH 1/4] Pluralize the :topic initialization option By default, kafka and kafka-go support consumers subscribing to multiple topics, by passing an array of strings. Rafka also respects this (see server.go#parseTopicsAndConfig). Rafka-rb simply needs to change the wording in order to respect the plural and in effect show the api user that consuming from multiple topics is supported. The commit takes the extra step of supporting an array of strings as well as a single string, whether it contains a single topic or multiple comma-separated topics. Closes #5 --- README.md | 14 +++++++++++--- lib/rafka/consumer.rb | 16 +++++++++++----- test/consumer_test.rb | 12 ++++++------ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index fbdbdf5..4de4e49 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ for more information. ### Consumer ```ruby -consumer = Rafka::Consumer.new(topic: "greetings", group: "myapp") +consumer = Rafka::Consumer.new(topics: "greetings", group: "myapp") msg = consumer.consume msg.value # => "Hello there!" @@ -88,7 +88,7 @@ Offsets are managed automatically by default. If you need more control you can turn off the feature and manually commit offsets: ```ruby -consumer = Rafka::Consumer.new(topic: "greetings", group: "myapp", auto_offset_commit: false) +consumer = Rafka::Consumer.new(topics: "greetings", group: "myapp", auto_offset_commit: false) # commit a single offset msg = consumer.consume @@ -104,10 +104,18 @@ Consumers may also set their own custom [librdkafka configuration](https://githu ```ruby consumer = Rafka::Consumer.new( - topic: "greetings", group: "myapp", librdkafka: { "auto.offset.reset" => "earliest" } + topics: "greetings", group: "myapp", librdkafka: { "auto.offset.reset" => "earliest" } ) ``` +Consumers can subscribe to multiple topics: + +```ruby +consumer = Rafka::Consumer.new(topics: "topic_1") +consumer = Rafka::Consumer.new(topics: ["topic_1", "topic_2"]) +consumer = Rafka::Consumer.new(topics: "topic_1,topic_2") +``` + Refer to the [Consumer API documentation](http://www.rubydoc.info/github/skroutz/rafka-rb/Rafka/Consumer) for more information. diff --git a/lib/rafka/consumer.rb b/lib/rafka/consumer.rb index 5179a2d..79a8f48 100644 --- a/lib/rafka/consumer.rb +++ b/lib/rafka/consumer.rb @@ -2,7 +2,7 @@ require "securerandom" module Rafka - # A Rafka-backed Kafka consumer that consumes messages from a specific topic + # A Rafka-backed Kafka consumer that consumes messages from specific topics # and belongs to a specific consumer group. Offsets may be committed # automatically or manually. # @@ -10,7 +10,7 @@ module Rafka class Consumer include GenericCommands - REQUIRED_OPTS = [:group, :topic].freeze + REQUIRED_OPTS = [:group, :topics].freeze # @return [Redis::Client] the underlying Redis client instance attr_reader :redis @@ -23,7 +23,7 @@ class Consumer # @param [Hash] opts # @option opts [String] :host ("localhost") server hostname # @option opts [Fixnum] :port (6380) server port - # @option opts [String] :topic Kafka topic to consume (required) + # @option opts [Array|String] :topics Kafka topics to consume (required) # @option opts [String] :group Kafka consumer group name (required) # @option opts [String] :id (random) Kafka consumer id # @option opts [Boolean] :auto_commit (true) automatically commit @@ -47,7 +47,7 @@ def initialize(opts={}) @rafka_opts, @redis_opts = parse_opts(opts) @redis = Redis.new(@redis_opts) - @blpop_arg = "topics:#{@rafka_opts[:topic]}" + @blpop_arg = "topics:#{@rafka_opts[:topics]}" @blpop_arg << ":#{opts[:librdkafka].to_json}" if !opts[:librdkafka].empty? end @@ -199,11 +199,17 @@ def commit(*msgs) # @return [Array] rafka opts, redis opts def parse_opts(opts) REQUIRED_OPTS.each do |opt| - raise "#{opt.inspect} option not provided" if opts[opt].nil? + raise "#{opt.inspect} option not provided" if opts[opt].nil? || opts[opt].empty? end rafka_opts = opts.reject { |k| k == :redis || k == :librdkafka } + # Support array of strings but make sure you turn this into a + # comma-separated string of topics if that's the case. + if rafka_opts[:topics].is_a? Array + rafka_opts[:topics] = rafka_opts[:topics].join(",") + end + redis_opts = REDIS_DEFAULTS.dup.merge(opts[:redis] || {}) redis_opts.merge!( rafka_opts.select { |k| [:host, :port, :id].include?(k) } diff --git a/test/consumer_test.rb b/test/consumer_test.rb index a952ba3..07a69a5 100644 --- a/test/consumer_test.rb +++ b/test/consumer_test.rb @@ -3,7 +3,7 @@ class ConsumerTest < Minitest::Test def test_prepare_for_commit - consumer = Rafka::Consumer.new(group: "foo", topic: "bar") + consumer = Rafka::Consumer.new(group: "foo", topics: "bar") msgs = [ ["topic", "foo", "partition", 0, "offset", 1, "value", "a"], @@ -44,7 +44,7 @@ def test_prepare_for_commit end def test_consume_block_no_message - cons = Rafka::Consumer.new(group: "foo", topic: "bar") + cons = Rafka::Consumer.new(group: "foo", topics: "bar") def cons.consume_one(_) nil @@ -57,17 +57,17 @@ def cons.consume_one(_) def test_blpop_arg cons = Rafka::Consumer.new( - group: "foo", topic: "bar", librdkafka: { test1: 2, test2: "a", "foo.bar" => true } + group: "foo", topics: "bar", librdkafka: { test1: 2, test2: "a", "foo.bar" => true } ) assert_equal cons.blpop_arg, 'topics:bar:{"test1":2,"test2":"a","foo.bar":true}' - cons = Rafka::Consumer.new(group: "foo", topic: "bar", librdkafka: {}) + cons = Rafka::Consumer.new(group: "foo", topics: "bar", librdkafka: {}) assert_equal cons.blpop_arg, "topics:bar" - cons = Rafka::Consumer.new(group: "foo", topic: "bar", librdkafka: nil) + cons = Rafka::Consumer.new(group: "foo", topics: "bar", librdkafka: nil) assert_equal cons.blpop_arg, "topics:bar" - cons = Rafka::Consumer.new(group: "foo", topic: "bar") + cons = Rafka::Consumer.new(group: "foo", topics: "bar") assert_equal cons.blpop_arg, "topics:bar" end end From c2c04d8739dd7769f05bb4b64ad3d18a7c174bf3 Mon Sep 17 00:00:00 2001 From: John Fragoulis Date: Thu, 18 Apr 2019 14:50:31 +0300 Subject: [PATCH 2/4] fixup! Pluralize the :topic initialization option --- lib/rafka/consumer.rb | 10 +++++----- test/consumer_test.rb | 17 +++++++++++------ 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/lib/rafka/consumer.rb b/lib/rafka/consumer.rb index 79a8f48..a9e432b 100644 --- a/lib/rafka/consumer.rb +++ b/lib/rafka/consumer.rb @@ -10,7 +10,7 @@ module Rafka class Consumer include GenericCommands - REQUIRED_OPTS = [:group, :topics].freeze + REQUIRED_OPTS = [:group, :topic].freeze # @return [Redis::Client] the underlying Redis client instance attr_reader :redis @@ -23,7 +23,7 @@ class Consumer # @param [Hash] opts # @option opts [String] :host ("localhost") server hostname # @option opts [Fixnum] :port (6380) server port - # @option opts [Array|String] :topics Kafka topics to consume (required) + # @option opts [Array|String] :topic Kafka topics to consume (required) # @option opts [String] :group Kafka consumer group name (required) # @option opts [String] :id (random) Kafka consumer id # @option opts [Boolean] :auto_commit (true) automatically commit @@ -47,7 +47,7 @@ def initialize(opts={}) @rafka_opts, @redis_opts = parse_opts(opts) @redis = Redis.new(@redis_opts) - @blpop_arg = "topics:#{@rafka_opts[:topics]}" + @blpop_arg = "topics:#{@rafka_opts[:topic]}" @blpop_arg << ":#{opts[:librdkafka].to_json}" if !opts[:librdkafka].empty? end @@ -206,8 +206,8 @@ def parse_opts(opts) # Support array of strings but make sure you turn this into a # comma-separated string of topics if that's the case. - if rafka_opts[:topics].is_a? Array - rafka_opts[:topics] = rafka_opts[:topics].join(",") + if rafka_opts[:topic].is_a? Array + rafka_opts[:topic] = rafka_opts[:topic].join(",") end redis_opts = REDIS_DEFAULTS.dup.merge(opts[:redis] || {}) diff --git a/test/consumer_test.rb b/test/consumer_test.rb index 07a69a5..11e8238 100644 --- a/test/consumer_test.rb +++ b/test/consumer_test.rb @@ -3,7 +3,7 @@ class ConsumerTest < Minitest::Test def test_prepare_for_commit - consumer = Rafka::Consumer.new(group: "foo", topics: "bar") + consumer = Rafka::Consumer.new(group: "foo", topic: "bar") msgs = [ ["topic", "foo", "partition", 0, "offset", 1, "value", "a"], @@ -44,7 +44,7 @@ def test_prepare_for_commit end def test_consume_block_no_message - cons = Rafka::Consumer.new(group: "foo", topics: "bar") + cons = Rafka::Consumer.new(group: "foo", topic: "bar") def cons.consume_one(_) nil @@ -57,17 +57,22 @@ def cons.consume_one(_) def test_blpop_arg cons = Rafka::Consumer.new( - group: "foo", topics: "bar", librdkafka: { test1: 2, test2: "a", "foo.bar" => true } + group: "foo", topic: "bar", librdkafka: { test1: 2, test2: "a", "foo.bar" => true } ) assert_equal cons.blpop_arg, 'topics:bar:{"test1":2,"test2":"a","foo.bar":true}' - cons = Rafka::Consumer.new(group: "foo", topics: "bar", librdkafka: {}) + cons = Rafka::Consumer.new(group: "foo", topic: "bar", librdkafka: {}) assert_equal cons.blpop_arg, "topics:bar" - cons = Rafka::Consumer.new(group: "foo", topics: "bar", librdkafka: nil) + cons = Rafka::Consumer.new(group: "foo", topic: "bar", librdkafka: nil) assert_equal cons.blpop_arg, "topics:bar" - cons = Rafka::Consumer.new(group: "foo", topics: "bar") + cons = Rafka::Consumer.new(group: "foo", topic: "bar") assert_equal cons.blpop_arg, "topics:bar" end + + def test_multiple_topics + cons = Rafka::Consumer.new(group: "foo", topic: %w[foo bar]) + assert_equal cons.blpop_arg, "topics:foo,bar" + end end From 3ed904e2e9351b317d3b190273931ec209f28586 Mon Sep 17 00:00:00 2001 From: John Fragoulis Date: Thu, 18 Apr 2019 14:51:16 +0300 Subject: [PATCH 3/4] fixup! Pluralize the :topic initialization option --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 4de4e49..5bad840 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ for more information. ### Consumer ```ruby -consumer = Rafka::Consumer.new(topics: "greetings", group: "myapp") +consumer = Rafka::Consumer.new(topic: "greetings", group: "myapp") msg = consumer.consume msg.value # => "Hello there!" @@ -88,7 +88,7 @@ Offsets are managed automatically by default. If you need more control you can turn off the feature and manually commit offsets: ```ruby -consumer = Rafka::Consumer.new(topics: "greetings", group: "myapp", auto_offset_commit: false) +consumer = Rafka::Consumer.new(topic: "greetings", group: "myapp", auto_offset_commit: false) # commit a single offset msg = consumer.consume @@ -111,9 +111,9 @@ consumer = Rafka::Consumer.new( Consumers can subscribe to multiple topics: ```ruby -consumer = Rafka::Consumer.new(topics: "topic_1") -consumer = Rafka::Consumer.new(topics: ["topic_1", "topic_2"]) -consumer = Rafka::Consumer.new(topics: "topic_1,topic_2") +consumer = Rafka::Consumer.new(topic: "topic_1") +consumer = Rafka::Consumer.new(topic: ["topic_1", "topic_2"]) +consumer = Rafka::Consumer.new(topic: "topic_1,topic_2") ``` Refer to the [Consumer API documentation](http://www.rubydoc.info/github/skroutz/rafka-rb/Rafka/Consumer) From c51626e52b162cbdd778b590a56a3d8459784447 Mon Sep 17 00:00:00 2001 From: John Fragoulis Date: Thu, 18 Apr 2019 14:51:48 +0300 Subject: [PATCH 4/4] fixup! Pluralize the :topic initialization option --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5bad840..dfcbbe6 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Consumers may also set their own custom [librdkafka configuration](https://githu ```ruby consumer = Rafka::Consumer.new( - topics: "greetings", group: "myapp", librdkafka: { "auto.offset.reset" => "earliest" } + topic: "greetings", group: "myapp", librdkafka: { "auto.offset.reset" => "earliest" } ) ```