Skip to content

Commit

Permalink
Prometheus Exemplars Support (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
EvgeniaMartynova-thebeat authored Jan 20, 2022
1 parent db673e6 commit 7d46f2a
Show file tree
Hide file tree
Showing 24 changed files with 477 additions and 111 deletions.
12 changes: 8 additions & 4 deletions client/amqp/v2/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -100,7 +100,7 @@ func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandator
start := time.Now()
err := tc.channel.Publish(exchange, key, mandatory, immediate, msg)

observePublish(sp, start, exchange, err)
observePublish(ctx, sp, start, exchange, err)
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
Expand All @@ -120,7 +120,11 @@ func (c amqpHeadersCarrier) Set(key, val string) {
c[key] = val
}

func observePublish(span opentracing.Span, start time.Time, exchange string, err error) {
func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) {
trace.SpanComplete(span, err)
publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())

durationHistogram := trace.Histogram{
Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, time.Since(start).Seconds())
}
8 changes: 5 additions & 3 deletions client/es/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/estransport"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -120,8 +120,10 @@ func (c *transportClient) Perform(req *http.Request) (*http.Response, error) {

func observeResponse(req *http.Request, sp opentracing.Span, rsp *http.Response, start time.Time) {
endSpan(sp, rsp)
reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)).
Observe(time.Since(start).Seconds())
durationHistogram := trace.Histogram{
Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)),
}
durationHistogram.Observe(req.Context(), time.Since(start).Seconds())
}

// Config is a wrapper for elasticsearch.Config
Expand Down
8 changes: 5 additions & 3 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func unaryInterceptor(target string) grpc.UnaryClientInterceptor {
invokeDuration := time.Since(invokeTime)

rpcStatus, _ := status.FromError(err) // codes.OK if err == nil, codes.Unknown if !ok
rpcDurationMetrics.
WithLabelValues(unary, target, method, rpcStatus.Code().String()).
Observe(invokeDuration.Seconds())

durationHistogram := trace.Histogram{
Observer: rpcDurationMetrics.WithLabelValues(unary, target, method, rpcStatus.Code().String()),
}
durationHistogram.Observe(ctx, invokeDuration.Seconds())

if err != nil {
trace.SpanError(span)
Expand Down
7 changes: 4 additions & 3 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ func (tc *TracedClient) Do(req *http.Request) (*http.Response, error) {
}

ext.HTTPStatusCode.Set(ht.Span(), uint16(rsp.StatusCode))
reqDurationMetrics.
WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)).
Observe(time.Since(start).Seconds())
durationHistogram := trace.Histogram{
Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)),
}
durationHistogram.Observe(req.Context(), time.Since(start).Seconds())

if hdr := req.Header.Get(encoding.AcceptEncodingHeader); hdr != "" {
rsp.Body = decompress(hdr, rsp)
Expand Down
4 changes: 2 additions & 2 deletions client/kafka/v2/async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage)

err := injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}

ap.asyncProd.Input() <- msg
statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions client/kafka/v2/kafka.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package v2

import (
"context"
"errors"
"fmt"
"os"

"github.com/Shopify/sarama"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/internal/validation"
"github.com/beatlabs/patron/trace"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -41,6 +43,11 @@ func init() {
prometheus.MustRegister(messageStatus)
}

func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatusCounter := trace.Counter{Counter: messageStatus.WithLabelValues(string(status), topic, deliveryType)}
messageStatusCounter.Add(ctx, float64(cnt))
}

func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt))
}
Expand Down
16 changes: 8 additions & 8 deletions client/kafka/v2/sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (p

err = injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err)
}

partition, offset, err = p.syncProd.SendMessage(msg)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, err
}

statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return partition, offset, nil
}
Expand All @@ -57,19 +57,19 @@ func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.Produce

for _, msg := range messages {
if err := injectTracingHeaders(msg, sp); err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
}

if err := p.syncProd.SendMessages(messages); err != nil {
statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSendError, messages)
trace.SpanError(sp)
return err
}

statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSent, messages)
trace.SpanSuccess(sp)
return nil
}
Expand All @@ -87,8 +87,8 @@ func (p *SyncProducer) Close() error {
return nil
}

func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
func statusCountBatchAdd(ctx context.Context, deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
for _, msg := range messages {
statusCountAdd(deliveryType, status, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryType, status, msg.Topic, 1)
}
}
7 changes: 5 additions & 2 deletions client/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/beatlabs/patron/trace"
"github.com/go-redis/redis/extra/rediscmd"
"github.com/go-redis/redis/v8"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -89,7 +89,10 @@ func (th tracingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmd

func observeDuration(ctx context.Context, cmd string, err error) {
dur := time.Since(ctx.Value(duration{}).(time.Time))
cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err != nil)).Observe(dur.Seconds())
durationHistogram := trace.Histogram{
Observer: cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, dur.Seconds())
}

func startSpan(ctx context.Context, address, opName string) (opentracing.Span, context.Context) {
Expand Down
12 changes: 8 additions & 4 deletions client/sns/v2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func (p Publisher) Publish(ctx context.Context, input *sns.PublishInput) (messag
start := time.Now()
out, err := p.api.PublishWithContext(ctx, input)
if input.TopicArn != nil {
observePublish(span, start, *input.TopicArn, err)
observePublish(ctx, span, start, *input.TopicArn, err)
}
if input.TargetArn != nil {
observePublish(span, start, *input.TargetArn, err)
observePublish(ctx, span, start, *input.TargetArn, err)
}
if err != nil {
return "", fmt.Errorf("failed to publish message: %w", err)
Expand Down Expand Up @@ -124,7 +124,11 @@ func injectHeaders(span opentracing.Span, input *sns.PublishInput) error {
return nil
}

func observePublish(span opentracing.Span, start time.Time, topic string, err error) {
func observePublish(ctx context.Context, span opentracing.Span, start time.Time, topic string, err error) {
trace.SpanComplete(span, err)
publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())

durationHistogram := trace.Histogram{
Observer: publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, time.Since(start).Seconds())
}
Loading

0 comments on commit 7d46f2a

Please sign in to comment.