From 7b600b6b2c74cb8a31211abf895dc16375f02f5c Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 16 Nov 2023 18:58:15 +0000 Subject: [PATCH 01/12] splunk backoff retry mechanism to reduce chance of losing logs --- README.md | 2 ++ pumps/http-retry.go | 57 ++++++++++++++++++++++++++++++++++++++++++++ pumps/splunk.go | 50 ++++++++++++++++---------------------- pumps/splunk_test.go | 52 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 128 insertions(+), 33 deletions(-) create mode 100644 pumps/http-retry.go diff --git a/README.md b/README.md index 1aa196d65..6ca65fa97 100644 --- a/README.md +++ b/README.md @@ -775,6 +775,7 @@ Setting up Splunk with a _HTTP Event Collector_ - `ignore_tag_prefix_list`: (optional) Choose which tags to be ignored by the Splunk Pump. Keep in mind that the tag name and value are hyphenated. Type: Type: String Array `[] string`. Default value is `[]` - `enable_batch`: If this is set to `true`, pump is going to send the analytics records in batch to Splunk. Type: Boolean. Default value is `false`. - `max_content_length`: Max content length in bytes to be sent in batch requests. It should match the `max_content_length` configured in Splunk. If the purged analytics records size don't reach the amount of bytes, they're send anyways in each `purge_loop`. Type: Integer. Default value is 838860800 (~ 800 MB), the same default value as Splunk config. +- `max_retries`: Max number of retries if failed to send requests to splunk HEC. Default value is `0`. ###### JSON / Conf File @@ -791,6 +792,7 @@ Setting up Splunk with a _HTTP Event Collector_ "obfuscate_api_keys": true, "obfuscate_api_keys_length": 10, "enable_batch":true, + "max_retries": 2, "fields": [ "method", "host", diff --git a/pumps/http-retry.go b/pumps/http-retry.go new file mode 100644 index 000000000..4ddda8ae9 --- /dev/null +++ b/pumps/http-retry.go @@ -0,0 +1,57 @@ +package pumps + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/sirupsen/logrus" +) + +var errPerm = errors.New("bad request - not retrying") + +type httpSender func(ctx context.Context, data []byte) (*http.Response, error) + +type backoffRetry struct { + errMsg string + maxRetries uint64 + logger *logrus.Entry + httpsend httpSender +} + +func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry { + return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger} +} + +func (s *backoffRetry) send(ctx context.Context, data []byte) error { + fn := func() error { + resp, err := s.httpsend(ctx, data) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + return nil + } + + // server error or rate limit hit - backoff retry + if resp.StatusCode >= http.StatusInternalServerError || resp.StatusCode == http.StatusTooManyRequests { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("error status code %d and response '%s'", resp.StatusCode, body) + } + + // any other error treat as permanent (i.e. 
auth error, invalid request) and don't retry + return backoff.Permanent(errPerm) + } + + return backoff.RetryNotify(fn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { + if err != nil { + s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) + } + }) +} diff --git a/pumps/splunk.go b/pumps/splunk.go index 993f82c87..df810ff71 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -6,12 +6,12 @@ import ( "crypto/tls" "encoding/json" "errors" + "github.com/cenkalti/backoff/v4" + "github.com/mitchellh/mapstructure" "net/http" "net/url" "strings" - "github.com/mitchellh/mapstructure" - "github.com/TykTechnologies/tyk-pump/analytics" ) @@ -35,8 +35,8 @@ type SplunkClient struct { Token string CollectorURL string TLSSkipVerify bool - - httpClient *http.Client + httpClient *http.Client + retry *backoffRetry } // SplunkPump is a Tyk Pump driver for Splunk. @@ -85,6 +85,8 @@ type SplunkPumpConfig struct { // the amount of bytes, they're send anyways in each `purge_loop`. Default value is 838860800 // (~ 800 MB), the same default value as Splunk config. BatchMaxContentLength int `json:"batch_max_content_length" mapstructure:"batch_max_content_length"` + // MaxRetries the maximum amount of retries if failed to send requests to splunk HEC. Default value is `0` + MaxRetries uint64 `json:"max_retries" mapstructure:"max_retries"` } // New initializes a new pump. @@ -124,6 +126,18 @@ func (p *SplunkPump) Init(config interface{}) error { p.config.BatchMaxContentLength = maxContentLength } + sender := func(ctx context.Context, data []byte) (*http.Response, error) { + reader := bytes.NewReader(data) + req, err := http.NewRequest("POST", p.config.CollectorURL, reader) + if err != nil { + return nil, backoff.Permanent(err) + } + req = req.WithContext(ctx) + req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) + return p.client.httpClient.Do(req) + } + + p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log) p.log.Info(p.GetName() + " Initialized") return nil @@ -153,15 +167,6 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { var batchBuffer bytes.Buffer - fnSendBytes := func(data []byte) error { - _, errSend := p.client.Send(ctx, data) - if errSend != nil { - p.log.Error("Error writing data to Splunk ", errSend) - return errSend - } - return nil - } - for _, v := range data { decoded := v.(analytics.AnalyticsRecord) apiKey := decoded.APIKey @@ -253,14 +258,14 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { if p.config.EnableBatch { //if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength { - if err := fnSendBytes(batchBuffer.Bytes()); err != nil { + if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { return err } batchBuffer.Reset() } batchBuffer.Write(data) } else { - if err := fnSendBytes(data); err != nil { + if err := p.client.retry.send(ctx, data); err != nil { return err } } @@ -268,7 +273,7 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { //this if is for data remaining in the buffer when len(buffer) is lower than max_content_length if p.config.EnableBatch && batchBuffer.Len() > 0 { - if err := fnSendBytes(batchBuffer.Bytes()); err != nil { + if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { return err } 
batchBuffer.Reset() @@ -310,16 +315,3 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil } return c, nil } - -// Send sends an event to the Splunk HTTP Event Collector interface. -func (c *SplunkClient) Send(ctx context.Context, data []byte) (*http.Response, error) { - - reader := bytes.NewReader(data) - req, err := http.NewRequest("POST", c.CollectorURL, reader) - if err != nil { - return nil, err - } - req = req.WithContext(ctx) - req.Header.Add(authHeaderName, authHeaderPrefix+c.Token) - return c.httpClient.Do(req) -} diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index 22aeb000e..a787c9166 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -26,13 +26,21 @@ type splunkStatus struct { Len int `json:"len"` } type testHandler struct { - test *testing.T - batched bool - - responses []splunkStatus + test *testing.T + batched bool + returnErrorOnFirstReq bool + responses []splunkStatus + reqCount int } func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.reqCount++ + if h.returnErrorOnFirstReq && h.reqCount == 1 { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("splunk internal error")) + return + } + authHeaderValue := r.Header.Get("authorization") if authHeaderValue == "" { h.test.Fatal("Auth header is empty") @@ -79,6 +87,41 @@ func TestSplunkInit(t *testing.T) { } } +func Test_SplunkBackoffRetry(t *testing.T) { + handler := &testHandler{test: t, batched: false, returnErrorOnFirstReq: true} + server := httptest.NewServer(handler) + defer server.Close() + + pmp := SplunkPump{} + + cfg := make(map[string]interface{}) + cfg["collector_token"] = testToken + cfg["max_retries"] = 1 + cfg["collector_url"] = server.URL + cfg["ssl_insecure_skip_verify"] = true + + if errInit := pmp.Init(cfg); errInit != nil { + t.Error("Error initializing pump") + return + } + + keys := make([]interface{}, 1) + + keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} + + if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil { + t.Error("Error writing to splunk pump:", errWrite.Error()) + return + } + + assert.Equal(t, 1, len(handler.responses)) + + response := handler.responses[0] + + assert.Equal(t, "Success", response.Text) + assert.Equal(t, int32(0), response.Code) +} + func Test_SplunkWriteData(t *testing.T) { handler := &testHandler{test: t, batched: false} server := httptest.NewServer(handler) @@ -112,6 +155,7 @@ func Test_SplunkWriteData(t *testing.T) { assert.Equal(t, "Success", response.Text) assert.Equal(t, int32(0), response.Code) } + func Test_SplunkWriteDataBatch(t *testing.T) { handler := &testHandler{test: t, batched: true} server := httptest.NewServer(handler) From 9a66a92be827c3a646a07b3c038e082e2c4a7f47 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 16 Nov 2023 22:28:37 +0000 Subject: [PATCH 02/12] only retry for temporary errors --- pumps/http-retry.go | 29 ++++++++++++++++++++++------- pumps/splunk.go | 3 ++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pumps/http-retry.go b/pumps/http-retry.go index 4ddda8ae9..79251ee6a 100644 --- a/pumps/http-retry.go +++ b/pumps/http-retry.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/url" "time" "github.com/cenkalti/backoff/v4" @@ -28,10 +29,10 @@ func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logg } func (s *backoffRetry) send(ctx context.Context, data []byte) error { - fn := func() error { + opFn := func() error { 
resp, err := s.httpsend(ctx, data) if err != nil { - return err + return s.handleErr(err) } defer resp.Body.Close() @@ -39,19 +40,33 @@ func (s *backoffRetry) send(ctx context.Context, data []byte) error { return nil } - // server error or rate limit hit - backoff retry + body, _ := io.ReadAll(resp.Body) + err = fmt.Errorf("got status code %d and response '%s'", resp.StatusCode, body) + + // server error or rate limit hit - attempt retry if resp.StatusCode >= http.StatusInternalServerError || resp.StatusCode == http.StatusTooManyRequests { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("error status code %d and response '%s'", resp.StatusCode, body) + return err } // any other error treat as permanent (i.e. auth error, invalid request) and don't retry - return backoff.Permanent(errPerm) + return backoff.Permanent(err) } - return backoff.RetryNotify(fn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { + return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { if err != nil { s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) } }) } + +func (s *backoffRetry) handleErr(err error) error { + if e, ok := err.(*url.Error); ok { + if e.Temporary() { + // temp error, attempt retry + return err + } + // permanent error - don't retry + return backoff.Permanent(err) + } + return err +} diff --git a/pumps/splunk.go b/pumps/splunk.go index df810ff71..87894389b 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -128,8 +128,9 @@ func (p *SplunkPump) Init(config interface{}) error { sender := func(ctx context.Context, data []byte) (*http.Response, error) { reader := bytes.NewReader(data) - req, err := http.NewRequest("POST", p.config.CollectorURL, reader) + req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) if err != nil { + // invalid req, don't retry return nil, backoff.Permanent(err) } req = req.WithContext(ctx) From 453607cd3e55ab6b774babb83c7589150057ef95 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Mon, 20 Nov 2023 08:38:37 +0000 Subject: [PATCH 03/12] move to separate pkg outside of pumps --- {pumps => http-retry}/http-retry.go | 29 ++++++++++------------ pumps/splunk.go | 37 ++++++++++++++++------------- 2 files changed, 32 insertions(+), 34 deletions(-) rename {pumps => http-retry}/http-retry.go (60%) diff --git a/pumps/http-retry.go b/http-retry/http-retry.go similarity index 60% rename from pumps/http-retry.go rename to http-retry/http-retry.go index 79251ee6a..d1d01220e 100644 --- a/pumps/http-retry.go +++ b/http-retry/http-retry.go @@ -1,8 +1,6 @@ -package pumps +package httpretry import ( - "context" - "errors" "fmt" "io" "net/http" @@ -13,24 +11,22 @@ import ( "github.com/sirupsen/logrus" ) -var errPerm = errors.New("bad request - not retrying") - -type httpSender func(ctx context.Context, data []byte) (*http.Response, error) - -type backoffRetry struct { +type BackoffHTTPRetry struct { errMsg string maxRetries uint64 logger *logrus.Entry - httpsend httpSender + httpclient *http.Client } -func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry { - return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger} +// NewBackoffRetry Creates an exponential backoff retry to use httpClient for connections. Will retry if a temporary error or +// 5xx or 429 status code in response. 
+func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, logger *logrus.Entry) *BackoffHTTPRetry { + return &BackoffHTTPRetry{errMsg: errMsg, maxRetries: maxRetries, httpclient: httpClient, logger: logger} } -func (s *backoffRetry) send(ctx context.Context, data []byte) error { +func (s *BackoffHTTPRetry) Send(req *http.Request) error { opFn := func() error { - resp, err := s.httpsend(ctx, data) + resp, err := s.httpclient.Do(req) if err != nil { return s.handleErr(err) } @@ -53,13 +49,11 @@ func (s *backoffRetry) send(ctx context.Context, data []byte) error { } return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { - if err != nil { - s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) - } + s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) }) } -func (s *backoffRetry) handleErr(err error) error { +func (s *BackoffHTTPRetry) handleErr(err error) error { if e, ok := err.(*url.Error); ok { if e.Temporary() { // temp error, attempt retry @@ -68,5 +62,6 @@ func (s *backoffRetry) handleErr(err error) error { // permanent error - don't retry return backoff.Permanent(err) } + // anything else - retry return err } diff --git a/pumps/splunk.go b/pumps/splunk.go index 87894389b..c19482006 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -6,13 +6,13 @@ import ( "crypto/tls" "encoding/json" "errors" - "github.com/cenkalti/backoff/v4" - "github.com/mitchellh/mapstructure" "net/http" "net/url" "strings" "github.com/TykTechnologies/tyk-pump/analytics" + retry "github.com/TykTechnologies/tyk-pump/http-retry" + "github.com/mitchellh/mapstructure" ) const ( @@ -36,7 +36,7 @@ type SplunkClient struct { CollectorURL string TLSSkipVerify bool httpClient *http.Client - retry *backoffRetry + retry *retry.BackoffHTTPRetry } // SplunkPump is a Tyk Pump driver for Splunk. 
@@ -126,19 +126,11 @@ func (p *SplunkPump) Init(config interface{}) error { p.config.BatchMaxContentLength = maxContentLength } - sender := func(ctx context.Context, data []byte) (*http.Response, error) { - reader := bytes.NewReader(data) - req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) - if err != nil { - // invalid req, don't retry - return nil, backoff.Permanent(err) - } - req = req.WithContext(ctx) - req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) - return p.client.httpClient.Do(req) + if p.config.MaxRetries > 0 { + p.log.Infof("%d max retries", p.config.MaxRetries) } - p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log) + p.client.retry = retry.NewBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, p.client.httpClient, p.log) p.log.Info(p.GetName() + " Initialized") return nil @@ -259,14 +251,14 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { if p.config.EnableBatch { //if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength { - if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { + if err := p.send(ctx, batchBuffer.Bytes()); err != nil { return err } batchBuffer.Reset() } batchBuffer.Write(data) } else { - if err := p.client.retry.send(ctx, data); err != nil { + if err := p.send(ctx, data); err != nil { return err } } @@ -274,7 +266,7 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { //this if is for data remaining in the buffer when len(buffer) is lower than max_content_length if p.config.EnableBatch && batchBuffer.Len() > 0 { - if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { + if err := p.send(ctx, batchBuffer.Bytes()); err != nil { return err } batchBuffer.Reset() @@ -316,3 +308,14 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil } return c, nil } + +func (p *SplunkPump) send(ctx context.Context, data []byte) error { + reader := bytes.NewReader(data) + req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) + return p.client.retry.Send(req) +} From e2fa0d6e7aa5e76cf99b8f17f62bcb9c2b84b7b8 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 23 Nov 2023 13:59:57 +0000 Subject: [PATCH 04/12] set correct collector url --- pumps/splunk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pumps/splunk.go b/pumps/splunk.go index c19482006..f171433c1 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -311,7 +311,7 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil func (p *SplunkPump) send(ctx context.Context, data []byte) error { reader := bytes.NewReader(data) - req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) + req, err := http.NewRequest(http.MethodPost, p.client.CollectorURL, reader) if err != nil { return err } From ab2da87ba6ba54dc9eb1576492ee89eae9320814 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Sat, 2 Dec 2023 11:17:21 +0000 Subject: [PATCH 05/12] ensure connection can be reused by reading and closing body stream --- http-retry/http-retry.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/http-retry/http-retry.go b/http-retry/http-retry.go 
index d1d01220e..9577b0fe8 100644 --- a/http-retry/http-retry.go +++ b/http-retry/http-retry.go @@ -30,7 +30,12 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { if err != nil { return s.handleErr(err) } - defer resp.Body.Close() + defer func() { + // read all response and discard so http client can + // reuse connection as per doc on Response.Body + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() if resp.StatusCode == http.StatusOK { return nil From f37a00ff3cb1811b454f92f94c7a61ca93531f84 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Wed, 27 Dec 2023 13:34:02 +0000 Subject: [PATCH 06/12] reset req body to fix error when req body has already been read from previous failed request --- http-retry/http-retry.go | 84 ++++++++++++++++++++++++++++++++++++---- pumps/splunk_test.go | 18 ++++++--- 2 files changed, 88 insertions(+), 14 deletions(-) diff --git a/http-retry/http-retry.go b/http-retry/http-retry.go index 9577b0fe8..72cc2d561 100644 --- a/http-retry/http-retry.go +++ b/http-retry/http-retry.go @@ -1,10 +1,14 @@ package httpretry import ( + "bytes" + "errors" "fmt" "io" + "net" "net/http" "net/url" + "strings" "time" "github.com/cenkalti/backoff/v4" @@ -18,6 +22,12 @@ type BackoffHTTPRetry struct { httpclient *http.Client } +type ( + conError interface{ ConnectionError() bool } + tempError interface{ Temporary() bool } + timeoutError interface{ Timeout() bool } +) + // NewBackoffRetry Creates an exponential backoff retry to use httpClient for connections. Will retry if a temporary error or // 5xx or 429 status code in response. func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, logger *logrus.Entry) *BackoffHTTPRetry { @@ -25,7 +35,18 @@ func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, } func (s *BackoffHTTPRetry) Send(req *http.Request) error { + var reqBody []byte + if req.Body != nil { + reqBody, _ = io.ReadAll(req.Body) + req.Body.Close() // closing the original body + } + opFn := func() error { + // recreating the request body from the buffer for each retry as if first attempt fails and + // a new conn is created (keep alive disabled on server for example) the req body has already been read, + // resulting in "http: ContentLength=X with Body length Y" error + req.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + resp, err := s.httpclient.Do(req) if err != nil { return s.handleErr(err) @@ -59,14 +80,61 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { } func (s *BackoffHTTPRetry) handleErr(err error) error { - if e, ok := err.(*url.Error); ok { - if e.Temporary() { - // temp error, attempt retry - return err + if isErrorRetryable(err) { + return err + } + // permanent error - don't retry + return backoff.Permanent(err) +} + +func isErrorRetryable(err error) bool { + if err == nil { + return false + } + + var ( + conErr conError + tempErr tempError + timeoutErr timeoutError + urlErr *url.Error + netOpErr *net.OpError + ) + + switch { + case errors.As(err, &conErr) && conErr.ConnectionError(): + return true + + case strings.Contains(err.Error(), "connection reset"): + return true + + case errors.As(err, &urlErr): + // Refused connections should be retried as the service may not yet be + // running on the port. Go TCP dial considers refused connections as + // not temporary. 
+ if strings.Contains(urlErr.Error(), "connection refused") { + return true + } else { + return isErrorRetryable(errors.Unwrap(urlErr)) } - // permanent error - don't retry - return backoff.Permanent(err) + + case errors.As(err, &netOpErr): + // Network dial, or temporary network errors are always retryable. + if strings.EqualFold(netOpErr.Op, "dial") || netOpErr.Temporary() { + return true + } else { + return isErrorRetryable(errors.Unwrap(netOpErr)) + } + + case errors.As(err, &tempErr) && tempErr.Temporary(): + // Fallback to the generic temporary check, with temporary errors + // retryable. + return true + + case errors.As(err, &timeoutErr) && timeoutErr.Timeout(): + // Fallback to the generic timeout check, with timeout errors + // retryable. + return true } - // anything else - retry - return err + + return false } diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index a787c9166..01dba967b 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -35,11 +35,6 @@ type testHandler struct { func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.reqCount++ - if h.returnErrorOnFirstReq && h.reqCount == 1 { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("splunk internal error")) - return - } authHeaderValue := r.Header.Get("authorization") if authHeaderValue == "" { @@ -56,6 +51,14 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { h.test.Fatal("Couldn't ready body") } + r.Body.Close() + + if h.returnErrorOnFirstReq && h.reqCount == 1 { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("splunk internal error")) + return + } + status := splunkStatus{Text: "Success", Code: 0} if !h.batched { event := make(map[string]interface{}) @@ -89,7 +92,10 @@ func TestSplunkInit(t *testing.T) { func Test_SplunkBackoffRetry(t *testing.T) { handler := &testHandler{test: t, batched: false, returnErrorOnFirstReq: true} - server := httptest.NewServer(handler) + server := httptest.NewUnstartedServer(handler) + server.Config.SetKeepAlivesEnabled(false) + server.Start() + defer server.Close() pmp := SplunkPump{} From 249a1518572391294f0487e8b7f2af7c487a6d23 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Wed, 27 Dec 2023 16:42:15 +0000 Subject: [PATCH 07/12] tidy up, change retry log to warning as not yet error --- http-retry/http-retry.go | 13 +++---------- pumps/splunk_test.go | 4 ++-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/http-retry/http-retry.go b/http-retry/http-retry.go index 72cc2d561..a218f5d05 100644 --- a/http-retry/http-retry.go +++ b/http-retry/http-retry.go @@ -75,7 +75,7 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { } return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { - s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) + s.logger.WithError(err).Warningf("%s retrying in %s", s.errMsg, t) }) } @@ -103,33 +103,26 @@ func isErrorRetryable(err error) bool { switch { case errors.As(err, &conErr) && conErr.ConnectionError(): return true - case strings.Contains(err.Error(), "connection reset"): return true - case errors.As(err, &urlErr): // Refused connections should be retried as the service may not yet be // running on the port. Go TCP dial considers refused connections as // not temporary. 
if strings.Contains(urlErr.Error(), "connection refused") { return true - } else { - return isErrorRetryable(errors.Unwrap(urlErr)) } - + return isErrorRetryable(errors.Unwrap(urlErr)) case errors.As(err, &netOpErr): // Network dial, or temporary network errors are always retryable. if strings.EqualFold(netOpErr.Op, "dial") || netOpErr.Temporary() { return true - } else { - return isErrorRetryable(errors.Unwrap(netOpErr)) } - + return isErrorRetryable(errors.Unwrap(netOpErr)) case errors.As(err, &tempErr) && tempErr.Temporary(): // Fallback to the generic temporary check, with temporary errors // retryable. return true - case errors.As(err, &timeoutErr) && timeoutErr.Timeout(): // Fallback to the generic timeout check, with timeout errors // retryable. diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index 01dba967b..dac797fcc 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -106,8 +106,8 @@ func Test_SplunkBackoffRetry(t *testing.T) { cfg["collector_url"] = server.URL cfg["ssl_insecure_skip_verify"] = true - if errInit := pmp.Init(cfg); errInit != nil { - t.Error("Error initializing pump") + if err := pmp.Init(cfg); err != nil { + t.Errorf("Error initializing pump %v", err) return } From bbd78591902405bbff0cd3cacf3700621635cb47 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 28 Dec 2023 10:39:56 +0000 Subject: [PATCH 08/12] add debugging for body bytes sent to splunk & RTT to aid in fine tuning batch size --- pumps/splunk.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pumps/splunk.go b/pumps/splunk.go index f171433c1..1c7c27218 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -317,5 +317,7 @@ func (p *SplunkPump) send(ctx context.Context, data []byte) error { } req = req.WithContext(ctx) req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) + + p.log.Debugf("Sending %d bytes to splunk", len(data)) return p.client.retry.Send(req) } From e88d9132608c6e4a760254ed4ca1869e1752fa1e Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 28 Dec 2023 15:42:17 +0000 Subject: [PATCH 09/12] PR requested changes --- http-retry/http-retry.go | 27 ++++++++++++++++++++++----- pumps/splunk_test.go | 5 ++++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/http-retry/http-retry.go b/http-retry/http-retry.go index a218f5d05..def03535e 100644 --- a/http-retry/http-retry.go +++ b/http-retry/http-retry.go @@ -16,10 +16,10 @@ import ( ) type BackoffHTTPRetry struct { - errMsg string - maxRetries uint64 logger *logrus.Entry httpclient *http.Client + errMsg string + maxRetries uint64 } type ( @@ -37,7 +37,12 @@ func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, func (s *BackoffHTTPRetry) Send(req *http.Request) error { var reqBody []byte if req.Body != nil { - reqBody, _ = io.ReadAll(req.Body) + var err error + reqBody, err = io.ReadAll(req.Body) + if err != nil { + s.logger.WithError(err).Error("Failed to read req body") + return err + } req.Body.Close() // closing the original body } @@ -47,14 +52,20 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { // resulting in "http: ContentLength=X with Body length Y" error req.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + t := time.Now() resp, err := s.httpclient.Do(req) + s.logger.Debugf("Req %s took %s", req.URL, time.Now().Sub(t)) + if err != nil { return s.handleErr(err) } defer func() { // read all response and discard so http client can // reuse connection as per doc on Response.Body - io.Copy(io.Discard, resp.Body) + _, err := io.Copy(io.Discard, 
resp.Body) + if err != nil { + s.logger.WithError(err).Error("Failed to read and discard resp body") + } resp.Body.Close() }() @@ -62,7 +73,13 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { return nil } - body, _ := io.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) + if err != nil { + s.logger.WithError(err).Error("Failed to read resp body") + // attempt retry + return err + } + err = fmt.Errorf("got status code %d and response '%s'", resp.StatusCode, body) // server error or rate limit hit - attempt retry diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index dac797fcc..248773699 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -55,7 +55,10 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if h.returnErrorOnFirstReq && h.reqCount == 1 { w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("splunk internal error")) + _, err := w.Write([]byte("splunk internal error")) + if err != nil { + h.test.Fatalf("Failed to write response got error %v", err) + } return } From deb17c4b4b629e7993c01352ae633b74bb59522a Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Thu, 28 Dec 2023 17:57:58 +0000 Subject: [PATCH 10/12] fix go linter issue --- http-retry/http-retry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-retry/http-retry.go b/http-retry/http-retry.go index def03535e..3a6bdd3e7 100644 --- a/http-retry/http-retry.go +++ b/http-retry/http-retry.go @@ -54,7 +54,7 @@ func (s *BackoffHTTPRetry) Send(req *http.Request) error { t := time.Now() resp, err := s.httpclient.Do(req) - s.logger.Debugf("Req %s took %s", req.URL, time.Now().Sub(t)) + s.logger.Debugf("Req %s took %s", req.URL, time.Since(t)) if err != nil { return s.handleErr(err) From 7b08c79ac4cbe668b0af20a1bb2a80686e41b49d Mon Sep 17 00:00:00 2001 From: Matias <83959431+mativm02@users.noreply.github.com> Date: Fri, 29 Dec 2023 09:53:06 -0300 Subject: [PATCH 11/12] applying suggested changes --- README.md | 2 +- pumps/splunk_test.go | 136 +++++++++++++++++++++++++++++++++---------- 2 files changed, 106 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 5dd2aee08..aac70ef3b 100644 --- a/README.md +++ b/README.md @@ -775,7 +775,7 @@ Setting up Splunk with a _HTTP Event Collector_ - `ignore_tag_prefix_list`: (optional) Choose which tags to be ignored by the Splunk Pump. Keep in mind that the tag name and value are hyphenated. Type: Type: String Array `[] string`. Default value is `[]` - `enable_batch`: If this is set to `true`, pump is going to send the analytics records in batch to Splunk. Type: Boolean. Default value is `false`. - `max_content_length`: Max content length in bytes to be sent in batch requests. It should match the `max_content_length` configured in Splunk. If the purged analytics records size don't reach the amount of bytes, they're send anyways in each `purge_loop`. Type: Integer. Default value is 838860800 (~ 800 MB), the same default value as Splunk config. -- `max_retries`: Max number of retries if failed to send requests to splunk HEC. Default value is `0`. +- `max_retries`: Max number of retries if failed to send requests to splunk HEC. Default value is `0` (no retries after failure). Connections, network, timeouts, temporary, too many requests and internal server errors are all considered retryable. 
###### JSON / Conf File diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index 248773699..26c1c5ec8 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -26,11 +26,11 @@ type splunkStatus struct { Len int `json:"len"` } type testHandler struct { - test *testing.T - batched bool - returnErrorOnFirstReq bool - responses []splunkStatus - reqCount int + test *testing.T + batched bool + returnErrors int + responses []splunkStatus + reqCount int } func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -53,7 +53,8 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } r.Body.Close() - if h.returnErrorOnFirstReq && h.reqCount == 1 { + if h.returnErrors >= h.reqCount { + fmt.Println("returning err.......") w.WriteHeader(http.StatusInternalServerError) _, err := w.Write([]byte("splunk internal error")) if err != nil { @@ -94,41 +95,114 @@ func TestSplunkInit(t *testing.T) { } func Test_SplunkBackoffRetry(t *testing.T) { - handler := &testHandler{test: t, batched: false, returnErrorOnFirstReq: true} - server := httptest.NewUnstartedServer(handler) - server.Config.SetKeepAlivesEnabled(false) - server.Start() + go t.Run("max_retries=1", func(t *testing.T) { + handler := &testHandler{test: t, batched: false, returnErrors: 1} + server := httptest.NewUnstartedServer(handler) + server.Config.SetKeepAlivesEnabled(false) + server.Start() + + defer server.Close() + + pmp := SplunkPump{} + cfg := make(map[string]interface{}) + cfg["collector_token"] = testToken + cfg["max_retries"] = 1 + cfg["collector_url"] = server.URL + cfg["ssl_insecure_skip_verify"] = true + + if err := pmp.Init(cfg); err != nil { + t.Errorf("Error initializing pump %v", err) + return + } - defer server.Close() + keys := make([]interface{}, 1) - pmp := SplunkPump{} + keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} - cfg := make(map[string]interface{}) - cfg["collector_token"] = testToken - cfg["max_retries"] = 1 - cfg["collector_url"] = server.URL - cfg["ssl_insecure_skip_verify"] = true + if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil { + t.Error("Error writing to splunk pump:", errWrite.Error()) + return + } - if err := pmp.Init(cfg); err != nil { - t.Errorf("Error initializing pump %v", err) - return - } + assert.Equal(t, 1, len(handler.responses)) + assert.Equal(t, 2, handler.reqCount) - keys := make([]interface{}, 1) + response := handler.responses[0] - keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} + assert.Equal(t, "Success", response.Text) + assert.Equal(t, int32(0), response.Code) + }) - if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil { - t.Error("Error writing to splunk pump:", errWrite.Error()) - return - } + t.Run("max_retries=0", func(t *testing.T) { + handler := &testHandler{test: t, batched: false, returnErrors: 1} + server := httptest.NewUnstartedServer(handler) + server.Config.SetKeepAlivesEnabled(false) + server.Start() - assert.Equal(t, 1, len(handler.responses)) + defer server.Close() - response := handler.responses[0] + pmp := SplunkPump{} + cfg := make(map[string]interface{}) + cfg["collector_token"] = testToken + cfg["max_retries"] = 0 + cfg["collector_url"] = server.URL + cfg["ssl_insecure_skip_verify"] = true + + if err := pmp.Init(cfg); err != nil { + t.Errorf("Error initializing pump %v", err) + return + } + + keys := make([]interface{}, 1) + + keys[0] = 
analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} + + if errWrite := pmp.WriteData(context.TODO(), keys); errWrite == nil { + t.Error("Error expected writing to splunk pump, got nil") + return + } + + assert.Equal(t, 1, handler.reqCount) + }) + + t.Run("max_retries=3", func(t *testing.T) { + handler := &testHandler{test: t, batched: false, returnErrors: 2} + server := httptest.NewUnstartedServer(handler) + server.Config.SetKeepAlivesEnabled(false) + server.Start() + + defer server.Close() + + pmp := SplunkPump{} + cfg := make(map[string]interface{}) + cfg["collector_token"] = testToken + cfg["max_retries"] = 3 + cfg["collector_url"] = server.URL + cfg["ssl_insecure_skip_verify"] = true + + if err := pmp.Init(cfg); err != nil { + t.Errorf("Error initializing pump %v", err) + return + } + + keys := make([]interface{}, 1) + + keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} + + if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil { + t.Error("Error writing to splunk pump:", errWrite.Error()) + return + } + + assert.Equal(t, 1, len(handler.responses)) + assert.Equal(t, 3, handler.reqCount) + + response := handler.responses[0] + + assert.Equal(t, "Success", response.Text) + assert.Equal(t, int32(0), response.Code) + }) - assert.Equal(t, "Success", response.Text) - assert.Equal(t, int32(0), response.Code) } func Test_SplunkWriteData(t *testing.T) { From ee8bfcd047ed7cfa2ac27278b59bcd5734fc0a8d Mon Sep 17 00:00:00 2001 From: Matias <83959431+mativm02@users.noreply.github.com> Date: Fri, 29 Dec 2023 10:04:13 -0300 Subject: [PATCH 12/12] linting --- pumps/splunk_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index 26c1c5ec8..81a5ebbc7 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -202,7 +202,6 @@ func Test_SplunkBackoffRetry(t *testing.T) { assert.Equal(t, "Success", response.Text) assert.Equal(t, int32(0), response.Code) }) - } func Test_SplunkWriteData(t *testing.T) { @@ -275,7 +274,6 @@ func Test_SplunkWriteDataBatch(t *testing.T) { assert.Equal(t, getEventBytes(keys[:2]), handler.responses[0].Len) assert.Equal(t, getEventBytes(keys[2:]), handler.responses[1].Len) - } // getEventBytes returns the bytes amount of the marshalled events struct
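Taken together, the series extracts a reusable `httpretry` package (exponential backoff via `cenkalti/backoff/v4`, retrying temporary network errors, HTTP 429 and 5xx responses) and wires it into the Splunk pump behind the new `max_retries` option. Below is a minimal standalone sketch of how the resulting `BackoffHTTPRetry` API is driven; the collector URL, token, and retry count are illustrative placeholders, not values taken from this change.

```go
package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"time"

	retry "github.com/TykTechnologies/tyk-pump/http-retry"
	"github.com/sirupsen/logrus"
)

func main() {
	// Plain http.Client; BackoffHTTPRetry wraps its Do() call with exponential backoff.
	client := &http.Client{Timeout: 10 * time.Second}
	logger := logrus.NewEntry(logrus.New())

	// Up to 3 retries on temporary/connection errors, HTTP 429 and 5xx responses;
	// other 4xx responses are treated as permanent and returned immediately.
	sender := retry.NewBackoffRetry("Failed writing data to example collector", 3, client, logger)

	// Hypothetical endpoint and token, for illustration only.
	payload := []byte(`{"event":"hello"}`)
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8088/services/collector/event", bytes.NewReader(payload))
	if err != nil {
		log.Fatal(err)
	}
	req = req.WithContext(context.Background())
	req.Header.Add("Authorization", "Splunk example-token")

	// Send blocks until the request succeeds, a permanent error occurs,
	// or the retry budget is exhausted.
	if err := sender.Send(req); err != nil {
		log.Fatalf("giving up after retries: %v", err)
	}
}
```

Because `Send` buffers the request body up front and rebuilds it on every attempt, the same `*http.Request` can be retried safely even when the server drops the connection between attempts (the problem addressed in PATCH 06).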