Skip to content

Commit

Permalink
Add metrics to aggregated metrics ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi committed Nov 18, 2024
1 parent df7a8e4 commit edb32d8
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 4 deletions.
77 changes: 74 additions & 3 deletions pkg/pattern/aggregation/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,26 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type ChunkMetrics struct {
type AggregationMetrics struct {

Check warning on line 8 in pkg/pattern/aggregation/metrics.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as aggregation.AggregationMetrics by other packages, and that stutters; consider calling this Metrics (revive)
chunks *prometheus.GaugeVec
samples *prometheus.CounterVec

// push operation
pushErrors *prometheus.CounterVec
pushRetries *prometheus.CounterVec
pushSuccesses *prometheus.CounterVec
payloadSize *prometheus.HistogramVec

// Batch metrics
streamsPerPush *prometheus.HistogramVec
entriesPerPush *prometheus.HistogramVec
servicesTracked *prometheus.GaugeVec

writeTimeout *prometheus.CounterVec
}

func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics {
return &ChunkMetrics{
func NewMetrics(r prometheus.Registerer, metricsNamespace string) *AggregationMetrics {
return &AggregationMetrics{
chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Expand All @@ -24,5 +37,63 @@ func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMet
Name: "metric_samples",
Help: "The total number of samples in memory.",
}, []string{"service_name"}),
pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_errors_total",
Help: "Total number of errors when pushing metrics to Loki.",
}, []string{"tenant_id", "error_type"}),

pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_retries_total",
Help: "Total number of retries when pushing metrics to Loki.",
}, []string{"tenant_id"}),

pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_successes_total",
Help: "Total number of successful pushes to Loki.",
}, []string{"tenant_id"}),

// Batch metrics
payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_payload_bytes",
Help: "Size of push payloads in bytes.",
Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576},
}, []string{"tenant_id"}),

streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "streams_per_push",
Help: "Number of streams in each push request.",
Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
}, []string{"tenant_id"}),

entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "entries_per_push",
Help: "Number of entries in each push request.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"tenant_id"}),

servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "services_tracked",
Help: "Number of unique services being tracked.",
}, []string{"tenant_id"}),
writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "write_timeouts_total",
Help: "Total number of write timeouts.",
}, []string{"tenant_id"}),
}
}
38 changes: 38 additions & 0 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Push struct {
backoff *backoff.Config

entries entries

metrics *AggregationMetrics
}

type entry struct {
Expand Down Expand Up @@ -108,6 +112,7 @@ func NewPush(
useTLS bool,
backoffCfg *backoff.Config,
logger log.Logger,
registrer prometheus.Registerer,
) (*Push, error) {
client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled())
if err != nil {
Expand Down Expand Up @@ -142,6 +147,7 @@ func NewPush(
entries: entries{
entries: make([]entry, 0),
},
metrics: NewMetrics(registrer, "pattern_ingester"),
}

go p.run(pushPeriod)
Expand Down Expand Up @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {

payload = snappy.Encode(nil, payload)

p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams)))
p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries)))
p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(len(entriesByStream)))

sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
Expand Down Expand Up @@ -287,6 +297,31 @@ func (p *Push) run(pushPeriod time.Duration) {
}
}

func (p *Push) sendPayload(ctx context.Context, payload []byte) (int, error) {

status, err := p.send(ctx, payload)
if err != nil {
errorType := "unknown"
if status == 429 {
errorType = "rate_limited"
} else if status/100 == 5 {
errorType = "server_error"
} else if status/100 != 2 {
errorType = "client_error"
}
p.metrics.pushErrors.WithLabelValues(p.tenantID, errorType).Inc()
} else {
p.metrics.pushSuccesses.WithLabelValues(p.tenantID).Inc()
}
if err != nil {
return 0, err
}

p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload)))

return status, err
}

// send makes one attempt to send the payload to Loki
func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
var (
Expand Down Expand Up @@ -320,6 +355,9 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {

resp, err = p.httpClient.Do(req)
if err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc()
}
return -1, fmt.Errorf("failed to push payload: %w", err)
}
status := resp.StatusCode
Expand Down
3 changes: 2 additions & 1 deletion pkg/pattern/aggregation/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) {
false,
&backoff,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand All @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) {
"user", "secret",
false,
&backoff,
log.NewNopLogger(),
log.NewNopLogger(), nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand Down
1 change: 1 addition & 0 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
aggCfg.UseTLS,
&aggCfg.BackoffConfig,
i.logger,
i.registerer,
)
if err != nil {
return nil, err
Expand Down

0 comments on commit edb32d8

Please sign in to comment.