diff --git a/README.md b/README.md
index 1aa196d65..6ca65fa97 100644
--- a/README.md
+++ b/README.md
@@ -775,6 +775,7 @@ Setting up Splunk with a _HTTP Event Collector_
 - `ignore_tag_prefix_list`: (optional) Choose which tags to be ignored by the Splunk Pump. Keep in mind that the tag name and value are hyphenated. Type: Type: String Array `[] string`. Default value is `[]`
 - `enable_batch`: If this is set to `true`, pump is going to send the analytics records in batch to Splunk. Type: Boolean. Default value is `false`.
 - `max_content_length`: Max content length in bytes to be sent in batch requests. It should match the `max_content_length` configured in Splunk. If the purged analytics records size don't reach the amount of bytes, they're send anyways in each `purge_loop`. Type: Integer. Default value is 838860800 (~ 800 MB), the same default value as Splunk config.
+- `max_retries`: Maximum number of retries if sending requests to the Splunk HEC fails. Type: Integer. Default value is `0`.
 
 ###### JSON / Conf File
 
@@ -791,6 +792,7 @@ Setting up Splunk with a _HTTP Event Collector_
       "obfuscate_api_keys": true,
       "obfuscate_api_keys_length": 10,
       "enable_batch":true,
+      "max_retries": 2,
       "fields": [
         "method",
         "host",
diff --git a/pumps/http-retry.go b/pumps/http-retry.go
new file mode 100644
index 000000000..4ddda8ae9
--- /dev/null
+++ b/pumps/http-retry.go
@@ -0,0 +1,60 @@
+package pumps
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/sirupsen/logrus"
+)
+
+var errPerm = errors.New("bad request - not retrying")
+
+// httpSender issues a single HTTP request carrying the given payload.
+type httpSender func(ctx context.Context, data []byte) (*http.Response, error)
+
+// backoffRetry wraps an httpSender with exponential backoff retries.
+type backoffRetry struct {
+	errMsg     string
+	maxRetries uint64
+	logger     *logrus.Entry
+	httpsend   httpSender
+}
+
+func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry {
+	return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger}
+}
+
+// send delivers data, retrying server errors (5xx) and rate limits (429); other failures are treated as permanent.
+func (s *backoffRetry) send(ctx context.Context, data []byte) error {
+	fn := func() error {
+		resp, err := s.httpsend(ctx, data)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+
+		if resp.StatusCode == http.StatusOK {
+			return nil
+		}
+
+		// server error or rate limit hit - backoff and retry
+		if resp.StatusCode >= http.StatusInternalServerError || resp.StatusCode == http.StatusTooManyRequests {
+			body, _ := io.ReadAll(resp.Body)
+			return fmt.Errorf("error status code %d and response '%s'", resp.StatusCode, body)
+		}
+
+		// treat any other status as permanent (e.g. auth error, invalid request) and don't retry
+		return backoff.Permanent(errPerm)
+	}
+
+	return backoff.RetryNotify(fn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) {
+		if err != nil {
+			s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t)
+		}
+	})
+}
diff --git a/pumps/splunk.go b/pumps/splunk.go
index 993f82c87..df810ff71 100644
--- a/pumps/splunk.go
+++ b/pumps/splunk.go
@@ -6,12 +6,13 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"errors"
 	"net/http"
 	"net/url"
 	"strings"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/mitchellh/mapstructure"
 
 	"github.com/TykTechnologies/tyk-pump/analytics"
 )
 
@@ -35,8 +36,8 @@ type SplunkClient struct {
 	Token         string
 	CollectorURL  string
 	TLSSkipVerify bool
-
-	httpClient *http.Client
+	httpClient *http.Client
+	retry      *backoffRetry
 }
 
 // SplunkPump is a Tyk Pump driver for Splunk.
@@ -85,6 +86,8 @@ type SplunkPumpConfig struct {
 	// the amount of bytes, they're send anyways in each `purge_loop`. Default value is 838860800
 	// (~ 800 MB), the same default value as Splunk config.
 	BatchMaxContentLength int `json:"batch_max_content_length" mapstructure:"batch_max_content_length"`
+	// MaxRetries is the maximum number of retries if sending requests to the Splunk HEC fails. Default value is `0`.
+	MaxRetries uint64 `json:"max_retries" mapstructure:"max_retries"`
 }
 
 // New initializes a new pump.
@@ -124,6 +127,17 @@ func (p *SplunkPump) Init(config interface{}) error {
 		p.config.BatchMaxContentLength = maxContentLength
 	}
 
+	sender := func(ctx context.Context, data []byte) (*http.Response, error) {
+		req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.config.CollectorURL, bytes.NewReader(data))
+		if err != nil {
+			// a malformed request can never succeed, so don't retry it
+			return nil, backoff.Permanent(err)
+		}
+		req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token)
+		return p.client.httpClient.Do(req)
+	}
+
+	p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log)
 	p.log.Info(p.GetName() + " Initialized")
 
 	return nil
@@ -153,15 +167,6 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
 
 	var batchBuffer bytes.Buffer
 
-	fnSendBytes := func(data []byte) error {
-		_, errSend := p.client.Send(ctx, data)
-		if errSend != nil {
-			p.log.Error("Error writing data to Splunk ", errSend)
-			return errSend
-		}
-		return nil
-	}
-
 	for _, v := range data {
 		decoded := v.(analytics.AnalyticsRecord)
 		apiKey := decoded.APIKey
@@ -253,14 +258,14 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
 		if p.config.EnableBatch {
 			//if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer
 			if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength {
-				if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
+				if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
 					return err
 				}
 				batchBuffer.Reset()
 			}
 			batchBuffer.Write(data)
 		} else {
-			if err := fnSendBytes(data); err != nil {
+			if err := p.client.retry.send(ctx, data); err != nil {
 				return err
 			}
 		}
@@ -268,7 +273,7 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
 
 	//this if is for data remaining in the buffer when len(buffer) is lower than max_content_length
 	if p.config.EnableBatch && batchBuffer.Len() > 0 {
-		if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
+		if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
 			return err
 		}
 		batchBuffer.Reset()
@@ -310,16 +315,3 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil
 	}
 	return c, nil
 }
-
-// Send sends an event to the Splunk HTTP Event Collector interface.
-func (c *SplunkClient) Send(ctx context.Context, data []byte) (*http.Response, error) {
-
-	reader := bytes.NewReader(data)
-	req, err := http.NewRequest("POST", c.CollectorURL, reader)
-	if err != nil {
-		return nil, err
-	}
-	req = req.WithContext(ctx)
-	req.Header.Add(authHeaderName, authHeaderPrefix+c.Token)
-	return c.httpClient.Do(req)
-}
diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go
index 22aeb000e..a787c9166 100644
--- a/pumps/splunk_test.go
+++ b/pumps/splunk_test.go
@@ -26,13 +26,21 @@ type splunkStatus struct {
 	Len  int    `json:"len"`
 }
 type testHandler struct {
-	test    *testing.T
-	batched bool
-
-	responses []splunkStatus
+	test                  *testing.T
+	batched               bool
+	returnErrorOnFirstReq bool
+	responses             []splunkStatus
+	reqCount              int
 }
 
 func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	h.reqCount++
+	if h.returnErrorOnFirstReq && h.reqCount == 1 {
+		w.WriteHeader(http.StatusInternalServerError)
+		w.Write([]byte("splunk internal error"))
+		return
+	}
+
 	authHeaderValue := r.Header.Get("authorization")
 	if authHeaderValue == "" {
 		h.test.Fatal("Auth header is empty")
@@ -79,6 +87,41 @@ func TestSplunkInit(t *testing.T) {
 	}
 }
 
+func Test_SplunkBackoffRetry(t *testing.T) {
+	handler := &testHandler{test: t, batched: false, returnErrorOnFirstReq: true}
+	server := httptest.NewServer(handler)
+	defer server.Close()
+
+	pmp := SplunkPump{}
+
+	cfg := make(map[string]interface{})
+	cfg["collector_token"] = testToken
+	cfg["max_retries"] = 1
+	cfg["collector_url"] = server.URL
+	cfg["ssl_insecure_skip_verify"] = true
+
+	if errInit := pmp.Init(cfg); errInit != nil {
+		t.Error("Error initializing pump")
+		return
+	}
+
+	keys := make([]interface{}, 1)
+
+	keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}
+
+	if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil {
+		t.Error("Error writing to splunk pump:", errWrite.Error())
+		return
+	}
+
+	assert.Equal(t, 1, len(handler.responses))
+
+	response := handler.responses[0]
+
+	assert.Equal(t, "Success", response.Text)
+	assert.Equal(t, int32(0), response.Code)
+}
+
 func Test_SplunkWriteData(t *testing.T) {
 	handler := &testHandler{test: t, batched: false}
 	server := httptest.NewServer(handler)
@@ -112,6 +155,7 @@ func Test_SplunkWriteData(t *testing.T) {
 	assert.Equal(t, "Success", response.Text)
 	assert.Equal(t, int32(0), response.Code)
 }
+
 func Test_SplunkWriteDataBatch(t *testing.T) {
 	handler := &testHandler{test: t, batched: true}
 	server := httptest.NewServer(handler)
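
Reviewer note: the included test exercises the retry path through the whole pump. For anyone who wants to poke at the new helper in isolation, below is a minimal sketch that drives backoffRetry directly against a server that returns 500 twice before succeeding. It assumes it is compiled into the pumps package alongside http-retry.go; the test name Test_BackoffRetrySketch, the flaky httptest server, and the logger wiring are illustrative only and are not part of this change.

package pumps

import (
	"bytes"
	"context"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"testing"

	"github.com/sirupsen/logrus"
)

// Drives backoffRetry directly: the first two requests fail with a 500,
// so with maxRetries = 2 the third attempt should succeed.
func Test_BackoffRetrySketch(t *testing.T) {
	var calls int32
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) <= 2 {
			// transient failure - the wrapper should back off and retry
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	// A bare-bones httpSender, mirroring the one wired up in SplunkPump.Init.
	sender := func(ctx context.Context, data []byte) (*http.Response, error) {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, server.URL, bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		return http.DefaultClient.Do(req)
	}

	retry := newBackoffRetry("sketch send failed", 2, sender, logrus.NewEntry(logrus.New()))

	if err := retry.send(context.Background(), []byte(`{"event":"ping"}`)); err != nil {
		t.Fatalf("expected success after retries, got: %v", err)
	}
	if got := atomic.LoadInt32(&calls); got != 3 {
		t.Fatalf("expected 3 attempts, got %d", got)
	}
}

Because the wrapper uses backoff.NewExponentialBackOff with its default initial interval, the two retries here take roughly a second in total; a production caller only tunes the attempt count via max_retries, not the backoff schedule.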