Skip to content

Commit

Permalink
fix: throw away samples on Prometheus Remote Write 4xx errors
Browse files Browse the repository at this point in the history
This change adds the concept of recoverable and non-recoverable errors to the
Notifier interface. The daemon throws away the samples on a non-recoverable
error.

PrometheusNotifier was modified to produce a non-recoverable error on the 4xx
HTTP error family and on unknown HTTP errors. This should be in compliance
with the Prometheus remote write spec, as the client should not retry on 4xx
except on 429.

This also means that host-metering will throw away samples from the time
when:
- it was misconfigured with a wrong URL (404)
- an auth issue happened (403)

It doesn't throw away samples on connectivity issues (no response
from server).

Signed-off-by: Petr Vobornik <[email protected]>
  • Loading branch information
pvoborni committed Oct 24, 2023
1 parent 0de885a commit e98bd2b
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 49 deletions.
33 changes: 18 additions & 15 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,23 +214,26 @@ func (d *Daemon) notify() error {
return nil
}
logger.Debugf("Sending %d sample(s)...\n", count)
err = d.notifier.Notify(samples, d.hostInfo)
if err != nil {
logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", err.Error())
// clear old samples even on error so that WAL does not grow indefinitely
result := d.notifier.Notify(samples, d.hostInfo)
var truncateError error
switch result.Status {
case notify.StatusOK:
logger.Debugln("Notification successful")
truncateError = d.metricsLog.RemoveSamples(checkpoint)
case notify.StatusNonRecoverableError:
logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", result.Error())
truncateError = d.metricsLog.RemoveSamples(checkpoint)
case notify.StatusRecoverableError:
logger.Warnf("Error calling PrometheusRemoteWrite: %s\n", result.Error())
// clear only old so that WAL does not grow indefinitely on retries
if origCount != count {
err2 := d.metricsLog.RemoveSamples(checkpoint - uint64(count))
if err2 != nil {
logger.Warnf("Error truncating WAL: %s\n", err2.Error())
}
truncateError = d.metricsLog.RemoveSamples(checkpoint - uint64(count))
}
return err
}
err = d.metricsLog.RemoveSamples(checkpoint)
if err != nil {
logger.Warnf("Error truncating WAL: %s\n", err.Error())
return err
if truncateError != nil {
logger.Warnf("Error truncating WAL: %s\n", truncateError.Error())
return result.Err()
}
logger.Debugln("Notification successful")
return nil

return result.Err()
}
12 changes: 6 additions & 6 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,10 @@ type notifyArgs struct {
type mockNotifier struct {
calledWith *notifyArgs
hostChangedTimes uint
result func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error
result func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *notify.NotifyResult
}

func (n *mockNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error {
func (n *mockNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *notify.NotifyResult {
n.calledWith = &notifyArgs{samples, hostinfo}
return n.result(samples, hostinfo)
}
Expand Down Expand Up @@ -401,14 +401,14 @@ func (n *mockNotifier) CheckWasNotCalled(t *testing.T) {
}

func (n *mockNotifier) ExpectError(err error) {
n.result = func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error {
return err
n.result = func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *notify.NotifyResult {
return notify.RecoverableErrorResult(err)
}
}

func (n *mockNotifier) ExpectSuccess() {
n.result = func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error {
return nil
n.result = func(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *notify.NotifyResult {
return notify.OKResult()
}
}

Expand Down
3 changes: 2 additions & 1 deletion notify/metricslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,9 @@ func createCorruptedMetrics(t *testing.T, path string) {
}

func checkError(t *testing.T, err error, message string) {
t.Helper()
if err != nil {
t.Fatalf("%s: %v", message, err)
t.Fatalf("%s: %v", message, err.Error())
}
}

Expand Down
38 changes: 37 additions & 1 deletion notify/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,44 @@ import (
"github.com/prometheus/prometheus/prompb"
)

// NotifyStatus classifies the outcome of a notification attempt.
type NotifyStatus int

// Possible values of NotifyStatus. The numeric values follow declaration
// order, starting at zero.
const (
	// StatusOK means the notification succeeded.
	StatusOK = NotifyStatus(iota)
	// StatusRecoverableError means the notification failed with an error
	// classified as recoverable.
	StatusRecoverableError
	// StatusNonRecoverableError means the notification failed with an error
	// classified as non-recoverable.
	StatusNonRecoverableError
)

// NotifyResult pairs the status of a notification attempt with the error
// that caused it, if any.
type NotifyResult struct {
	// Status classifies the outcome of the attempt.
	Status NotifyStatus
	// err is the underlying error; it is nil when no error was recorded.
	err error
}

// Err returns the underlying error stored in the result, or nil when the
// result carries no error.
func (r *NotifyResult) Err() error {
	return r.err
}

// Error returns the message of the underlying error, or the empty string
// when the result carries no error.
//
// Because of this method *NotifyResult satisfies the built-in error
// interface even when no error is stored; use Err for nil checks.
func (r *NotifyResult) Error() string {
	if cause := r.err; cause != nil {
		return cause.Error()
	}
	return ""
}

// OKResult returns a new NotifyResult with StatusOK and no error.
func OKResult() *NotifyResult {
	return &NotifyResult{Status: StatusOK}
}

// RecoverableErrorResult returns a new NotifyResult with
// StatusRecoverableError that carries err as its underlying error.
func RecoverableErrorResult(err error) *NotifyResult {
	return &NotifyResult{
		Status: StatusRecoverableError,
		err:    err,
	}
}

// NonRecoverableErrorResult returns a new NotifyResult with
// StatusNonRecoverableError that carries err as its underlying error.
func NonRecoverableErrorResult(err error) *NotifyResult {
	return &NotifyResult{
		Status: StatusNonRecoverableError,
		err:    err,
	}
}

type Notifier interface {
Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error
Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *NotifyResult

// HostChanged tells notifier that related information on host has changed
HostChanged()
Expand Down
27 changes: 17 additions & 10 deletions notify/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,25 @@ func NewPrometheusNotifier(cfg *config.Config) *PrometheusNotifier {
}
}

func (n *PrometheusNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) error {
func (n *PrometheusNotifier) Notify(samples []prompb.Sample, hostinfo *hostinfo.HostInfo) *NotifyResult {
if !n.validClient || n.client == nil {
if err := n.createHttpClient(); err != nil {
return err
return RecoverableErrorResult(err)
}
n.validClient = true
}
request, err := newPrometheusRequest(hostinfo, n.cfg, samples)
if err != nil {
return err
return RecoverableErrorResult(err)
}
err, recoverable := prometheusRemoteWrite(n.client, n.cfg, request)
if err == nil {
return OKResult()
}
if recoverable {
return RecoverableErrorResult(err)
}
return prometheusRemoteWrite(n.client, n.cfg, request)
return NonRecoverableErrorResult(err)
}

func (n *PrometheusNotifier) HostChanged() {
Expand Down Expand Up @@ -80,7 +87,7 @@ func newMTLSHttpClient(keypair tls.Certificate, timeout time.Duration) (*http.Cl
}, nil
}

func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequest *http.Request) error {
func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequest *http.Request) (err error, recoverable bool) {
var attempt uint = 0
maxRetryWait := cfg.WriteRetryMaxInt
retryWait := cfg.WriteRetryMinInt
Expand All @@ -89,11 +96,11 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ
resp, err := httpClient.Do(httpRequest)

if err != nil {
return fmt.Errorf("PrometheusRemoteWrite: %w", err)
return fmt.Errorf("PrometheusRemoteWrite: %w", err), true
}
defer resp.Body.Close()
if resp.StatusCode/100 == 2 {
return nil // success
return nil, true // success
}
body, err := io.ReadAll(resp.Body)
if err == nil {
Expand All @@ -112,12 +119,12 @@ func prometheusRemoteWrite(httpClient *http.Client, cfg *config.Config, httpRequ
continue
}
if resp.StatusCode/100 == 4 {
return fmt.Errorf("PrometheusRemoteWrite: Http Error: %d, failing", resp.StatusCode)
return fmt.Errorf("PrometheusRemoteWrite: Http Error: %d, failing", resp.StatusCode), false
}
return fmt.Errorf("PrometheusRemoteWrite: Unexpected Http Status: %d", resp.StatusCode)
return fmt.Errorf("PrometheusRemoteWrite: Unexpected Http Status: %d", resp.StatusCode), false
}

return fmt.Errorf("PrometheusRemoteWrite: Failed after %d attempts", attempt)
return fmt.Errorf("PrometheusRemoteWrite: Failed after %d attempts", attempt), true
}

func newPrometheusRequest(hostinfo *hostinfo.HostInfo, cfg *config.Config, samples []prompb.Sample) (
Expand Down
53 changes: 37 additions & 16 deletions notify/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,26 @@ func TestNotify(t *testing.T) {
cfg.WriteUrl = server.URL + writeUrlPath

// Test that notify returns no error
err := n.Notify(samples, hostinfo)
checkError(t, err, "Failed to notify")
result := n.Notify(samples, hostinfo)
checkError(t, result.Err(), "Failed to notify")
checkCalled(t, called, 1)
if result.Status != StatusOK {
t.Fatalf("Expected status OK, got %d", result.Status)
}

// Test that http client is still the same after next request
httpClient := n.client
err = n.Notify(samples, hostinfo)
checkError(t, err, "Failed to notify")
result = n.Notify(samples, hostinfo)
checkError(t, result.Err(), "Failed to notify")
if httpClient != n.client {
t.Fatalf("Expected client to be reused")
}
checkCalled(t, called, 2)

// Test that http client is recreated when host info changes
n.HostChanged()
err = n.Notify(samples, hostinfo)
checkError(t, err, "Failed to notify")
result = n.Notify(samples, hostinfo)
checkError(t, result.Err(), "Failed to notify")

if httpClient == n.client {
t.Fatalf("Expected client to be recreated")
Expand All @@ -140,9 +143,9 @@ func TestNotifyNoCert(t *testing.T) {
samples := createSamples()
hostinfo := createHostInfo()

err := n.Notify(samples, hostinfo)
result := n.Notify(samples, hostinfo)

if !strings.Contains(err.Error(), "no such file or directory") {
if !strings.Contains(result.Error(), "no such file or directory") {
t.Fatal("Expected error on not finding certicate")
}
}
Expand All @@ -169,9 +172,9 @@ func TestNotifyRequestError(t *testing.T) {
defer server.Close()
cfg.WriteUrl = server.URL + writeUrlPath

err := n.Notify(samples, hostinfo)
if err == nil {
t.Fatal("Expected error on request failure")
result := n.Notify(samples, hostinfo)
if result.Status == StatusOK {
t.Fatal("Expected error status on request failure")
}
if called != 1 {
t.Fatalf("Expected to call server once, got %d", called)
Expand Down Expand Up @@ -210,16 +213,18 @@ func TestRetriesAndBackoff(t *testing.T) {
request, _ := http.NewRequest("POST", cfg.WriteUrl, nil)

// Test that retries are done as expected and it will fail
err := prometheusRemoteWrite(client, cfg, request)
err, recoverable := prometheusRemoteWrite(client, cfg, request)
if err == nil {
t.Fatal("Expected error on request failure")
}
checkRecoverable(t, recoverable)
checkCalled(t, called, int(cfg.WriteRetryAttempts))

// Test that retries are done as expected and it will succeed
called = 0
cfg.WriteRetryAttempts = 3
err = prometheusRemoteWrite(client, cfg, request)
err, recoverable = prometheusRemoteWrite(client, cfg, request)
checkRecoverable(t, recoverable)
checkError(t, err, "Failed to send request")
}

Expand Down Expand Up @@ -258,17 +263,19 @@ func TestNoRetriesOn4xx(t *testing.T) {
request, _ := http.NewRequest("POST", cfg.WriteUrl, nil)

// Test that retries are done as expected and it will fail
err := prometheusRemoteWrite(client, cfg, request)
err, recoverable := prometheusRemoteWrite(client, cfg, request)
checkNonRecoverable(t, recoverable)
checkExpectedErrorContains(t, err, "Http Error: 400")
checkCalled(t, called, 1)

// Test that retries are done on 429 but not on subsequent 404
err = prometheusRemoteWrite(client, cfg, request)
err, recoverable = prometheusRemoteWrite(client, cfg, request)
checkNonRecoverable(t, recoverable)
checkExpectedErrorContains(t, err, "Http Error: 404")
checkCalled(t, called, 1+2)

// Last request is 200 and that should succeed without retries
err = prometheusRemoteWrite(client, cfg, request)
err, _ = prometheusRemoteWrite(client, cfg, request)
checkError(t, err, "Failed to send request")
checkCalled(t, called, 1+2+1)
}
Expand Down Expand Up @@ -372,6 +379,20 @@ func checkExpectedErrorContains(t *testing.T, err error, message string) {
}
}

// checkRecoverable fails the test immediately unless recoverable is true.
// It is marked as a test helper so failures are reported at the caller's line.
func checkRecoverable(t *testing.T, recoverable bool) {
	t.Helper()
	if recoverable {
		return
	}
	t.Fatalf("Expected error to be recoverable")
}

// checkNonRecoverable fails the test immediately unless recoverable is false.
// It is marked as a test helper so failures are reported at the caller's line.
func checkNonRecoverable(t *testing.T, recoverable bool) {
	t.Helper()
	if !recoverable {
		return
	}
	t.Fatalf("Expected error to be non-recoverable")
}

// Check that labels follow:
// - SHOULD contain a __name__ label.
// - MUST NOT contain repeated label names.
Expand Down

0 comments on commit e98bd2b

Please sign in to comment.