From 5b08980dc966ebc6990cccd4d7cafd4984e4df85 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 26 Sep 2024 15:52:50 +0200 Subject: [PATCH] fixed remote write regression (#88) * fixed regressions around series leak due to races; refactor for clarity Signed-off-by: bwplotka * fixed remote write regression. Fixes https://github.com/prometheus-community/avalanche/issues/86 Signed-off-by: bwplotka --------- Signed-off-by: bwplotka --- cmd/avalanche.go | 21 +++++++++++---------- metrics/write.go | 22 ++++++++++++++++------ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 6b3649e..489a1e4 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -147,18 +147,19 @@ func main() { }() } - // First cut: just send the metrics once then exit - if err := metrics.SendRemoteWrite(config, reg); err != nil { - log.Fatal(err) - } - if *remotePprofInterval > 0 { - done <- struct{}{} - wg.Wait() - } - return + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil { + return err + } + if *remotePprofInterval > 0 { + done <- struct{}{} + wg.Wait() + } + return nil // One-off. + }, func(error) { cancel() }) } - // Standard mode for continuous exposure of metrics. httpSrv := &http.Server{Addr: fmt.Sprintf(":%v", *port)} g.Add(func() error { fmt.Printf("Serving your metrics at :%v/metrics\n", *port) diff --git a/metrics/write.go b/metrics/write.go index d20af03..8c66faa 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -66,7 +66,7 @@ type Client struct { // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. 
-func SendRemoteWrite(config *ConfigWrite, gatherer prometheus.Gatherer) error { +func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &config.TLSClientConfig, } @@ -79,7 +79,7 @@ func SendRemoteWrite(config *ConfigWrite, gatherer prometheus.Gatherer) error { config: config, gatherer: gatherer, } - return c.write() + return c.write(ctx) } // Add the tenant ID header @@ -109,7 +109,7 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } -func (c *Client) write() error { +func (c *Client) write(ctx context.Context) error { tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) if err != nil { return err @@ -125,10 +125,14 @@ func (c *Client) write() error { merr = &errors.MultiError{} ) - log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() for ii := 0; ii < c.config.RequestCount; ii++ { + if ctx.Err() != nil { + return ctx.Err() + } + // Download the pprofs during half of the iteration to get avarege readings. // Do that only when it is not set to take profiles at a given interval. if len(c.config.PprofURLs) > 0 && ii == c.config.RequestCount/2 { @@ -223,6 +227,7 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries { tss := make([]prompb.TimeSeries, 0, len(metricFamilies)*10) timestamp := int64(model.Now()) // Not using metric.TimestampMs because it is (always?) nil. Is this right? 
+ skippedSamples := 0 for _, metricFamily := range metricFamilies { for _, metric := range metricFamily.Metric { labels := prompbLabels(*metricFamily.Name, metric.Label) @@ -235,16 +240,21 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries { Value: *metric.Counter.Value, Timestamp: timestamp, }} + tss = append(tss, ts) case dto.MetricType_GAUGE: ts.Samples = []prompb.Sample{{ Value: *metric.Gauge.Value, Timestamp: timestamp, }} + tss = append(tss, ts) + default: + skippedSamples++ } - tss = append(tss, ts) } } - + if skippedSamples > 0 { + log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss)) + } return tss }