-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.go
119 lines (105 loc) · 3.3 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package otelkafka
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"strconv"
"time"
)
type Consumer struct {
*kafka.Consumer
cfg config
prev trace.Span
}
func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
c, err := kafka.NewConsumer(conf)
if err != nil {
return nil, err
}
opts = append(opts, withConfig(conf))
cfg := newConfig(opts...)
return &Consumer{Consumer: c, cfg: cfg}, nil
}
// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
wrapped := &Consumer{
Consumer: c,
cfg: newConfig(opts...),
}
return wrapped
}
func (c *Consumer) Poll(timeoutMs int) (event kafka.Event) {
if c.prev != nil {
c.prev.End()
}
e := c.Consumer.Poll(timeoutMs)
switch e := e.(type) {
case *kafka.Message:
span := c.startSpan(e)
// latest span is stored to be closed when the next message is polled or when the consumer is closed
c.prev = span
}
return e
}
// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
if c.prev.IsRecording() {
c.prev.End()
}
c.prev = nil
}
msg, err := c.Consumer.ReadMessage(timeout)
if err != nil {
return nil, err
}
// latest span is stored to be closed when the next message is polled or when the consumer is closed
c.prev = c.startSpan(msg)
return msg, nil
}
// Close calls the underlying Consumer.Close and if polling is enabled, finishes
// any remaining span.
func (c *Consumer) Close() error {
err := c.Consumer.Close()
// we only close the previous span if consuming via the events channel is
// not enabled, because otherwise there would be a data race from the
// consuming goroutine.
if c.prev != nil {
if c.prev.IsRecording() {
c.prev.End()
}
c.prev = nil
}
return err
}
func (c *Consumer) startSpan(msg *kafka.Message) trace.Span {
carrier := NewMessageCarrier(msg)
parentSpanContext := c.cfg.Propagators.Extract(context.Background(), carrier)
// Create a span.
attrs := []attribute.KeyValue{
semconv.MessagingOperationTypeReceive,
semconv.MessagingSystemKafka,
semconv.MessagingKafkaMessageOffset(int(msg.TopicPartition.Offset)),
semconv.MessagingKafkaConsumerGroup(c.cfg.consumerGroupID),
semconv.MessagingKafkaMessageKey(string(msg.Key)),
semconv.ServerAddress(c.cfg.bootstrapServers),
semconv.MessagingDestinationName(*msg.TopicPartition.Topic),
semconv.MessagingMessageID(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10)),
semconv.MessagingDestinationPartitionID(strconv.Itoa(int(msg.TopicPartition.Partition))),
semconv.MessagingMessageBodySize(getMsgSize(msg)),
}
if c.cfg.attributeInjectFunc != nil {
attrs = append(attrs, c.cfg.attributeInjectFunc(msg)...)
}
opts := []trace.SpanStartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
newCtx, span := c.cfg.Tracer.Start(parentSpanContext, fmt.Sprintf("%v receive", msg.TopicPartition.Topic), opts...)
// Inject current span context, so consumers can use it to propagate span.
c.cfg.Propagators.Inject(newCtx, carrier)
return span
}