Skip to content

Commit

Permalink
fix: throw away samples on Prometheus Remote Write 4xx errors
Browse files Browse the repository at this point in the history
This change adds concept of Recoverable and Non-Recoverable error in
Notifier interface. Deamon throws away the samples on non-recoverable
error.

PrometheusNotifier was modified to produce Non-recoverable error on 4xx
http error family and on unknown http errors. This should be in complience
with prometheus remote write spec as the client should not retry on 4xx
except on 429.

This also means that host-metering will throw away samples from time
when
- it was misconfigured with wrong URL (404)
- auth issue happened (403)

It doesn't throw away samples on connectivity issues (no response
from server).

Signed-off-by: Petr Vobornik <[email protected]>
  • Loading branch information
pvoborni committed Oct 26, 2023
1 parent 0de885a commit 342a8ad
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 23 deletions.
34 changes: 21 additions & 13 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package daemon

import (
"errors"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -215,22 +216,29 @@ func (d *Daemon) notify() error {
}
logger.Debugf("Sending %d sample(s)...\n", count)
err = d.notifier.Notify(samples, d.hostInfo)
if err != nil {
logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", err.Error())
// clear old samples even on error so that WAL does not grow indefinitely
var notifyError *notify.NotifyError
var truncateError error
if err == nil {
// clear all samples on success as they were accepted by the server
logger.Debugln("Notification successful")
truncateError = d.metricsLog.RemoveSamples(checkpoint)
} else if errors.As(err, &notifyError) && !notifyError.Recoverable() {
// clear all samples on non-recoverable error
logger.Warnf("Notification: %s\n", notifyError.Error())
truncateError = d.metricsLog.RemoveSamples(checkpoint)
} else {
// don't clear or clear only old so that WAL does not grow indefinitely on retries
// on recoverable or unknowns errors
logger.Warnf("Notification: %s\n", err.Error())
if origCount != count {
err2 := d.metricsLog.RemoveSamples(checkpoint - uint64(count))
if err2 != nil {
logger.Warnf("Error truncating WAL: %s\n", err2.Error())
}
truncateError = d.metricsLog.RemoveSamples(checkpoint - uint64(count))
}
return err
}
err = d.metricsLog.RemoveSamples(checkpoint)
if err != nil {
logger.Warnf("Error truncating WAL: %s\n", err.Error())

if truncateError != nil {
logger.Warnf("Error truncating WAL: %s\n", truncateError.Error())
return err
}
logger.Debugln("Notification successful")
return nil

return err
}
28 changes: 28 additions & 0 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package daemon

import (
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -200,6 +201,33 @@ func TestNotify(t *testing.T) {
t.Fatalf("expected non-expired sample to be passed to notifier")
}

// Test that only expired samples are pruned on recoverable error
_, checkpoint, _ := metricsLog.GetSamples()
metricsLog.RemoveSamples(checkpoint)
expiredTs = time.Now().UnixMilli() - 11000
metricsLog.WriteSample(1, expiredTs)
metricsLog.WriteSampleNow(2)
notifier.ExpectError(notify.RecoverableError(fmt.Errorf("mocked")))
err = daemon.notify()
checkExpectedError(t, err, "recoverable notify error: mocked")
samples, _, _ = metricsLog.GetSamples()
if len(samples) != 1 {
t.Fatalf("expected expired sample to be pruned")
}
if samples[0].Value != 2 {
t.Fatalf("expected non-expired sample to be kept")
}

// Test that all samples are pruned on non-recoverable error
metricsLog.WriteSampleNow(2)
metricsLog.WriteSampleNow(2)
notifier.ExpectError(notify.NonRecoverableError(fmt.Errorf("mocked")))
err = daemon.notify()
checkExpectedError(t, err, "non-recoverable notify error: mocked")
samples, _, _ = metricsLog.GetSamples()
if len(samples) != 0 {
t.Fatalf("expected all samples to be pruned")
}
}

// Helper functions
Expand Down
3 changes: 2 additions & 1 deletion notify/metricslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,9 @@ func createCorruptedMetrics(t *testing.T, path string) {
}

func checkError(t *testing.T, err error, message string) {
t.Helper()
if err != nil {
t.Fatalf("%s: %v", message, err)
t.Fatalf("%s: %v", message, err.Error())
}
}

Expand Down
33 changes: 33 additions & 0 deletions notify/notifier.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,45 @@
package notify

import (
"fmt"
"time"

"github.com/RedHatInsights/host-metering/hostinfo"
"github.com/prometheus/prometheus/prompb"
)

type NotifyError struct {
recoverable bool
wrappedErr error
}

func (e *NotifyError) Error() string {
str := "non-recoverable notify error"
if e.recoverable {
str = "recoverable notify error"
}
if e.wrappedErr == nil {
return str
}
return fmt.Errorf("%s: %w", str, e.wrappedErr).Error()
}

func (e *NotifyError) Recoverable() bool {
return e.recoverable
}

func (e *NotifyError) Unwrap() error {
return e.wrappedErr
}

func RecoverableError(err error) *NotifyError {
return &NotifyError{recoverable: true, wrappedErr: err}
}

func NonRecoverableError(err error) *NotifyError {
return &NotifyError{recoverable: false, wrappedErr: err}
}

type Notifier interface {
Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error

Expand Down
25 changes: 25 additions & 0 deletions notify/notifier_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package notify

import (
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -39,3 +41,26 @@ func TestFilterSamplesByAge(t *testing.T) {
}

}

func TestNotifyError(t *testing.T) {
wrappedErr := RecoverableError(fmt.Errorf("wrapped"))

if wrappedErr.Error() != "recoverable notify error: wrapped" {
t.Errorf("Expected error message 'recoverable notify error: wrapped', got %s", wrappedErr.Error())
}

if wrappedErr.Recoverable() != true {
t.Errorf("Expected recoverable error, got non-recoverable")
}

wrappedErr = NonRecoverableError(fmt.Errorf("wrapped"))

if wrappedErr.Error() != "non-recoverable notify error: wrapped" {
t.Errorf("Expected error message 'non-recoverable notify error: wrapped', got %s", wrappedErr.Error())
}

err := errors.Unwrap(wrappedErr)
if err.Error() != "wrapped" {
t.Errorf("Expected error message 'wrapped', got %s", err.Error())
}
}
12 changes: 6 additions & 6 deletions notify/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func NewPrometheusNotifier(cfg *config.Config) *PrometheusNotifier {
func (n *PrometheusNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error {
if !n.validClient || n.client == nil {
if err := n.createHttpClient(); err != nil {
return err
return RecoverableError(err)
}
n.validClient = true
}
request, err := newPrometheusRequest(hostinfo, n.cfg, samples)
if err != nil {
return err
return RecoverableError(err)
}
return prometheusRemoteWrite(n.client, n.cfg, request)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ
resp, err := httpClient.Do(httpRequest)

if err != nil {
return fmt.Errorf("PrometheusRemoteWrite: %w", err)
return RecoverableError(err)
}
defer resp.Body.Close()
if resp.StatusCode/100 == 2 {
Expand All @@ -112,12 +112,12 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ
continue
}
if resp.StatusCode/100 == 4 {
return fmt.Errorf("PrometheusRemoteWrite: Http Error: %d, failing", resp.StatusCode)
return NonRecoverableError(fmt.Errorf("http Error: %d", resp.StatusCode))
}
return fmt.Errorf("PrometheusRemoteWrite: Unexpected Http Status: %d", resp.StatusCode)
return NonRecoverableError(fmt.Errorf("unexpected Http Status: %d", resp.StatusCode))
}

return fmt.Errorf("PrometheusRemoteWrite: Failed after %d attempts", attempt)
return RecoverableError(fmt.Errorf("failed after %d attempts", attempt))
}

func newPrometheusRequest(hostinfo *hostinfo.HostInfo, cfg *config.Config, samples []prompb.Sample) (
Expand Down
26 changes: 23 additions & 3 deletions notify/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"io"
"math/big"
"net/http"
Expand Down Expand Up @@ -214,6 +215,7 @@ func TestRetriesAndBackoff(t *testing.T) {
if err == nil {
t.Fatal("Expected error on request failure")
}
checkRecoverable(t, err)
checkCalled(t, called, int(cfg.WriteRetryAttempts))

// Test that retries are done as expected and it will succeed
Expand Down Expand Up @@ -259,17 +261,19 @@ func TestNoRetriesOn4xx(t *testing.T) {

// Test that retries are done as expected and it will fail
err := prometheusRemoteWrite(client, cfg, request)
checkExpectedErrorContains(t, err, "Http Error: 400")
checkExpectedErrorContains(t, err, "http Error: 400")
checkNonRecoverable(t, err)
checkCalled(t, called, 1)

// Test that retries are done on 429 but not on subsequest 404
err = prometheusRemoteWrite(client, cfg, request)
checkExpectedErrorContains(t, err, "Http Error: 404")
checkExpectedErrorContains(t, err, "http Error: 404")
checkNonRecoverable(t, err)
checkCalled(t, called, 1+2)

// Last request is 200 and that should succeed without retries
err = prometheusRemoteWrite(client, cfg, request)
checkError(t, err, "Failed to send request")
checkError(t, err, "failed to send request")
checkCalled(t, called, 1+2+1)
}

Expand Down Expand Up @@ -372,6 +376,22 @@ func checkExpectedErrorContains(t *testing.T, err error, message string) {
}
}

func checkRecoverable(t *testing.T, err error) {
t.Helper()
var notifyError *NotifyError
if errors.As(err, &notifyError) && !notifyError.Recoverable() {
t.Fatalf("Expected error to be recoverable. Got: %s", err.Error())
}
}

func checkNonRecoverable(t *testing.T, err error) {
t.Helper()
var notifyError *NotifyError
if errors.As(err, &notifyError) && notifyError.Recoverable() {
t.Fatalf("Expected error to be non-recoverable. Got: %s", err.Error())
}
}

// Check that labels follow:
// - SHOULD contain a __name__ label.
// - MUST NOT contain repeated label names.
Expand Down

0 comments on commit 342a8ad

Please sign in to comment.