Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge pull request #159 from svarkey/kafka_use_high_level_consumer
Browse files Browse the repository at this point in the history
Kafka use high level consumer
  • Loading branch information
mcollina authored Nov 24, 2016
2 parents 68333a5 + a0b227c commit c4c32f4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
14 changes: 11 additions & 3 deletions lib/kafka_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ KafkaAscoltatore.prototype._startConn = function(cb) {
var noAckBatchOptions = that._opts.noAckBatchOptions;
var groupId = that._opts.groupId || that._default_opts.groupId;
var Client = this._opts.kafka.Client;
var useHighLevelConsumer = that._opts.useHighLevelConsumer || false;
var HighLevelConsumer = this._opts.kafka.HighLevelConsumer;
var Consumer = this._opts.kafka.Consumer;
var subscribedTopics = that._subs_counter.keys();
Expand All @@ -173,9 +174,16 @@ KafkaAscoltatore.prototype._startConn = function(cb) {
return false;
}
debug("initial subscriptions expanded to ",subscriptions);
that._consumer = new Consumer(that._consumerClient,subscriptions,{groupId: groupId,
fromOffset: true, autoCommit: false, encoding: "buffer" });

if (useHighLevelConsumer) {
debug("Using high level consumer ");
that._consumer = new HighLevelConsumer(that._consumerClient, subscriptions, {
groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer"
});
} else {
debug("Using simple consumer ");
that._consumer = new Consumer(that._consumerClient, subscriptions, {
groupId: groupId,fromOffset: true, autoCommit: false, encoding: "buffer"});
}
that._consumer.on("message", function(message) {
debug("received new message on topic ", message.topic);
var value = message.value;
Expand Down
4 changes: 2 additions & 2 deletions test/common.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"use strict";

global.sinon = require("sinon");
global.chai = require("chai");
global.expect = require("chai").expect;
Expand Down Expand Up @@ -27,13 +26,14 @@ global.zeromqSettings = function(remote_ports) {
};
};

global.kafkaSettings = function() {
global.kafkaSettings = function(useHighLevelConsumer) {
return {
json: false,
kafka: require("kafka-node"),
connectionString: "localhost:2181",
clientId: "test",
groupId: "test",
useHighLevelConsumer: useHighLevelConsumer || false ,
defaultEncoding: "utf8",
encodings: {image: "buffer", hello_42: "utf-8"}
};
Expand Down
21 changes: 21 additions & 0 deletions test/kafka_ascoltatore_spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var fs = require("fs");
var util = require("../lib/util");
var steed = require('steed')();
var kafka = require("kafka-node");

describeAscoltatore("kafka", function() {

Expand Down Expand Up @@ -47,5 +48,25 @@ describeAscoltatore("kafka", function() {
that.instance.pub("hello", "€99");
});
});
it("Use high level consumer to sync two instances", function(done) {
var other = new ascoltatori.KafkaAscoltatore(kafkaSettings(true));
var HighLevelConsumer = kafka.HighLevelConsumer;
var that = this;
steed.series([
function(cb) {
other.on("ready", cb);
},
function(cb) {
other.subscribe("hello", function() {}, cb);
},
function(cb) {
that.instance.publish("hello", null, cb);
},
function() {
expect(other._consumer).to.be.an.instanceof(HighLevelConsumer);
done();
}
]);
});

});

0 comments on commit c4c32f4

Please sign in to comment.