From 7d46f2a1598fceaf6c82fec2b2aea4db94d2f0d8 Mon Sep 17 00:00:00 2001 From: EvgeniaMartynova-thebeat <83655560+EvgeniaMartynova-thebeat@users.noreply.github.com> Date: Thu, 20 Jan 2022 10:07:03 +0100 Subject: [PATCH] Prometheus Exemplars Support (#448) --- client/amqp/v2/amqp.go | 12 +- client/es/elasticsearch.go | 8 +- client/grpc/grpc.go | 8 +- client/http/http.go | 7 +- client/kafka/v2/async_producer.go | 4 +- client/kafka/v2/kafka.go | 7 + client/kafka/v2/sync_producer.go | 16 +- client/redis/redis.go | 7 +- client/sns/v2/publisher.go | 12 +- client/sql/sql.go | 66 ++++---- client/sqs/v2/publisher.go | 10 +- component/amqp/component.go | 21 +-- component/amqp/message.go | 4 +- component/async/component.go | 8 +- component/grpc/observability.go | 11 +- component/http/metric.go | 3 +- component/http/middleware.go | 14 +- component/kafka/group/component.go | 17 ++- component/sqs/component.go | 26 ++-- component/sqs/message.go | 12 +- docs/observability/Observability.md | 22 ++- trace/metric.go | 64 ++++++++ trace/metric_test.go | 223 ++++++++++++++++++++++++++++ trace/trace.go | 6 +- 24 files changed, 477 insertions(+), 111 deletions(-) create mode 100644 trace/metric.go create mode 100644 trace/metric_test.go diff --git a/client/amqp/v2/amqp.go b/client/amqp/v2/amqp.go index 573b3aaac..66d7adb7e 100644 --- a/client/amqp/v2/amqp.go +++ b/client/amqp/v2/amqp.go @@ -12,7 +12,7 @@ import ( patronerrors "github.com/beatlabs/patron/errors" "github.com/beatlabs/patron/log" "github.com/beatlabs/patron/trace" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/client_golang/prometheus" "github.com/streadway/amqp" @@ -100,7 +100,7 @@ func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandator start := time.Now() err := tc.channel.Publish(exchange, key, mandatory, immediate, msg) - observePublish(sp, start, exchange, err) + observePublish(ctx, sp, start, exchange, err) if err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -120,7 +120,11 @@ func (c amqpHeadersCarrier) Set(key, val string) { c[key] = val } -func observePublish(span opentracing.Span, start time.Time, exchange string, err error) { +func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) { trace.SpanComplete(span, err) - publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) + + durationHistogram := trace.Histogram{ + Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, time.Since(start).Seconds()) } diff --git a/client/es/elasticsearch.go b/client/es/elasticsearch.go index ed2bde43f..32fd66357 100644 --- a/client/es/elasticsearch.go +++ b/client/es/elasticsearch.go @@ -17,7 +17,7 @@ import ( elasticsearch "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/estransport" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/client_golang/prometheus" ) @@ -120,8 +120,10 @@ func (c *transportClient) Perform(req *http.Request) (*http.Response, error) { func observeResponse(req *http.Request, sp opentracing.Span, rsp *http.Response, start time.Time) { endSpan(sp, rsp) - reqDurationMetrics.WithLabelValues(req.Method, 
req.URL.Host, strconv.Itoa(rsp.StatusCode)). - Observe(time.Since(start).Seconds()) + durationHistogram := trace.Histogram{ + Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)), + } + durationHistogram.Observe(req.Context(), time.Since(start).Seconds()) } // Config is a wrapper for elasticsearch.Config diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index b625d9ccf..d386c9b43 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -86,9 +86,11 @@ func unaryInterceptor(target string) grpc.UnaryClientInterceptor { invokeDuration := time.Since(invokeTime) rpcStatus, _ := status.FromError(err) // codes.OK if err == nil, codes.Unknown if !ok - rpcDurationMetrics. - WithLabelValues(unary, target, method, rpcStatus.Code().String()). - Observe(invokeDuration.Seconds()) + + durationHistogram := trace.Histogram{ + Observer: rpcDurationMetrics.WithLabelValues(unary, target, method, rpcStatus.Code().String()), + } + durationHistogram.Observe(ctx, invokeDuration.Seconds()) if err != nil { trace.SpanError(span) diff --git a/client/http/http.go b/client/http/http.go index b6276b300..91efc6c29 100644 --- a/client/http/http.go +++ b/client/http/http.go @@ -93,9 +93,10 @@ func (tc *TracedClient) Do(req *http.Request) (*http.Response, error) { } ext.HTTPStatusCode.Set(ht.Span(), uint16(rsp.StatusCode)) - reqDurationMetrics. - WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)). - Observe(time.Since(start).Seconds()) + durationHistogram := trace.Histogram{ + Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)), + } + durationHistogram.Observe(req.Context(), time.Since(start).Seconds()) if hdr := req.Header.Get(encoding.AcceptEncodingHeader); hdr != "" { rsp.Body = decompress(hdr, rsp) diff --git a/client/kafka/v2/async_producer.go b/client/kafka/v2/async_producer.go index 558d7a445..f1805e7f1 100644 --- a/client/kafka/v2/async_producer.go +++ b/client/kafka/v2/async_producer.go @@ -28,13 +28,13 @@ func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) err := injectTracingHeaders(msg, sp) if err != nil { - statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) return fmt.Errorf("failed to inject tracing headers: %w", err) } ap.asyncProd.Input() <- msg - statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1) trace.SpanSuccess(sp) return nil } diff --git a/client/kafka/v2/kafka.go b/client/kafka/v2/kafka.go index 4a964bfa0..da1e8b862 100644 --- a/client/kafka/v2/kafka.go +++ b/client/kafka/v2/kafka.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "errors" "fmt" "os" @@ -8,6 +9,7 @@ import ( "github.com/Shopify/sarama" patronerrors "github.com/beatlabs/patron/errors" "github.com/beatlabs/patron/internal/validation" + "github.com/beatlabs/patron/trace" "github.com/prometheus/client_golang/prometheus" ) @@ -41,6 +43,11 @@ func init() { prometheus.MustRegister(messageStatus) } +func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) { + messageStatusCounter := trace.Counter{Counter: messageStatus.WithLabelValues(string(status), topic, deliveryType)} + messageStatusCounter.Add(ctx, float64(cnt)) +} + func statusCountAdd(deliveryType string, status deliveryStatus, topic 
string, cnt int) { messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt)) } diff --git a/client/kafka/v2/sync_producer.go b/client/kafka/v2/sync_producer.go index 9d4262e67..3f5748000 100644 --- a/client/kafka/v2/sync_producer.go +++ b/client/kafka/v2/sync_producer.go @@ -29,19 +29,19 @@ func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (p err = injectTracingHeaders(msg, sp) if err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err) } partition, offset, err = p.syncProd.SendMessage(msg) if err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) return -1, -1, err } - statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSent, msg.Topic, 1) trace.SpanSuccess(sp) return partition, offset, nil } @@ -57,19 +57,19 @@ func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.Produce for _, msg := range messages { if err := injectTracingHeaders(msg, sp); err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages)) + statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages)) trace.SpanError(sp) return fmt.Errorf("failed to inject tracing headers: %w", err) } } if err := p.syncProd.SendMessages(messages); err != nil { - statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages) + statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSendError, messages) trace.SpanError(sp) return err } - statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages) + statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSent, messages) trace.SpanSuccess(sp) return nil } @@ -87,8 +87,8 @@ func (p *SyncProducer) Close() error { return nil } -func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) { +func statusCountBatchAdd(ctx context.Context, deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) { for _, msg := range messages { - statusCountAdd(deliveryType, status, msg.Topic, 1) + statusCountAddWithExemplars(ctx, deliveryType, status, msg.Topic, 1) } } diff --git a/client/redis/redis.go b/client/redis/redis.go index ff1baf9a2..2a36297fb 100644 --- a/client/redis/redis.go +++ b/client/redis/redis.go @@ -9,7 +9,7 @@ import ( "github.com/beatlabs/patron/trace" "github.com/go-redis/redis/extra/rediscmd" "github.com/go-redis/redis/v8" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/client_golang/prometheus" ) @@ -89,7 +89,10 @@ func (th tracingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmd func observeDuration(ctx context.Context, cmd string, err error) { dur := time.Since(ctx.Value(duration{}).(time.Time)) - cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err != nil)).Observe(dur.Seconds()) + durationHistogram := trace.Histogram{ + Observer: cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, dur.Seconds()) } func startSpan(ctx 
context.Context, address, opName string) (opentracing.Span, context.Context) { diff --git a/client/sns/v2/publisher.go b/client/sns/v2/publisher.go index bda64263c..324079ce2 100644 --- a/client/sns/v2/publisher.go +++ b/client/sns/v2/publisher.go @@ -69,10 +69,10 @@ func (p Publisher) Publish(ctx context.Context, input *sns.PublishInput) (messag start := time.Now() out, err := p.api.PublishWithContext(ctx, input) if input.TopicArn != nil { - observePublish(span, start, *input.TopicArn, err) + observePublish(ctx, span, start, *input.TopicArn, err) } if input.TargetArn != nil { - observePublish(span, start, *input.TargetArn, err) + observePublish(ctx, span, start, *input.TargetArn, err) } if err != nil { return "", fmt.Errorf("failed to publish message: %w", err) @@ -124,7 +124,11 @@ func injectHeaders(span opentracing.Span, input *sns.PublishInput) error { return nil } -func observePublish(span opentracing.Span, start time.Time, topic string, err error) { +func observePublish(ctx context.Context, span opentracing.Span, start time.Time, topic string, err error) { trace.SpanComplete(span, err) - publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) + + durationHistogram := trace.Histogram{ + Observer: publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, time.Since(start).Seconds()) } diff --git a/client/sql/sql.go b/client/sql/sql.go index 0075adf73..a3a29f25b 100644 --- a/client/sql/sql.go +++ b/client/sql/sql.go @@ -10,7 +10,7 @@ import ( "time" "github.com/beatlabs/patron/trace" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/client_golang/prometheus" ) @@ -75,7 +75,7 @@ func (c *Conn) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) { sp, _ := c.startSpan(ctx, op, "") start := time.Now() tx, err := c.conn.BeginTx(ctx, opts) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (c *Conn) Close(ctx context.Context) error { sp, _ := c.startSpan(ctx, op, "") start := time.Now() err := c.conn.Close() - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -99,7 +99,7 @@ func (c *Conn) Exec(ctx context.Context, query string, args ...interface{}) (sql sp, _ := c.startSpan(ctx, op, query) start := time.Now() res, err := c.conn.ExecContext(ctx, query, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return res, err } @@ -109,7 +109,7 @@ func (c *Conn) Ping(ctx context.Context) error { sp, _ := c.startSpan(ctx, op, "") start := time.Now() err := c.conn.PingContext(ctx) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -119,7 +119,7 @@ func (c *Conn) Prepare(ctx context.Context, query string) (*Stmt, error) { sp, _ := c.startSpan(ctx, op, query) start := time.Now() stmt, err := c.conn.PrepareContext(ctx, query) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -132,7 +132,7 @@ func (c *Conn) Query(ctx context.Context, query string, args ...interface{}) (*s sp, _ := c.startSpan(ctx, op, query) start := time.Now() rows, err := c.conn.QueryContext(ctx, query, args...) 
- observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -146,7 +146,7 @@ func (c *Conn) QueryRow(ctx context.Context, query string, args ...interface{}) sp, _ := c.startSpan(ctx, op, query) start := time.Now() row := c.conn.QueryRowContext(ctx, query, args...) - observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return row } @@ -191,7 +191,7 @@ func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) { sp, _ := db.startSpan(ctx, op, "") start := time.Now() tx, err := db.db.BeginTx(ctx, opts) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -204,7 +204,7 @@ func (db *DB) Close(ctx context.Context) error { sp, _ := db.startSpan(ctx, op, "") start := time.Now() err := db.db.Close() - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -214,7 +214,7 @@ func (db *DB) Conn(ctx context.Context) (*Conn, error) { sp, _ := db.startSpan(ctx, op, "") start := time.Now() conn, err := db.db.Conn(ctx) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -228,7 +228,7 @@ func (db *DB) Driver(ctx context.Context) driver.Driver { sp, _ := db.startSpan(ctx, op, "") start := time.Now() drv := db.db.Driver() - observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return drv } @@ -238,7 +238,7 @@ func (db *DB) Exec(ctx context.Context, query string, args ...interface{}) (sql. sp, _ := db.startSpan(ctx, op, query) start := time.Now() res, err := db.db.ExecContext(ctx, query, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -252,7 +252,7 @@ func (db *DB) Ping(ctx context.Context) error { sp, _ := db.startSpan(ctx, op, "") start := time.Now() err := db.db.PingContext(ctx) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -262,7 +262,7 @@ func (db *DB) Prepare(ctx context.Context, query string) (*Stmt, error) { sp, _ := db.startSpan(ctx, op, query) start := time.Now() stmt, err := db.db.PrepareContext(ctx, query) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -276,7 +276,7 @@ func (db *DB) Query(ctx context.Context, query string, args ...interface{}) (*sq sp, _ := db.startSpan(ctx, op, query) start := time.Now() rows, err := db.db.QueryContext(ctx, query, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -290,7 +290,7 @@ func (db *DB) QueryRow(ctx context.Context, query string, args ...interface{}) * sp, _ := db.startSpan(ctx, op, query) start := time.Now() row := db.db.QueryRowContext(ctx, query, args...) 
- observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return row } @@ -315,7 +315,7 @@ func (db *DB) Stats(ctx context.Context) sql.DBStats { sp, _ := db.startSpan(ctx, op, "") start := time.Now() stats := db.db.Stats() - observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return stats } @@ -332,7 +332,7 @@ func (s *Stmt) Close(ctx context.Context) error { sp, _ := s.startSpan(ctx, op, "") start := time.Now() err := s.stmt.Close() - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -342,7 +342,7 @@ func (s *Stmt) Exec(ctx context.Context, args ...interface{}) (sql.Result, error sp, _ := s.startSpan(ctx, op, s.query) start := time.Now() res, err := s.stmt.ExecContext(ctx, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -356,7 +356,7 @@ func (s *Stmt) Query(ctx context.Context, args ...interface{}) (*sql.Rows, error sp, _ := s.startSpan(ctx, op, s.query) start := time.Now() rows, err := s.stmt.QueryContext(ctx, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -370,7 +370,7 @@ func (s *Stmt) QueryRow(ctx context.Context, args ...interface{}) *sql.Row { sp, _ := s.startSpan(ctx, op, s.query) start := time.Now() row := s.stmt.QueryRowContext(ctx, args...) - observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return row } @@ -386,7 +386,7 @@ func (tx *Tx) Commit(ctx context.Context) error { sp, _ := tx.startSpan(ctx, op, "") start := time.Now() err := tx.tx.Commit() - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -396,7 +396,7 @@ func (tx *Tx) Exec(ctx context.Context, query string, args ...interface{}) (sql. sp, _ := tx.startSpan(ctx, op, query) start := time.Now() res, err := tx.tx.ExecContext(ctx, query, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -410,7 +410,7 @@ func (tx *Tx) Prepare(ctx context.Context, query string) (*Stmt, error) { sp, _ := tx.startSpan(ctx, op, query) start := time.Now() stmt, err := tx.tx.PrepareContext(ctx, query) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -424,7 +424,7 @@ func (tx *Tx) Query(ctx context.Context, query string, args ...interface{}) (*sq sp, _ := tx.startSpan(ctx, op, query) start := time.Now() rows, err := tx.tx.QueryContext(ctx, query, args...) - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) if err != nil { return nil, err } @@ -437,7 +437,7 @@ func (tx *Tx) QueryRow(ctx context.Context, query string, args ...interface{}) * sp, _ := tx.startSpan(ctx, op, query) start := time.Now() row := tx.tx.QueryRowContext(ctx, query, args...) 
- observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return row } @@ -447,7 +447,7 @@ func (tx *Tx) Rollback(ctx context.Context) error { sp, _ := tx.startSpan(ctx, op, "") start := time.Now() err := tx.tx.Rollback() - observeDuration(sp, start, op, err) + observeDuration(ctx, sp, start, op, err) return err } @@ -457,7 +457,7 @@ func (tx *Tx) Stmt(ctx context.Context, stmt *Stmt) *Stmt { sp, _ := tx.startSpan(ctx, op, stmt.query) start := time.Now() st := &Stmt{stmt: tx.tx.StmtContext(ctx, stmt.stmt), connInfo: tx.connInfo, query: stmt.query} - observeDuration(sp, start, op, nil) + observeDuration(ctx, sp, start, op, nil) return st } @@ -491,7 +491,11 @@ func parseDSN(dsn string) DSNInfo { return res } -func observeDuration(span opentracing.Span, start time.Time, op string, err error) { +func observeDuration(ctx context.Context, span opentracing.Span, start time.Time, op string, err error) { trace.SpanComplete(span, err) - opDurationMetrics.WithLabelValues(op, strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) + + durationHistogram := trace.Histogram{ + Observer: opDurationMetrics.WithLabelValues(op, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, time.Since(start).Seconds()) } diff --git a/client/sqs/v2/publisher.go b/client/sqs/v2/publisher.go index f21041af3..f49638375 100644 --- a/client/sqs/v2/publisher.go +++ b/client/sqs/v2/publisher.go @@ -63,7 +63,7 @@ func (p Publisher) Publish(ctx context.Context, msg *sqs.SendMessageInput) (mess start := time.Now() out, err := p.api.SendMessageWithContext(ctx, msg) - observePublish(span, start, *msg.QueueUrl, err) + observePublish(ctx, span, start, *msg.QueueUrl, err) if err != nil { return "", fmt.Errorf("failed to publish message: %w", err) } @@ -101,7 +101,11 @@ func injectHeaders(span opentracing.Span, input *sqs.SendMessageInput) error { return nil } -func observePublish(span opentracing.Span, start time.Time, queue string, err error) { +func observePublish(ctx context.Context, span opentracing.Span, start time.Time, queue string, err error) { trace.SpanComplete(span, err) - publishDurationMetrics.WithLabelValues(queue, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) + + durationHistogram := trace.Histogram{ + Observer: publishDurationMetrics.WithLabelValues(queue, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, time.Since(start).Seconds()) } diff --git a/component/amqp/component.go b/component/amqp/component.go index 323843d36..50beed9e7 100644 --- a/component/amqp/component.go +++ b/component/amqp/component.go @@ -38,9 +38,9 @@ const ( ) var ( - messageAge *prometheus.GaugeVec - messageCounter *prometheus.CounterVec - queueSize *prometheus.GaugeVec + messageAge *prometheus.GaugeVec + messageCounterVec *prometheus.CounterVec + queueSize *prometheus.GaugeVec ) func init() { @@ -54,7 +54,7 @@ func init() { []string{"queue"}, ) prometheus.MustRegister(messageAge) - messageCounter = prometheus.NewCounterVec( + messageCounterVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "component", Subsystem: "amqp", @@ -63,7 +63,7 @@ func init() { }, []string{"queue", "state", "hasError"}, ) - prometheus.MustRegister(messageCounter) + prometheus.MustRegister(messageCounterVec) queueSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "component", @@ -219,7 +219,7 @@ func (c *Component) processLoop(ctx context.Context, sub subscription) error { return errors.New("subscription channel closed") } log.Debugf("processing 
message %d", delivery.DeliveryTag) - observeReceivedMessageStats(c.queueCfg.queue, delivery.Timestamp) + observeReceivedMessageStats(ctx, c.queueCfg.queue, delivery.Timestamp) c.processBatch(ctx, c.createMessage(ctx, delivery), btc) case <-batchTimeout.C: log.Debugf("batch timeout expired, sending batch") @@ -233,9 +233,9 @@ func (c *Component) processLoop(ctx context.Context, sub subscription) error { } } -func observeReceivedMessageStats(queue string, timestamp time.Time) { +func observeReceivedMessageStats(ctx context.Context, queue string, timestamp time.Time) { messageAge.WithLabelValues(queue).Set(time.Now().UTC().Sub(timestamp).Seconds()) - messageCountInc(queue, fetchedMessageState, nil) + messageCountInc(ctx, queue, fetchedMessageState, nil) } type subscription struct { @@ -329,12 +329,13 @@ func (c *Component) stats(sub subscription) error { return nil } -func messageCountInc(queue string, state messageState, err error) { +func messageCountInc(ctx context.Context, queue string, state messageState, err error) { hasError := "false" if err != nil { hasError = "true" } - messageCounter.WithLabelValues(queue, string(state), hasError).Inc() + messageStatusCounter := trace.Counter{Counter: messageCounterVec.WithLabelValues(queue, string(state), hasError)} + messageStatusCounter.Inc(ctx) } func mapHeader(hh amqp.Table) map[string]string { diff --git a/component/amqp/message.go b/component/amqp/message.go index ea71b9f3a..e6ce98024 100644 --- a/component/amqp/message.go +++ b/component/amqp/message.go @@ -71,13 +71,13 @@ func (m message) Message() amqp.Delivery { func (m message) ACK() error { err := m.msg.Ack(false) trace.SpanComplete(m.span, err) - messageCountInc(m.queue, ackMessageState, err) + messageCountInc(m.ctx, m.queue, ackMessageState, err) return err } func (m message) NACK() error { err := m.msg.Nack(false, m.requeue) - messageCountInc(m.queue, nackMessageState, err) + messageCountInc(m.ctx, m.queue, nackMessageState, err) trace.SpanComplete(m.span, err) return err } diff --git a/component/async/component.go b/component/async/component.go index 33a3d75e1..ec4aaf196 100644 --- a/component/async/component.go +++ b/component/async/component.go @@ -8,6 +8,7 @@ import ( patronErrors "github.com/beatlabs/patron/errors" "github.com/beatlabs/patron/log" + "github.com/beatlabs/patron/trace" "github.com/prometheus/client_golang/prometheus" ) @@ -28,8 +29,9 @@ func init() { prometheus.MustRegister(consumerErrors) } -func consumerErrorsInc(name string) { - consumerErrors.WithLabelValues(name).Inc() +func consumerErrorsInc(ctx context.Context, name string) { + consumerErrorsCounter := trace.Counter{Counter: consumerErrors.WithLabelValues(name)} + consumerErrorsCounter.Inc(ctx) } // Component implementation of a async component. 
@@ -160,7 +162,7 @@ func (c *Component) Run(ctx context.Context) error { if ctx.Err() == context.Canceled { break } - consumerErrorsInc(c.name) + consumerErrorsInc(ctx, c.name) if c.retries > 0 { log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, err) time.Sleep(c.retryWait) diff --git a/component/grpc/observability.go b/component/grpc/observability.go index ff0e0329f..42a64cc70 100644 --- a/component/grpc/observability.go +++ b/component/grpc/observability.go @@ -111,12 +111,19 @@ func (o *observer) log(err error) { func (o *observer) messageHandled(err error) { st, _ := status.FromError(err) - rpcHandledMetric.WithLabelValues(o.typ, o.service, o.method, st.Code().String()).Inc() + rpcHandledCounter := trace.Counter{ + Counter: rpcHandledMetric.WithLabelValues(o.typ, o.service, o.method, st.Code().String()), + } + rpcHandledCounter.Inc(o.ctx) } func (o *observer) messageLatency(dur time.Duration, err error) { st, _ := status.FromError(err) - rpcLatencyMetric.WithLabelValues(o.typ, o.service, o.method, st.Code().String()).Observe(dur.Seconds()) + + rpcLatencyMetricObserver := trace.Histogram{ + Observer: rpcLatencyMetric.WithLabelValues(o.typ, o.service, o.method, st.Code().String()), + } + rpcLatencyMetricObserver.Observe(o.ctx, dur.Seconds()) } func observableUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/component/http/metric.go b/component/http/metric.go index e7c2c3f40..bdf8f78d6 100644 --- a/component/http/metric.go +++ b/component/http/metric.go @@ -1,9 +1,10 @@ package http import ( + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) func metricRoute() *RouteBuilder { - return NewRawRouteBuilder("/metrics", promhttp.Handler().ServeHTTP).MethodGet() + return NewRawRouteBuilder("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{EnableOpenMetrics: true}).ServeHTTP).MethodGet() } diff --git a/component/http/middleware.go b/component/http/middleware.go index 986c26f07..ae8c48dd8 100644 --- a/component/http/middleware.go +++ b/component/http/middleware.go @@ -188,7 +188,7 @@ func initHTTPServerMetrics() { // metrics are exposed via Prometheus. // This middleware is enabled by default. 
func NewRequestObserverMiddleware(method, path string) MiddlewareFunc { - // register Promethus metrics on first use + // register Prometheus metrics on first use httpStatusTracingInit.Do(initHTTPServerMetrics) return func(next http.Handler) http.Handler { @@ -199,8 +199,16 @@ func NewRequestObserverMiddleware(method, path string) MiddlewareFunc { // collect metrics about HTTP server-side handling and latency status := strconv.Itoa(lw.Status()) - httpStatusTracingHandledMetric.WithLabelValues(method, path, status).Inc() - httpStatusTracingLatencyMetric.WithLabelValues(method, path, status).Observe(time.Since(now).Seconds()) + + httpStatusCounter := trace.Counter{ + Counter: httpStatusTracingHandledMetric.WithLabelValues(method, path, status), + } + httpStatusCounter.Inc(r.Context()) + + httpLatencyMetricObserver := trace.Histogram{ + Observer: httpStatusTracingLatencyMetric.WithLabelValues(method, path, status), + } + httpLatencyMetricObserver.Observe(r.Context(), time.Since(now).Seconds()) }) } } diff --git a/component/kafka/group/component.go b/component/kafka/group/component.go index d3ef1476e..7b8ec08d6 100644 --- a/component/kafka/group/component.go +++ b/component/kafka/group/component.go @@ -92,8 +92,11 @@ func topicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high } // messageStatusCountInc increments the messageStatus counter for a certain status. -func messageStatusCountInc(status, group, topic string) { - messageStatus.WithLabelValues(status, group, topic).Inc() +func messageStatusCountInc(ctx context.Context, status, group, topic string) { + httpStatusCounter := trace.Counter{ + Counter: messageStatus.WithLabelValues(status, group, topic), + } + httpStatusCounter.Inc(ctx) } // New initializes a new kafka consumer component with support for functional configuration. 
@@ -318,7 +321,7 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai if ok { log.Debugf("message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic) topicPartitionOffsetDiffGaugeSet(c.group, msg.Topic, msg.Partition, claim.HighWaterMarkOffset(), msg.Offset) - messageStatusCountInc(messageReceived, c.group, msg.Topic) + messageStatusCountInc(c.ctx, messageReceived, c.group, msg.Topic) err := c.insertMessage(session, msg) if err != nil { return err @@ -347,7 +350,7 @@ func (c *consumerHandler) flush(session sarama.ConsumerGroupSession) error { if len(c.msgBuf) > 0 { messages := make([]kafka.Message, 0, len(c.msgBuf)) for _, msg := range c.msgBuf { - messageStatusCountInc(messageProcessed, c.group, msg.Topic) + messageStatusCountInc(c.ctx, messageProcessed, c.group, msg.Topic) ctx, sp := c.getContextWithCorrelation(msg) messages = append(messages, kafka.NewMessage(ctx, sp, msg)) } @@ -385,7 +388,7 @@ func (c *consumerHandler) executeFailureStrategy(messages []kafka.Message, err e case kafka.ExitStrategy: for _, m := range messages { trace.SpanError(m.Span()) - messageStatusCountInc(messageErrored, c.group, m.Message().Topic) + messageStatusCountInc(c.ctx, messageErrored, c.group, m.Message().Topic) } log.Errorf("could not process message(s)") c.err = err @@ -393,8 +396,8 @@ func (c *consumerHandler) executeFailureStrategy(messages []kafka.Message, err e case kafka.SkipStrategy: for _, m := range messages { trace.SpanError(m.Span()) - messageStatusCountInc(messageErrored, c.group, m.Message().Topic) - messageStatusCountInc(messageSkipped, c.group, m.Message().Topic) + messageStatusCountInc(c.ctx, messageErrored, c.group, m.Message().Topic) + messageStatusCountInc(c.ctx, messageSkipped, c.group, m.Message().Topic) } log.Errorf("could not process message(s) so skipping with error: %v", err) default: diff --git a/component/sqs/component.go b/component/sqs/component.go index 2f7b406f0..e43e91eff 100644 --- a/component/sqs/component.go +++ b/component/sqs/component.go @@ -46,9 +46,9 @@ const ( ) var ( - messageAge *prometheus.GaugeVec - messageCounter *prometheus.CounterVec - queueSize *prometheus.GaugeVec + messageAge *prometheus.GaugeVec + messageCounterVec *prometheus.CounterVec + queueSize *prometheus.GaugeVec ) func init() { @@ -62,7 +62,7 @@ func init() { []string{"queue"}, ) prometheus.MustRegister(messageAge) - messageCounter = prometheus.NewCounterVec( + messageCounterVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "component", Subsystem: "sqs", @@ -71,7 +71,7 @@ func init() { }, []string{"queue", "state", "hasError"}, ) - prometheus.MustRegister(messageCounter) + prometheus.MustRegister(messageCounterVec) queueSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "component", @@ -228,7 +228,7 @@ func (c *Component) consume(ctx context.Context, chErr chan error) { } logger.Debugf("Consume: received %d messages", len(output.Messages)) - messageCountInc(c.queue.name, fetchedMessageState, len(output.Messages)) + messageCountInc(ctx, c.queue.name, fetchedMessageState, false, len(output.Messages)) if len(output.Messages) == 0 { continue @@ -330,12 +330,16 @@ func observerMessageAge(queue string, attributes map[string]*string) { messageAge.WithLabelValues(queue).Set(time.Now().UTC().Sub(time.Unix(timestamp, 0)).Seconds()) } -func messageCountInc(queue string, state messageState, count int) { - messageCounter.WithLabelValues(queue, string(state), "false").Add(float64(count)) -} +func 
messageCountInc(ctx context.Context, queue string, state messageState, hasError bool, count int) { + hasErrorString := "false" + if hasError { + hasErrorString = "true" + } -func messageCountErrorInc(queue string, state messageState, count int) { - messageCounter.WithLabelValues(queue, string(state), "true").Add(float64(count)) + messageCounter := trace.Counter{ + Counter: messageCounterVec.WithLabelValues(queue, string(state), hasErrorString), + } + messageCounter.Add(ctx, float64(count)) } func getCorrelationID(ma map[string]*sqs.MessageAttributeValue) string { diff --git a/component/sqs/message.go b/component/sqs/message.go index d80a23620..22dc90d6b 100644 --- a/component/sqs/message.go +++ b/component/sqs/message.go @@ -79,17 +79,17 @@ func (m message) ACK() error { ReceiptHandle: m.msg.ReceiptHandle, }) if err != nil { - messageCountErrorInc(m.queue.name, ackMessageState, 1) + messageCountInc(m.ctx, m.queue.name, ackMessageState, true, 1) trace.SpanError(m.span) return err } - messageCountInc(m.queue.name, ackMessageState, 1) + messageCountInc(m.ctx, m.queue.name, ackMessageState, false, 1) trace.SpanSuccess(m.span) return nil } func (m message) NACK() { - messageCountInc(m.queue.name, nackMessageState, 1) + messageCountInc(m.ctx, m.queue.name, nackMessageState, false, 1) trace.SpanSuccess(m.span) } @@ -117,7 +117,7 @@ func (b batch) ACK() ([]Message, error) { QueueUrl: aws.String(b.queue.url), }) if err != nil { - messageCountErrorInc(b.queue.name, ackMessageState, len(b.messages)) + messageCountInc(b.ctx, b.queue.name, ackMessageState, true, len(b.messages)) for _, msg := range b.messages { trace.SpanError(msg.Span()) } @@ -125,7 +125,7 @@ func (b batch) ACK() ([]Message, error) { } if len(output.Successful) > 0 { - messageCountInc(b.queue.name, ackMessageState, len(output.Successful)) + messageCountInc(b.ctx, b.queue.name, ackMessageState, false, len(output.Successful)) for _, suc := range output.Successful { trace.SpanSuccess(msgMap[aws.StringValue(suc.Id)].Span()) @@ -133,7 +133,7 @@ func (b batch) ACK() ([]Message, error) { } if len(output.Failed) > 0 { - messageCountErrorInc(b.queue.name, ackMessageState, len(output.Failed)) + messageCountInc(b.ctx, b.queue.name, ackMessageState, true, len(output.Failed)) failed := make([]Message, 0, len(output.Failed)) for _, fail := range output.Failed { msg := msgMap[aws.StringValue(fail.Id)] diff --git a/docs/observability/Observability.md b/docs/observability/Observability.md index c4979c38c..a5e73ab71 100644 --- a/docs/observability/Observability.md +++ b/docs/observability/Observability.md @@ -7,4 +7,24 @@ Every component has been integrated with the above library and produces traces a Metrics are can be scraped via the default HTTP component at the `/metrics` route for Prometheus. Traces will be sent to a Jaeger agent, which can be setup through environment variables mentioned in the config section. Sane defaults are applied for making the use easy. -The `component` and `client` packages implement capturing and propagating of metrics and traces. \ No newline at end of file +The `component` and `client` packages implement capturing and propagating of metrics and traces. + +## Prometheus Exemplars + +[OpenTracing](https://opentracing.io) compatible tracing systems such as [Grafana Tempo](https://grafana.com/oss/tempo/) +can work with [Prometheus Exemplars](https://grafana.com/docs/grafana/latest/basics/exemplars/). + +Below are prerequisites for enabling exemplars: + +- Use Prometheus Go client library version 1.4.0 or above. 
+- Use the new `ExemplarObserver` for `Histogram` or `ExemplarAdder` for `Counter`; + the original interfaces were left unchanged to preserve backward compatibility. + Use the `ObserveWithExemplar` or `AddWithExemplar` methods and note the `TraceID` key; it is needed later to configure + Grafana so that it knows which label to use to retrieve the `TraceID`. + +An example of enabling exemplars in an already instrumented Go application can be found [here](../trace/metric.go), +where exemplars are enabled for `Histogram` and `Counter` metrics. + +The result of the above steps is that trace IDs are attached to metrics via exemplars. +When querying the `/metrics` endpoint with `curl -H "Accept: application/openmetrics-text" :/metrics`, +exemplars will be present in each metric entry after `#` in the [Open Metrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1) format. \ No newline at end of file diff --git a/trace/metric.go b/trace/metric.go new file mode 100644 index 000000000..ed71c1699 --- /dev/null +++ b/trace/metric.go @@ -0,0 +1,64 @@ +package trace + +import ( + "context" + + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/uber/jaeger-client-go" +) + +// Counter is a wrapper of a prometheus.Counter. +type Counter struct { + prometheus.Counter +} + +// Add adds the given value to the counter. If there is a span associated with the context ctx, the method +// replaces the currently saved exemplar (if any) with a new one, created from the provided value. +func (c *Counter) Add(ctx context.Context, count float64) { + spanFromCtx := opentracing.SpanFromContext(ctx) + if spanFromCtx != nil { + if sctx, ok := spanFromCtx.Context().(jaeger.SpanContext); ok { + if counter, ok := c.Counter.(prometheus.ExemplarAdder); ok { + counter.AddWithExemplar(count, prometheus.Labels{TraceID: sctx.TraceID().String()}) + return + } + } + } + c.Counter.Add(count) +} + +// Inc increments the counter by one. If there is a span associated with the context ctx, the method +// replaces the currently saved exemplar (if any) with a new one. +func (c *Counter) Inc(ctx context.Context) { + spanFromCtx := opentracing.SpanFromContext(ctx) + if spanFromCtx != nil { + if sctx, ok := spanFromCtx.Context().(jaeger.SpanContext); ok { + if counter, ok := c.Counter.(prometheus.ExemplarAdder); ok { + counter.AddWithExemplar(1, prometheus.Labels{TraceID: sctx.TraceID().String()}) + return + } + } + } + c.Counter.Add(1) +} + +// Histogram is a wrapper of a prometheus.Observer. +type Histogram struct { + prometheus.Observer +} + +// Observe adds an observation. If there is a span associated with the context ctx, the method replaces +// the currently saved exemplar (if any) with a new one, created from the provided value.
+func (h *Histogram) Observe(ctx context.Context, v float64) { + spanFromCtx := opentracing.SpanFromContext(ctx) + if spanFromCtx != nil { + if sctx, ok := spanFromCtx.Context().(jaeger.SpanContext); ok { + if observer, ok := h.Observer.(prometheus.ExemplarObserver); ok { + observer.ObserveWithExemplar(v, prometheus.Labels{TraceID: sctx.TraceID().String()}) + return + } + } + } + h.Observer.Observe(v) +} diff --git a/trace/metric_test.go b/trace/metric_test.go new file mode 100644 index 000000000..0a67fedc7 --- /dev/null +++ b/trace/metric_test.go @@ -0,0 +1,223 @@ +package trace + +import ( + "context" + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCounter_Add(t *testing.T) { + t.Parallel() + type fields struct { + counter prometheus.Counter + } + type args struct { + count float64 + } + tests := map[string]struct { + fields fields + args args + expectedVal float64 + expectedPanic bool + }{ + "test-add-counter": { + fields: fields{ + counter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "test_counter", + }, + []string{"name"}, + ).WithLabelValues("test"), + }, + args: args{ + count: 2, + }, + expectedVal: 2, + expectedPanic: false, + }, + "test-try-to-decrease-counter": { + fields: fields{ + counter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "test_counter", + }, + []string{"name"}, + ).WithLabelValues("test"), + }, + args: args{ + count: -2, + }, + expectedPanic: true, + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + if tt.expectedPanic { + defer func() { + if r := recover(); r == nil { + t.Error("Add method did not panic.") + } + }() + } + c := Counter{ + Counter: tt.fields.counter, + } + c.Add(context.Background(), tt.args.count) + if tt.expectedPanic { + defer func() { + if r := recover(); r == nil { + t.Error("Add method did not panic.") + } + }() + } else { + assert.Equal(t, tt.expectedVal, testutil.ToFloat64(c)) + c.Add(context.Background(), tt.args.count) + assert.Equal(t, 2*tt.expectedVal, testutil.ToFloat64(c)) + } + }) + } +} + +func TestCounter_Inc(t *testing.T) { + t.Parallel() + type fields struct { + counter prometheus.Counter + } + type args struct { + count int + } + tests := map[string]struct { + fields fields + args args + expectedVal float64 + }{ + "test-inc-counter": { + fields: fields{ + counter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "test_counter", + }, + []string{"name"}, + ).WithLabelValues("test"), + }, + args: args{ + count: 2, + }, + expectedVal: 1, + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + c := Counter{ + Counter: tt.fields.counter, + } + c.Inc(context.Background()) + assert.Equal(t, tt.expectedVal, testutil.ToFloat64(c)) + c.Inc(context.Background()) + assert.Equal(t, 2*tt.expectedVal, testutil.ToFloat64(c)) + }) + } +} + +func TestHistogram_Observe(t *testing.T) { + t.Parallel() + type fields struct { + histogram *prometheus.HistogramVec + } + type args struct { + val float64 + } + tests := map[string]struct { + fields fields + args args + expectedVal float64 + }{ + "test-observe-histogram": { + fields: fields{ + histogram: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "test_histogram", + }, + []string{"name"}, + ), + }, + args: args{ + val: 2, + }, + 
expectedVal: 2, + }, + "test-observe-histogram-negative-value": { + fields: fields{ + histogram: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "test_histogram", + }, + []string{"name"}, + ), + }, + args: args{ + val: -2, + }, + expectedVal: -2, + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + h := Histogram{ + Observer: tt.fields.histogram.WithLabelValues("test"), + } + h.Observe(context.Background(), tt.args.val) + actualVal, err := sampleSum(tt.fields.histogram) + require.Nil(t, err) + assert.Equal(t, tt.args.val, actualVal) + h.Observe(context.Background(), tt.args.val) + actualVal, err = sampleSum(tt.fields.histogram) + require.Nil(t, err) + assert.Equal(t, 2*tt.args.val, actualVal) + }) + } +} + +func sampleSum(c prometheus.Collector) (float64, error) { + var ( + m prometheus.Metric + mCount int + mChan = make(chan prometheus.Metric) + done = make(chan struct{}) + ) + + go func() { + for m = range mChan { + mCount++ + } + close(done) + }() + + c.Collect(mChan) + close(mChan) + <-done + + if mCount != 1 { + return -1, fmt.Errorf("collected %d metrics instead of exactly 1", mCount) + } + + pb := &dto.Metric{} + _ = m.Write(pb) + + if pb.Histogram != nil { + return *pb.Histogram.SampleSum, nil + } + return -1, fmt.Errorf("collected a non-histogram metric: %s", pb) +} diff --git a/trace/trace.go b/trace/trace.go index 31ca3de5b..f143a2ce5 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -18,10 +18,12 @@ import ( ) const ( - // HostsTag is used to tag the components's hosts. + // HostsTag is used to tag the component's hosts. HostsTag = "hosts" - // VersionTag is used to tag the components's version. + // VersionTag is used to tag the component's version. VersionTag = "version" + // TraceID is a label name for a request trace ID + TraceID = "traceID" ) var (
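
A minimal usage sketch of the new trace.Histogram wrapper (trace.Counter is used the same way), assuming the global OpenTracing tracer is a Jaeger tracer as patron sets up at service startup; the metric name, label values, and the doWork function below are illustrative only:

package main

import (
	"context"
	"time"

	"github.com/beatlabs/patron/trace"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
)

// opDuration is an illustrative histogram; any registered HistogramVec can be wrapped the same way.
var opDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "example_op_duration_seconds",
		Help: "Duration of an example operation.",
	},
	[]string{"operation"},
)

func init() {
	prometheus.MustRegister(opDuration)
}

func doWork(ctx context.Context) {
	// With a Jaeger tracer registered as the global OpenTracing tracer, the span's
	// context carries the trace ID that ends up in the exemplar.
	span, ctx := opentracing.StartSpanFromContext(ctx, "example-operation")
	defer span.Finish()

	start := time.Now()
	// ... actual work would go here ...

	// Wrap the concrete observer; Observe falls back to a plain observation when the
	// context has no Jaeger span or the observer does not implement ExemplarObserver.
	durationHistogram := trace.Histogram{Observer: opDuration.WithLabelValues("example")}
	durationHistogram.Observe(ctx, time.Since(start).Seconds())

	// Counters work the same way: wrap the concrete counter in trace.Counter and call Inc(ctx) or Add(ctx, n).
}

func main() {
	doWork(context.Background())
}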