-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.js
37 lines (31 loc) · 914 Bytes
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import pkg from 'kafkajs';
import fs from 'fs';
const { Kafka } = pkg;
import util from 'util';
import os from 'os';
const writeFile = util.promisify(fs.writeFile);
/**
 * Entry point: connects a KafkaJS consumer to the local broker, subscribes to
 * the `chat` topic from the beginning, and appends every received message to
 * an in-memory list that is mirrored to `got.json` after each message.
 *
 * Partitions are processed concurrently, so file writes are funnelled through
 * a single promise chain to keep at most one write to `got.json` in flight.
 * Start-up failures are logged, the consumer is disconnected, and the process
 * exit code is set to 1.
 */
(async function () {
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
  });
  const consumer = kafka.consumer({ groupId: 'consumer-group' });
  const got = [];

  // Tail of the write queue. Each new write is chained onto it so overlapping
  // handlers never write got.json at the same time.
  let lastWrite = Promise.resolve();

  // Queues a write of the current contents of `got`. The returned promise
  // settles with the outcome of this particular write.
  const persist = () => {
    // Serialize now so the snapshot matches the state seen by this handler.
    const snapshot = JSON.stringify(got, null, 2);
    const thisWrite = lastWrite.then(() => writeFile('got.json', snapshot));
    // Keep the queue usable even if this write fails; the failure is still
    // reported to the caller through `thisWrite`.
    lastWrite = thisWrite.catch(() => {});
    return thisWrite;
  };

  // Kafka keys and values may be null (un-keyed messages, tombstones).
  const decode = (buffer) => (buffer == null ? null : buffer.toString());

  try {
    await consumer.connect();
    await consumer.subscribe({ topic: 'chat', fromBeginning: true });
    await consumer.run({
      // os.cpus() can be empty in some environments; always allow one worker.
      partitionsConsumedConcurrently: Math.max(1, os.cpus().length),
      async eachMessage({ message }) {
        // Random delay of up to 50 ms before handling each message.
        await new Promise((resolve) => {
          setTimeout(resolve, Math.random() * 50);
        });
        const msg = {
          key: decode(message.key),
          value: decode(message.value),
        };
        console.log(msg);
        got.push(msg);
        await persist();
      },
    });
  } catch (err) {
    console.error('Kafka consumer failed to start:', err);
    process.exitCode = 1;
    try {
      await consumer.disconnect();
    } catch (disconnectErr) {
      console.error('Failed to disconnect Kafka consumer:', disconnectErr);
    }
  }
})();