diff --git a/daemon/daemon.go b/daemon/daemon.go index e9d2388..1dbb2d0 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -1,6 +1,7 @@ package daemon import ( + "errors" "fmt" "os" "os/signal" @@ -215,22 +216,29 @@ func (d *Daemon) notify() error { } logger.Debugf("Sending %d sample(s)...\n", count) err = d.notifier.Notify(samples, d.hostInfo) - if err != nil { - logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", err.Error()) - // clear old samples even on error so that WAL does not grow indefinitely + var notifyError *notify.NotifyError + var truncateError error + if err == nil { + // clear all samples on success as they were accepted by the server + logger.Debugln("Notification successful") + truncateError = d.metricsLog.RemoveSamples(checkpoint) + } else if errors.As(err, &notifyError) && !notifyError.Recoverable() { + // clear all samples on non-recoverable error + logger.Warnf("Notification: %s\n", notifyError.Error()) + truncateError = d.metricsLog.RemoveSamples(checkpoint) + } else { + // don't clear or clear only old so that WAL does not grow indefinitely on retries + // on recoverable or unknown errors + logger.Warnf("Notification: %s\n", err.Error()) if origCount != count { - err2 := d.metricsLog.RemoveSamples(checkpoint - uint64(count)) - if err2 != nil { - logger.Warnf("Error truncating WAL: %s\n", err2.Error()) - } + truncateError = d.metricsLog.RemoveSamples(checkpoint - uint64(count)) } - return err } - err = d.metricsLog.RemoveSamples(checkpoint) - if err != nil { - logger.Warnf("Error truncating WAL: %s\n", err.Error()) + + if truncateError != nil { + logger.Warnf("Error truncating WAL: %s\n", truncateError.Error()) return err } - logger.Debugln("Notification successful") - return nil + + return err } diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 15de5c5..346f885 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -2,6 +2,7 @@ package daemon import ( "errors" + "fmt" "testing" "time" @@ -200,6 +201,33 @@ 
func TestNotify(t *testing.T) { t.Fatalf("expected non-expired sample to be passed to notifier") } + // Test that only expired samples are pruned on recoverable error + _, checkpoint, _ := metricsLog.GetSamples() + metricsLog.RemoveSamples(checkpoint) + expiredTs = time.Now().UnixMilli() - 11000 + metricsLog.WriteSample(1, expiredTs) + metricsLog.WriteSampleNow(2) + notifier.ExpectError(notify.RecoverableError(fmt.Errorf("mocked"))) + err = daemon.notify() + checkExpectedError(t, err, "recoverable notify error: mocked") + samples, _, _ = metricsLog.GetSamples() + if len(samples) != 1 { + t.Fatalf("expected expired sample to be pruned") + } + if samples[0].Value != 2 { + t.Fatalf("expected non-expired sample to be kept") + } + + // Test that all samples are pruned on non-recoverable error + metricsLog.WriteSampleNow(2) + metricsLog.WriteSampleNow(2) + notifier.ExpectError(notify.NonRecoverableError(fmt.Errorf("mocked"))) + err = daemon.notify() + checkExpectedError(t, err, "non-recoverable notify error: mocked") + samples, _, _ = metricsLog.GetSamples() + if len(samples) != 0 { + t.Fatalf("expected all samples to be pruned") + } } // Helper functions diff --git a/notify/metricslog_test.go b/notify/metricslog_test.go index b3e1939..2bb1c1e 100644 --- a/notify/metricslog_test.go +++ b/notify/metricslog_test.go @@ -241,8 +241,9 @@ func createCorruptedMetrics(t *testing.T, path string) { } func checkError(t *testing.T, err error, message string) { + t.Helper() if err != nil { - t.Fatalf("%s: %v", message, err) + t.Fatalf("%s: %v", message, err.Error()) } } diff --git a/notify/notifier.go b/notify/notifier.go index 9b02f7f..c88080e 100644 --- a/notify/notifier.go +++ b/notify/notifier.go @@ -1,12 +1,45 @@ package notify import ( + "fmt" "time" "github.com/RedHatInsights/host-metering/hostinfo" "github.com/prometheus/prometheus/prompb" ) +type NotifyError struct { + recoverable bool + wrappedErr error +} + +func (e *NotifyError) Error() string { + str := "non-recoverable 
notify error" + if e.recoverable { + str = "recoverable notify error" + } + if e.wrappedErr == nil { + return str + } + return fmt.Errorf("%s: %w", str, e.wrappedErr).Error() +} + +func (e *NotifyError) Recoverable() bool { + return e.recoverable +} + +func (e *NotifyError) Unwrap() error { + return e.wrappedErr +} + +func RecoverableError(err error) *NotifyError { + return &NotifyError{recoverable: true, wrappedErr: err} +} + +func NonRecoverableError(err error) *NotifyError { + return &NotifyError{recoverable: false, wrappedErr: err} +} + type Notifier interface { Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error diff --git a/notify/notifier_test.go b/notify/notifier_test.go index 243f0fa..5cdef15 100644 --- a/notify/notifier_test.go +++ b/notify/notifier_test.go @@ -1,6 +1,8 @@ package notify import ( + "errors" + "fmt" "testing" "time" @@ -39,3 +41,26 @@ func TestFilterSamplesByAge(t *testing.T) { } } + +func TestNotifyError(t *testing.T) { + wrappedErr := RecoverableError(fmt.Errorf("wrapped")) + + if wrappedErr.Error() != "recoverable notify error: wrapped" { + t.Errorf("Expected error message 'recoverable notify error: wrapped', got %s", wrappedErr.Error()) + } + + if wrappedErr.Recoverable() != true { + t.Errorf("Expected recoverable error, got non-recoverable") + } + + wrappedErr = NonRecoverableError(fmt.Errorf("wrapped")) + + if wrappedErr.Error() != "non-recoverable notify error: wrapped" { + t.Errorf("Expected error message 'non-recoverable notify error: wrapped', got %s", wrappedErr.Error()) + } + + err := errors.Unwrap(wrappedErr) + if err.Error() != "wrapped" { + t.Errorf("Expected error message 'wrapped', got %s", err.Error()) + } +} diff --git a/notify/prometheus.go b/notify/prometheus.go index abde181..af52eac 100644 --- a/notify/prometheus.go +++ b/notify/prometheus.go @@ -37,13 +37,13 @@ func NewPrometheusNotifier(cfg *config.Config) *PrometheusNotifier { func (n *PrometheusNotifier) Notify(samples []prompb.Sample, hostinfo 
*hostinfo.HostInfo) error { if !n.validClient || n.client == nil { if err := n.createHttpClient(); err != nil { - return err + return RecoverableError(err) } n.validClient = true } request, err := newPrometheusRequest(hostinfo, n.cfg, samples) if err != nil { - return err + return RecoverableError(err) } return prometheusRemoteWrite(n.client, n.cfg, request) } @@ -89,7 +89,7 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ resp, err := httpClient.Do(httpRequest) if err != nil { - return fmt.Errorf("PrometheusRemoteWrite: %w", err) + return RecoverableError(err) } defer resp.Body.Close() if resp.StatusCode/100 == 2 { @@ -112,12 +112,12 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ continue } if resp.StatusCode/100 == 4 { - return fmt.Errorf("PrometheusRemoteWrite: Http Error: %d, failing", resp.StatusCode) + return NonRecoverableError(fmt.Errorf("http Error: %d", resp.StatusCode)) } - return fmt.Errorf("PrometheusRemoteWrite: Unexpected Http Status: %d", resp.StatusCode) + return NonRecoverableError(fmt.Errorf("unexpected Http Status: %d", resp.StatusCode)) } - return fmt.Errorf("PrometheusRemoteWrite: Failed after %d attempts", attempt) + return RecoverableError(fmt.Errorf("failed after %d attempts", attempt)) } func newPrometheusRequest(hostinfo *hostinfo.HostInfo, cfg *config.Config, samples []prompb.Sample) ( diff --git a/notify/prometheus_test.go b/notify/prometheus_test.go index fc3223a..0c729d6 100644 --- a/notify/prometheus_test.go +++ b/notify/prometheus_test.go @@ -7,6 +7,7 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/pem" + "errors" "io" "math/big" "net/http" @@ -214,6 +215,7 @@ func TestRetriesAndBackoff(t *testing.T) { if err == nil { t.Fatal("Expected error on request failure") } + checkRecoverable(t, err) checkCalled(t, called, int(cfg.WriteRetryAttempts)) // Test that retries are done as expected and it will succeed @@ -259,17 +261,19 @@ func TestNoRetriesOn4xx(t 
*testing.T) { // Test that retries are done as expected and it will fail err := prometheusRemoteWrite(client, cfg, request) - checkExpectedErrorContains(t, err, "Http Error: 400") + checkExpectedErrorContains(t, err, "http Error: 400") + checkNonRecoverable(t, err) checkCalled(t, called, 1) // Test that retries are done on 429 but not on subsequest 404 err = prometheusRemoteWrite(client, cfg, request) - checkExpectedErrorContains(t, err, "Http Error: 404") + checkExpectedErrorContains(t, err, "http Error: 404") + checkNonRecoverable(t, err) checkCalled(t, called, 1+2) // Last request is 200 and that should succeed without retries err = prometheusRemoteWrite(client, cfg, request) - checkError(t, err, "Failed to send request") + checkError(t, err, "failed to send request") checkCalled(t, called, 1+2+1) } @@ -372,6 +376,22 @@ func checkExpectedErrorContains(t *testing.T, err error, message string) { } } +func checkRecoverable(t *testing.T, err error) { + t.Helper() + var notifyError *NotifyError + if errors.As(err, &notifyError) && !notifyError.Recoverable() { + t.Fatalf("Expected error to be recoverable. Got: %s", err.Error()) + } +} + +func checkNonRecoverable(t *testing.T, err error) { + t.Helper() + var notifyError *NotifyError + if errors.As(err, &notifyError) && notifyError.Recoverable() { + t.Fatalf("Expected error to be non-recoverable. Got: %s", err.Error()) + } +} + // Check that labels follow: // - SHOULD contain a __name__ label. // - MUST NOT contain repeated label names.