From 342a8ade1ae29748a26d694b33fe4d13389cde62 Mon Sep 17 00:00:00 2001 From: Petr Vobornik Date: Mon, 23 Oct 2023 15:32:56 +0000 Subject: [PATCH] fix: throw away samples on Prometheus Remote Write 4xx errors This change adds the concept of Recoverable and Non-Recoverable errors to the Notifier interface. The daemon throws away the samples on a non-recoverable error. PrometheusNotifier was modified to produce a Non-recoverable error on the 4xx http error family and on unknown http errors. This should be in compliance with the Prometheus remote write spec, as the client should not retry on 4xx except on 429. This also means that host-metering will throw away samples from the time when - it was misconfigured with a wrong URL (404) - an auth issue happened (403) It doesn't throw away samples on connectivity issues (no response from server). Signed-off-by: Petr Vobornik --- daemon/daemon.go | 34 +++++++++++++++++++++------------- daemon/daemon_test.go | 28 ++++++++++++++++++++++++++++ notify/metricslog_test.go | 3 ++- notify/notifier.go | 33 +++++++++++++++++++++++++++++++++ notify/notifier_test.go | 25 +++++++++++++++++++++++++ notify/prometheus.go | 12 ++++++------ notify/prometheus_test.go | 26 +++++++++++++++++++++++--- 7 files changed, 138 insertions(+), 23 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index e9d2388..1dbb2d0 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -1,6 +1,7 @@ package daemon import ( + "errors" "fmt" "os" "os/signal" @@ -215,22 +216,29 @@ func (d *Daemon) notify() error { } logger.Debugf("Sending %d sample(s)...\n", count) err = d.notifier.Notify(samples, d.hostInfo) - if err != nil { - logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", err.Error()) - // clear old samples even on error so that WAL does not grow indefinitely + var notifyError *notify.NotifyError + var truncateError error + if err == nil { + // clear all samples on success as they were accepted by the server + logger.Debugln("Notification successful") + truncateError = 
d.metricsLog.RemoveSamples(checkpoint) + } else if errors.As(err, &notifyError) && !notifyError.Recoverable() { + // clear all samples on non-recoverable error + logger.Warnf("Notification: %s\n", notifyError.Error()) + truncateError = d.metricsLog.RemoveSamples(checkpoint) + } else { + // don't clear or clear only old so that WAL does not grow indefinitely on retries + // on recoverable or unknown errors + logger.Warnf("Notification: %s\n", err.Error()) if origCount != count { - err2 := d.metricsLog.RemoveSamples(checkpoint - uint64(count)) - if err2 != nil { - logger.Warnf("Error truncating WAL: %s\n", err2.Error()) - } + truncateError = d.metricsLog.RemoveSamples(checkpoint - uint64(count)) } - return err } - err = d.metricsLog.RemoveSamples(checkpoint) - if err != nil { - logger.Warnf("Error truncating WAL: %s\n", err.Error()) + + if truncateError != nil { + logger.Warnf("Error truncating WAL: %s\n", truncateError.Error()) return err } - logger.Debugln("Notification successful") - return nil + + return err } diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 15de5c5..346f885 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -2,6 +2,7 @@ package daemon import ( "errors" + "fmt" "testing" "time" @@ -200,6 +201,33 @@ func TestNotify(t *testing.T) { t.Fatalf("expected non-expired sample to be passed to notifier") } + // Test that only expired samples are pruned on recoverable error + _, checkpoint, _ := metricsLog.GetSamples() + metricsLog.RemoveSamples(checkpoint) + expiredTs = time.Now().UnixMilli() - 11000 + metricsLog.WriteSample(1, expiredTs) + metricsLog.WriteSampleNow(2) + notifier.ExpectError(notify.RecoverableError(fmt.Errorf("mocked"))) + err = daemon.notify() + checkExpectedError(t, err, "recoverable notify error: mocked") + samples, _, _ = metricsLog.GetSamples() + if len(samples) != 1 { + t.Fatalf("expected expired sample to be pruned") + } + if samples[0].Value != 2 { + t.Fatalf("expected non-expired sample to be kept") + } 
+ + // Test that all samples are pruned on non-recoverable error + metricsLog.WriteSampleNow(2) + metricsLog.WriteSampleNow(2) + notifier.ExpectError(notify.NonRecoverableError(fmt.Errorf("mocked"))) + err = daemon.notify() + checkExpectedError(t, err, "non-recoverable notify error: mocked") + samples, _, _ = metricsLog.GetSamples() + if len(samples) != 0 { + t.Fatalf("expected all samples to be pruned") + } } // Helper functions diff --git a/notify/metricslog_test.go b/notify/metricslog_test.go index b3e1939..2bb1c1e 100644 --- a/notify/metricslog_test.go +++ b/notify/metricslog_test.go @@ -241,8 +241,9 @@ func createCorruptedMetrics(t *testing.T, path string) { } func checkError(t *testing.T, err error, message string) { + t.Helper() if err != nil { - t.Fatalf("%s: %v", message, err) + t.Fatalf("%s: %v", message, err.Error()) } } diff --git a/notify/notifier.go b/notify/notifier.go index 9b02f7f..c88080e 100644 --- a/notify/notifier.go +++ b/notify/notifier.go @@ -1,12 +1,45 @@ package notify import ( + "fmt" "time" "github.com/RedHatInsights/host-metering/hostinfo" "github.com/prometheus/prometheus/prompb" ) +type NotifyError struct { + recoverable bool + wrappedErr error +} + +func (e *NotifyError) Error() string { + str := "non-recoverable notify error" + if e.recoverable { + str = "recoverable notify error" + } + if e.wrappedErr == nil { + return str + } + return fmt.Errorf("%s: %w", str, e.wrappedErr).Error() +} + +func (e *NotifyError) Recoverable() bool { + return e.recoverable +} + +func (e *NotifyError) Unwrap() error { + return e.wrappedErr +} + +func RecoverableError(err error) *NotifyError { + return &NotifyError{recoverable: true, wrappedErr: err} +} + +func NonRecoverableError(err error) *NotifyError { + return &NotifyError{recoverable: false, wrappedErr: err} +} + type Notifier interface { Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error diff --git a/notify/notifier_test.go b/notify/notifier_test.go index 243f0fa..5cdef15 100644 
--- a/notify/notifier_test.go +++ b/notify/notifier_test.go @@ -1,6 +1,8 @@ package notify import ( + "errors" + "fmt" "testing" "time" @@ -39,3 +41,26 @@ func TestFilterSamplesByAge(t *testing.T) { } } + +func TestNotifyError(t *testing.T) { + wrappedErr := RecoverableError(fmt.Errorf("wrapped")) + + if wrappedErr.Error() != "recoverable notify error: wrapped" { + t.Errorf("Expected error message 'recoverable notify error: wrapped', got %s", wrappedErr.Error()) + } + + if wrappedErr.Recoverable() != true { + t.Errorf("Expected recoverable error, got non-recoverable") + } + + wrappedErr = NonRecoverableError(fmt.Errorf("wrapped")) + + if wrappedErr.Error() != "non-recoverable notify error: wrapped" { + t.Errorf("Expected error message 'non-recoverable notify error: wrapped', got %s", wrappedErr.Error()) + } + + err := errors.Unwrap(wrappedErr) + if err.Error() != "wrapped" { + t.Errorf("Expected error message 'wrapped', got %s", err.Error()) + } +} diff --git a/notify/prometheus.go b/notify/prometheus.go index abde181..af52eac 100644 --- a/notify/prometheus.go +++ b/notify/prometheus.go @@ -37,13 +37,13 @@ func NewPrometheusNotifier(cfg *config.Config) *PrometheusNotifier { func (n *PrometheusNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error { if !n.validClient || n.client == nil { if err := n.createHttpClient(); err != nil { - return err + return RecoverableError(err) } n.validClient = true } request, err := newPrometheusRequest(hostinfo, n.cfg, samples) if err != nil { - return err + return RecoverableError(err) } return prometheusRemoteWrite(n.client, n.cfg, request) } @@ -89,7 +89,7 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ resp, err := httpClient.Do(httpRequest) if err != nil { - return fmt.Errorf("PrometheusRemoteWrite: %w", err) + return RecoverableError(err) } defer resp.Body.Close() if resp.StatusCode/100 == 2 { @@ -112,12 +112,12 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg 
*config.Config, httpRequ continue } if resp.StatusCode/100 == 4 { - return fmt.Errorf("PrometheusRemoteWrite: Http Error: %d, failing", resp.StatusCode) + return NonRecoverableError(fmt.Errorf("http Error: %d", resp.StatusCode)) } - return fmt.Errorf("PrometheusRemoteWrite: Unexpected Http Status: %d", resp.StatusCode) + return NonRecoverableError(fmt.Errorf("unexpected Http Status: %d", resp.StatusCode)) } - return fmt.Errorf("PrometheusRemoteWrite: Failed after %d attempts", attempt) + return RecoverableError(fmt.Errorf("failed after %d attempts", attempt)) } func newPrometheusRequest(hostinfo *hostinfo.HostInfo, cfg *config.Config, samples []prompb.Sample) ( diff --git a/notify/prometheus_test.go b/notify/prometheus_test.go index fc3223a..0c729d6 100644 --- a/notify/prometheus_test.go +++ b/notify/prometheus_test.go @@ -7,6 +7,7 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/pem" + "errors" "io" "math/big" "net/http" @@ -214,6 +215,7 @@ func TestRetriesAndBackoff(t *testing.T) { if err == nil { t.Fatal("Expected error on request failure") } + checkRecoverable(t, err) checkCalled(t, called, int(cfg.WriteRetryAttempts)) // Test that retries are done as expected and it will succeed @@ -259,17 +261,19 @@ func TestNoRetriesOn4xx(t *testing.T) { // Test that retries are done as expected and it will fail err := prometheusRemoteWrite(client, cfg, request) - checkExpectedErrorContains(t, err, "Http Error: 400") + checkExpectedErrorContains(t, err, "http Error: 400") + checkNonRecoverable(t, err) checkCalled(t, called, 1) // Test that retries are done on 429 but not on subsequest 404 err = prometheusRemoteWrite(client, cfg, request) - checkExpectedErrorContains(t, err, "Http Error: 404") + checkExpectedErrorContains(t, err, "http Error: 404") + checkNonRecoverable(t, err) checkCalled(t, called, 1+2) // Last request is 200 and that should succeed without retries err = prometheusRemoteWrite(client, cfg, request) - checkError(t, err, "Failed to send request") + 
checkError(t, err, "failed to send request") checkCalled(t, called, 1+2+1) } @@ -372,6 +376,22 @@ func checkExpectedErrorContains(t *testing.T, err error, message string) { } } +func checkRecoverable(t *testing.T, err error) { + t.Helper() + var notifyError *NotifyError + if errors.As(err, &notifyError) && !notifyError.Recoverable() { + t.Fatalf("Expected error to be recoverable. Got: %s", err.Error()) + } +} + +func checkNonRecoverable(t *testing.T, err error) { + t.Helper() + var notifyError *NotifyError + if errors.As(err, &notifyError) && notifyError.Recoverable() { + t.Fatalf("Expected error to be non-recoverable. Got: %s", err.Error()) + } +} + // Check that labels follow: // - SHOULD contain a __name__ label. // - MUST NOT contain repeated label names.