forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_consumer.go
173 lines (145 loc) · 3.99 KB
/
kafka_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package kafka_consumer
import (
"log"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
)
// Kafka is the kafka_consumer input. It joins a consumer group, reads messages
// from the configured topic(s), parses them with the configured parser and
// adds the resulting metrics to the accumulator handed to Start.
type Kafka struct {
// ConsumerGroup is the name of the consumer group passed to JoinConsumerGroup.
ConsumerGroup string
// Topics lists the topic(s) to consume.
Topics []string
// ZookeeperPeers holds the Zookeeper connection strings.
ZookeeperPeers []string
// ZookeeperChroot is copied into the consumer group config (Zookeeper.Chroot).
ZookeeperChroot string
// Consumer is the consumer group handle. Start joins a new group when this
// is nil or reports itself as closed.
Consumer *consumergroup.ConsumerGroup
// Legacy metric buffer support
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
// Offset selects the initial offset: "oldest" (also used when empty or
// invalid) or "newest". The comparison in Start is case-insensitive.
Offset string
// parser turns a message payload into metrics; it is set through SetParser.
parser parsers.Parser
// The embedded Mutex is held by Start and Stop, and around each CommitUpto
// call made by receiver.
sync.Mutex
// channel for all incoming kafka messages
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// done is created by Start and closed by Stop to make receiver return.
done chan struct{}
// keep the accumulator internally:
acc telegraf.Accumulator
// doNotCommitMsgs makes receiver skip the CommitUpto call on the consumer.
// This is mostly for test purposes, but there may be a use-case for it later.
doNotCommitMsgs bool
}
// sampleConfig is the example configuration text returned by SampleConfig.
var sampleConfig = `
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
// SampleConfig returns the example configuration text for this input.
func (k *Kafka) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line, human-readable summary of this input.
func (k *Kafka) Description() string {
	return "Read metrics from Kafka topic(s)"
}
// SetParser stores the parser that receiver uses to decode message payloads
// into metrics.
func (k *Kafka) SetParser(parser parsers.Parser) {
	k.parser = parser
}
// Start stores the accumulator, joins the consumer group if there is no open
// consumer yet, and launches the receiver goroutine.
//
// It returns the error from JoinConsumerGroup when joining fails; in that case
// no goroutine is started. The Kafka mutex is held for the whole call.
func (k *Kafka) Start(acc telegraf.Accumulator) error {
	k.Lock()
	defer k.Unlock()

	k.acc = acc

	config := consumergroup.NewConfig()
	config.Zookeeper.Chroot = k.ZookeeperChroot

	// "oldest" is the choice for an explicit "oldest", an empty setting and
	// any unrecognised value, so set it first and only override for "newest".
	config.Offsets.Initial = sarama.OffsetOldest
	switch strings.ToLower(k.Offset) {
	case "newest":
		config.Offsets.Initial = sarama.OffsetNewest
	case "oldest", "":
		// Nothing to do: already configured above.
	default:
		log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
			k.Offset)
	}

	// Only join when there is no usable consumer; an existing open consumer
	// (and its channels) is reused as-is.
	if needConsumer := k.Consumer == nil || k.Consumer.Closed(); needConsumer {
		var err error
		k.Consumer, err = consumergroup.JoinConsumerGroup(
			k.ConsumerGroup,
			k.Topics,
			k.ZookeeperPeers,
			config,
		)
		if err != nil {
			return err
		}

		// Wire up the message and error channels of the new consumer.
		k.in = k.Consumer.Messages()
		k.errs = k.Consumer.Errors()
	}

	// A fresh done channel per Start; Stop closes it to end the receiver.
	k.done = make(chan struct{})

	go k.receiver()

	log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
		k.ZookeeperPeers, k.Topics)
	return nil
}
// receiver reads all incoming messages and errors from the consumer until the
// done channel is closed. Each message is parsed into metrics, which are added
// to the accumulator, and is then committed on the consumer unless
// doNotCommitMsgs is set.
//
// A receive from a closed channel succeeds immediately with a nil value. The
// loop therefore uses the two-value receive form: a closed channel is replaced
// by nil (a nil channel is never selected), which avoids both a nil
// dereference on msg.Value and a busy loop while waiting for done.
func (k *Kafka) receiver() {
	// Local copies so a closed channel can be disabled without touching the
	// struct fields.
	in, errs := k.in, k.errs

	for {
		select {
		case <-k.done:
			return

		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			if err != nil {
				log.Printf("E! Kafka Consumer Error: %s\n", err)
			}

		case msg, ok := <-in:
			if !ok {
				in = nil
				continue
			}
			if msg == nil {
				// Defensive: never dereference a nil message.
				continue
			}

			metrics, err := k.parser.Parse(msg.Value)
			if err != nil {
				// The message is still committed below so that an unparsable
				// payload is not consumed again.
				log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
					string(msg.Value), err.Error())
			}
			for _, metric := range metrics {
				k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
			}

			if !k.doNotCommitMsgs {
				// TODO(cam) this locking can be removed if this PR gets merged:
				// https://github.com/wvanbergen/kafka/pull/84
				k.Lock()
				k.Consumer.CommitUpto(msg)
				k.Unlock()
			}
		}
	}
}
// Stop signals the receiver goroutine to return by closing the done channel
// and then closes the consumer, logging any error from Close.
//
// It is safe to call when Start never succeeded (done and/or Consumer are nil)
// and when Stop has already been called: closing a nil or already-closed
// channel would panic, so both cases are checked first. The Kafka mutex is
// held for the whole call.
func (k *Kafka) Stop() {
	k.Lock()
	defer k.Unlock()

	if k.done != nil {
		select {
		case <-k.done:
			// Nothing is ever sent on done, so a successful receive means it
			// was already closed by an earlier Stop.
		default:
			close(k.done)
		}
	}

	if k.Consumer == nil {
		return
	}
	if err := k.Consumer.Close(); err != nil {
		log.Printf("E! Error closing kafka consumer: %s\n", err.Error())
	}
}
// Gather does nothing and always returns nil. Metrics are added to the
// accumulator given to Start by the receiver goroutine instead.
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
	return nil
}
func init() {
inputs.Add("kafka_consumer", func() telegraf.Input {
return &Kafka{}
})
}