From 29556ded74259f3babd0369d94fadc6e240215a5 Mon Sep 17 00:00:00 2001 From: Sanchu Varkey Date: Fri, 18 Nov 2016 11:35:01 +0000 Subject: [PATCH 1/4] Added an option to use Kafka high level consumer. By default system uses the simple consumer and this can be overridden --- lib/kafka_ascoltatore.js | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/kafka_ascoltatore.js b/lib/kafka_ascoltatore.js index 6f49633..4709a8d 100644 --- a/lib/kafka_ascoltatore.js +++ b/lib/kafka_ascoltatore.js @@ -157,6 +157,7 @@ KafkaAscoltatore.prototype._startConn = function(cb) { var noAckBatchOptions = that._opts.noAckBatchOptions; var groupId = that._opts.groupId || that._default_opts.groupId; var Client = this._opts.kafka.Client; + var useHighLevelConsumer = that._opts.useHighLevelConsumer || false; var HighLevelConsumer = this._opts.kafka.HighLevelConsumer; var Consumer = this._opts.kafka.Consumer; var subscribedTopics = that._subs_counter.keys(); @@ -173,8 +174,16 @@ KafkaAscoltatore.prototype._startConn = function(cb) { return false; } debug("initial subscriptions expanded to ",subscriptions); - that._consumer = new Consumer(that._consumerClient,subscriptions,{groupId: groupId, - fromOffset: true, autoCommit: false, encoding: "buffer" }); + if (useHighLevelConsumer) { + debug("Using high level consumer "); + that._consumer = new HighLevelConsumer(that._consumerClient, subscriptions, { + groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer" + }); + } else { + debug("Using simple consumer "); + that._consumer = new Consumer(that._consumerClient, subscriptions, { + groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer"}); + } that._consumer.on("message", function(message) { debug("received new message on topic ", message.topic); From 0a3cf9b362582295b259779d8dd077331a857ef1 Mon Sep 17 00:00:00 2001 From: Sanchu Varkey Date: Fri, 18 Nov 2016 14:21:56 +0000 Subject: [PATCH 2/4] Formatting of code --- lib/kafka_ascoltatore.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/kafka_ascoltatore.js b/lib/kafka_ascoltatore.js index 4709a8d..2425e36 100644 --- a/lib/kafka_ascoltatore.js +++ b/lib/kafka_ascoltatore.js @@ -174,17 +174,16 @@ KafkaAscoltatore.prototype._startConn = function(cb) { return false; } debug("initial subscriptions expanded to ",subscriptions); - if (useHighLevelConsumer) { + if (useHighLevelConsumer) { debug("Using high level consumer "); that._consumer = new HighLevelConsumer(that._consumerClient, subscriptions, { groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer" }); - } else { + } else { debug("Using simple consumer "); that._consumer = new Consumer(that._consumerClient, subscriptions, { groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer"}); - } - + } that._consumer.on("message", function(message) { debug("received new message on topic ", message.topic); var value = message.value; From fc0d888cac0cd815a1f43cdbf0a21927234e8c8d Mon Sep 17 00:00:00 2001 From: Sanchu Varkey Date: Wed, 23 Nov 2016 14:54:28 +0000 Subject: [PATCH 3/4] Added unit tests to cover the new highlevel consumer setting --- test/common.js | 5 +++-- test/kafka_ascoltatore_spec.js | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/test/common.js b/test/common.js index 3f4ac72..bae9231 100644 --- a/test/common.js +++ b/test/common.js @@ -1,5 +1,5 @@ "use strict"; - +//process.env.DEBUG = 'kafka-node:HighLevelConsumer'; global.sinon = require("sinon"); global.chai = require("chai"); global.expect = require("chai").expect; @@ -27,13 +27,14 @@ global.zeromqSettings = function(remote_ports) { }; }; -global.kafkaSettings = function() { +global.kafkaSettings = function(useHighLevelConsumer) { return { json: false, kafka: require("kafka-node"), connectionString: "localhost:2181", clientId: "test", groupId: "test", + useHighLevelConsumer: useHighLevelConsumer || false , defaultEncoding: "utf8", encodings: {image: "buffer", hello_42: "utf-8"} }; diff --git a/test/kafka_ascoltatore_spec.js b/test/kafka_ascoltatore_spec.js index 2104992..254e7e7 100644 --- a/test/kafka_ascoltatore_spec.js +++ b/test/kafka_ascoltatore_spec.js @@ -1,6 +1,7 @@ var fs = require("fs"); var util = require("../lib/util"); var steed = require('steed')(); +var kafka = require("kafka-node"); describeAscoltatore("kafka", function() { @@ -47,5 +48,25 @@ describeAscoltatore("kafka", function() { that.instance.pub("hello", "€99"); }); }); + it("Use high level consumer to sync two instances", function(done) { + var other = new ascoltatori.KafkaAscoltatore(kafkaSettings(true)); + var HighLevelConsumer = kafka.HighLevelConsumer; + var that = this; + steed.series([ + function(cb) { + other.on("ready", cb); + }, + function(cb) { + other.subscribe("hello", function() {}, cb); + }, + function(cb) { + that.instance.publish("hello", null, cb); + }, + function() { + expect(other._consumer).to.be.an.instanceof(HighLevelConsumer); + done(); + } + ]); + }); }); From a0b227cb8b9035a2d4d87e6a813de41f812ac0b5 Mon Sep 17 00:00:00 2001 From: Sanchu Varkey Date: Wed, 23 Nov 2016 15:01:08 +0000 Subject: [PATCH 4/4] Removed commented code --- test/common.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/common.js b/test/common.js index bae9231..dd4a1a9 100644 --- a/test/common.js +++ b/test/common.js @@ -1,5 +1,4 @@ "use strict"; -//process.env.DEBUG = 'kafka-node:HighLevelConsumer'; global.sinon = require("sinon"); global.chai = require("chai"); global.expect = require("chai").expect;