Kafka is a highly available message queue, but it lacks support for consuming messages at a slow, controlled speed. Some tasks do not need to be finished at once, and we want to complete them at a small cost. This is the reason why we developed Queue Schedule.
npm install queue-schedule
A basic example is shown as follows:
const kafka = require('kafka-node');
const {expect} = require('chai');
const {KafkaProducer,KafkaConsumer} = require('queue-schedule');
// Kafka broker address(es), taken from the KAFKA_PEERS environment variable.
const KAFKA_HOST = process.env.KAFKA_PEERS;
// Sample payload that the producer in this example writes to the topic.
const FIST_DATA = {a:1,b:2};
// Name given to the producer instance in this example.
const SCHEDULE_NAME1 = 'schedule1';
// Topic shared by the producer and the consumer in this example.
const TOPIC_NAME1 = 'topic.1';
// NOTE(review): not referenced anywhere in the visible example.
const PARTITION1 = 0;
// Set to true once the first message has been handled (or an error occurred),
// so that later messages are acknowledged without being inspected again.
let hasDone = false;
// Consumer side of the example: subscribes to TOPIC_NAME1 and checks the
// first message it receives against the payload written by the producer.
new KafkaConsumer({
    name: 'kafka',
    topics: [TOPIC_NAME1],
    consumerOption: {
        kafkaHost: KAFKA_HOST,
        fromOffset: 'earliest',
        fetchMaxBytes: 1024 * 1024,
    },
    /**
     * Handles one batch of messages delivered by the consumer.
     *
     * @param {Array<{value: string}>} messages - the delivered batch; only the
     *   first element is inspected here.
     * @param {Function} callback - must be invoked once the batch has been
     *   handled (presumably this is what lets the consumer fetch the next
     *   batch — confirm against the library documentation).
     */
    doTask: function(messages, callback) {
        console.log(messages);
        if (!hasDone) {
            const value = messages[0].value;
            let data = null;
            try {
                data = JSON.parse(value);
            } catch (e) {
                hasDone = true;
                console.error('parse message error', e);
                // Acknowledge the batch even when the payload is malformed;
                // otherwise the callback is never invoked on this path and the
                // task is left unfinished.
                callback();
                return;
            }
            expect(data).to.have.property('a').and.equal(1);
            console.log('recieve data', data);
            hasDone = true;
        }
        callback();
    },
    // Number of messages handed to doTask per batch — TODO confirm.
    readCount: 1,
    // Presumably the pause between batches, in milliseconds — TODO confirm.
    pauseTime: 500,
    // Presumably the idle-check interval, in milliseconds — TODO confirm.
    idleCheckInter: 10 * 1000
}).on(KafkaConsumer.EVENT_CONSUMER_ERROR, function(err) {
    console.error('consumer error', err);
    // Treat a consumer-level error as terminal for this example.
    hasDone = true;
});
// Producer side of the example: writes FIST_DATA to TOPIC_NAME1 and reports
// the outcome through the completion callback.
new KafkaProducer({
    name: SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    // Same option key the consumer uses for the broker address; the original
    // spelling `kafkarHost` was a typo.
    kafkaHost: KAFKA_HOST
}).addData(FIST_DATA, {}, function(err) {
    if (err) {
        console.error('write to queue error', err);
        return;
    }
    console.info('write to kafka finished');
});
For detailed usage, see the online documentation here.