From edb32d83dbd6638020ff118fdcebb7ac99d7f068 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 18 Nov 2024 17:00:08 +0530 Subject: [PATCH] Add metrics to aggregated metrics ingestion --- pkg/pattern/aggregation/metrics.go | 77 ++++++++++++++++++++++++++-- pkg/pattern/aggregation/push.go | 38 ++++++++++++++ pkg/pattern/aggregation/push_test.go | 3 +- pkg/pattern/ingester.go | 1 + 4 files changed, 115 insertions(+), 4 deletions(-) diff --git a/pkg/pattern/aggregation/metrics.go b/pkg/pattern/aggregation/metrics.go index d777af50b8130..1757a279287d5 100644 --- a/pkg/pattern/aggregation/metrics.go +++ b/pkg/pattern/aggregation/metrics.go @@ -5,13 +5,26 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -type ChunkMetrics struct { +type AggregationMetrics struct { chunks *prometheus.GaugeVec samples *prometheus.CounterVec + + // push operation + pushErrors *prometheus.CounterVec + pushRetries *prometheus.CounterVec + pushSuccesses *prometheus.CounterVec + payloadSize *prometheus.HistogramVec + + // Batch metrics + streamsPerPush *prometheus.HistogramVec + entriesPerPush *prometheus.HistogramVec + servicesTracked *prometheus.GaugeVec + + writeTimeout *prometheus.CounterVec } -func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics { - return &ChunkMetrics{ +func NewMetrics(r prometheus.Registerer, metricsNamespace string) *AggregationMetrics { + return &AggregationMetrics{ chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", @@ -24,5 +37,63 @@ func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMet Name: "metric_samples", Help: "The total number of samples in memory.", }, []string{"service_name"}), + pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "push_errors_total", + Help: "Total number of errors when pushing metrics to Loki.", + }, []string{"tenant_id", "error_type"}), + + pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "push_retries_total", + Help: "Total number of retries when pushing metrics to Loki.", + }, []string{"tenant_id"}), + + pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "push_successes_total", + Help: "Total number of successful pushes to Loki.", + }, []string{"tenant_id"}), + + // Batch metrics + payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "push_payload_bytes", + Help: "Size of push payloads in bytes.", + Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576}, + }, []string{"tenant_id"}), + + streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "streams_per_push", + Help: "Number of streams in each push request.", + Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}, + }, []string{"tenant_id"}), + + entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "entries_per_push", + Help: "Number of entries in each push request.", + Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000}, + }, []string{"tenant_id"}), + + servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "services_tracked", + Help: "Number of unique services being tracked.", + }, []string{"tenant_id"}), + writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "write_timeouts_total", + Help: "Total number of write timeouts.", + }, []string{"tenant_id"}), } } diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index a282913fe5081..dff7d2939a362 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -16,6 +17,7 @@ import ( "github.com/go-kit/log/level" "github.com/golang/snappy" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -71,6 +73,8 @@ type Push struct { backoff *backoff.Config entries entries + + metrics *AggregationMetrics } type entry struct { @@ -108,6 +112,7 @@ func NewPush( useTLS bool, backoffCfg *backoff.Config, logger log.Logger, + registrer prometheus.Registerer, ) (*Push, error) { client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled()) if err != nil { @@ -142,6 +147,7 @@ func NewPush( entries: entries{ entries: make([]entry, 0), }, + metrics: NewMetrics(registrer, "pattern_ingester"), } go p.run(pushPeriod) @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) { payload = snappy.Encode(nil, payload) + p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams))) + p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries))) + p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(len(entriesByStream))) + sp.LogKV( "event", "build aggregated metrics payload", "num_service", len(entriesByStream), @@ -287,6 +297,31 @@ func (p *Push) run(pushPeriod time.Duration) { } } +func (p *Push) sendPayload(ctx context.Context, payload []byte) (int, error) { + + status, err := p.send(ctx, payload) + if err != nil { + errorType := "unknown" + if status == 429 { + errorType = "rate_limited" + } else if status/100 == 5 { + errorType = "server_error" + } else if status/100 != 2 { + errorType = "client_error" + } + p.metrics.pushErrors.WithLabelValues(p.tenantID, errorType).Inc() + } else { + p.metrics.pushSuccesses.WithLabelValues(p.tenantID).Inc() + } + if err != nil { + return 0, err + } + + p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload))) + + return status, err +} + // send makes one attempt to send the payload to Loki func (p *Push) send(ctx context.Context, payload []byte) (int, error) { var ( @@ -320,6 +355,9 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) { resp, err = p.httpClient.Do(req) if err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc() + } return -1, fmt.Errorf("failed to push payload: %w", err) } status := resp.StatusCode diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go index 149b54a977151..cf45dbe0dabdd 100644 --- a/pkg/pattern/aggregation/push_test.go +++ b/pkg/pattern/aggregation/push_test.go @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) { false, &backoff, log.NewNopLogger(), + nil, ) require.NoError(t, err) ts, payload := testPayload() @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) { "user", "secret", false, &backoff, - log.NewNopLogger(), + log.NewNopLogger(), nil, ) require.NoError(t, err) ts, payload := testPayload() diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 90e69f8433333..1b88387d7b55b 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / aggCfg.UseTLS, &aggCfg.BackoffConfig, i.logger, + i.registerer, ) if err != nil { return nil, err