splunk backoff retry mechanism to reduce chance of losing logs
dominicriordan committed Nov 16, 2023
1 parent d7dfaa4 commit 7b600b6
Showing 4 changed files with 128 additions and 33 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -775,6 +775,7 @@ Setting up Splunk with a _HTTP Event Collector_
- `ignore_tag_prefix_list`: (optional) Choose which tags to be ignored by the Splunk Pump. Keep in mind that the tag name and value are hyphenated. Type: String Array `[]string`. Default value is `[]`
- `enable_batch`: If this is set to `true`, pump is going to send the analytics records in batch to Splunk. Type: Boolean. Default value is `false`.
- `max_content_length`: Max content length in bytes to be sent in batch requests. It should match the `max_content_length` configured in Splunk. If the purged analytics records don't reach that size, they're sent anyway in each `purge_loop`. Type: Integer. Default value is 838860800 (~ 800 MB), the same default value as the Splunk config.
- `max_retries`: Maximum number of retries if sending requests to the Splunk HEC fails. Type: Integer. Default value is `0`.

###### JSON / Conf File

@@ -791,6 +792,7 @@ Setting up Splunk with a _HTTP Event Collector_
"obfuscate_api_keys": true,
"obfuscate_api_keys_length": 10,
"enable_batch":true,
"max_retries": 2,
"fields": [
"method",
"host",
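For context, a complete minimal pump entry using the new option might look like the sketch below. The field names are taken from this diff and its tests; the surrounding `pumps` wrapper and the HEC URL are illustrative assumptions, not part of this commit.

"pumps": {
  "splunk": {
    "type": "splunk",
    "meta": {
      "collector_token": "<your-HEC-token>",
      "collector_url": "https://localhost:8088/services/collector/event",
      "ssl_insecure_skip_verify": false,
      "enable_batch": true,
      "max_retries": 2
    }
  }
}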
57 changes: 57 additions & 0 deletions pumps/http-retry.go
@@ -0,0 +1,57 @@
package pumps

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
)

var errPerm = errors.New("bad request - not retrying")

type httpSender func(ctx context.Context, data []byte) (*http.Response, error)

type backoffRetry struct {
errMsg string
maxRetries uint64
logger *logrus.Entry
httpsend httpSender
}

func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry {
return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger}
}

func (s *backoffRetry) send(ctx context.Context, data []byte) error {
fn := func() error {
resp, err := s.httpsend(ctx, data)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
return nil
}

// server error or rate limit hit - backoff retry
if resp.StatusCode >= http.StatusInternalServerError || resp.StatusCode == http.StatusTooManyRequests {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("error status code %d and response '%s'", resp.StatusCode, body)
}

// any other error treat as permanent (i.e. auth error, invalid request) and don't retry
return backoff.Permanent(errPerm)
}

return backoff.RetryNotify(fn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) {
if err != nil {
s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t)
}
})
}
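For illustration, a minimal sketch (not part of the commit) of how the new helper behaves in isolation: the local test server fails the first request with a 500, and `send` retries it and succeeds. It is written as a test in the `pumps` package since `newBackoffRetry` is unexported; the server handler and test name are hypothetical.

package pumps

import (
	"bytes"
	"context"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/sirupsen/logrus"
)

func TestBackoffRetrySketch(t *testing.T) {
	attempts := 0
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		attempts++
		if attempts == 1 {
			// First request fails with a 5xx, which the helper treats as retryable.
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	// A plain httpSender posting the payload to the test server.
	sender := func(ctx context.Context, data []byte) (*http.Response, error) {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL, bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		return http.DefaultClient.Do(req)
	}

	retry := newBackoffRetry("send failed", 3, sender, logrus.NewEntry(logrus.New()))
	if err := retry.send(context.Background(), []byte(`{"event":"ping"}`)); err != nil {
		t.Fatalf("expected the retry to recover from the first 500: %v", err)
	}
	if attempts != 2 {
		t.Fatalf("expected 2 attempts, got %d", attempts)
	}
}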
50 changes: 21 additions & 29 deletions pumps/splunk.go
@@ -6,12 +6,12 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"github.com/cenkalti/backoff/v4"
"github.com/mitchellh/mapstructure"
"net/http"
"net/url"
"strings"

"github.com/mitchellh/mapstructure"

"github.com/TykTechnologies/tyk-pump/analytics"
)

@@ -35,8 +35,8 @@ type SplunkClient struct {
Token string
CollectorURL string
TLSSkipVerify bool

httpClient *http.Client
httpClient *http.Client
retry *backoffRetry
}

// SplunkPump is a Tyk Pump driver for Splunk.
@@ -85,6 +85,8 @@ type SplunkPumpConfig struct {
// the amount of bytes, they're sent anyway in each `purge_loop`. Default value is 838860800
// (~ 800 MB), the same default value as Splunk config.
BatchMaxContentLength int `json:"batch_max_content_length" mapstructure:"batch_max_content_length"`
// MaxRetries is the maximum number of retries if sending requests to the Splunk HEC fails. Default value is `0`
MaxRetries uint64 `json:"max_retries" mapstructure:"max_retries"`
}

// New initializes a new pump.
@@ -124,6 +126,18 @@ func (p *SplunkPump) Init(config interface{}) error {
p.config.BatchMaxContentLength = maxContentLength
}

sender := func(ctx context.Context, data []byte) (*http.Response, error) {
reader := bytes.NewReader(data)
req, err := http.NewRequest("POST", p.config.CollectorURL, reader)
if err != nil {
return nil, backoff.Permanent(err)
}
req = req.WithContext(ctx)
req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token)
return p.client.httpClient.Do(req)
}

p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log)
p.log.Info(p.GetName() + " Initialized")

return nil
@@ -153,15 +167,6 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {

var batchBuffer bytes.Buffer

fnSendBytes := func(data []byte) error {
_, errSend := p.client.Send(ctx, data)
if errSend != nil {
p.log.Error("Error writing data to Splunk ", errSend)
return errSend
}
return nil
}

for _, v := range data {
decoded := v.(analytics.AnalyticsRecord)
apiKey := decoded.APIKey
@@ -253,22 +258,22 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
if p.config.EnableBatch {
// if we're batching and the length of our data already exceeds max_content_length, send the data and reset the buffer
if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength {
if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
}
batchBuffer.Write(data)
} else {
if err := fnSendBytes(data); err != nil {
if err := p.client.retry.send(ctx, data); err != nil {
return err
}
}
}

// flush any data remaining in the buffer when len(buffer) is lower than max_content_length
if p.config.EnableBatch && batchBuffer.Len() > 0 {
if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
@@ -310,16 +315,3 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil
}
return c, nil
}

// Send sends an event to the Splunk HTTP Event Collector interface.
func (c *SplunkClient) Send(ctx context.Context, data []byte) (*http.Response, error) {

reader := bytes.NewReader(data)
req, err := http.NewRequest("POST", c.CollectorURL, reader)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header.Add(authHeaderName, authHeaderPrefix+c.Token)
return c.httpClient.Do(req)
}
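One behavioural note on the wiring above, which follows from the backoff/v4 library rather than anything stated in this diff: `backoff.WithMaxRetries` caps the number of retries, not total attempts, so `max_retries: 2` allows up to three requests in total. A standalone sketch:

package main

import (
	"errors"
	"fmt"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	op := func() error {
		attempts++
		return errors.New("always failing") // force every attempt to fail
	}
	// maxRetries = 2 -> initial attempt + up to 2 retries.
	_ = backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 2))
	fmt.Println(attempts) // prints 3
}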
52 changes: 48 additions & 4 deletions pumps/splunk_test.go
@@ -26,13 +26,21 @@ type splunkStatus struct {
Len int `json:"len"`
}
type testHandler struct {
test *testing.T
batched bool

responses []splunkStatus
test *testing.T
batched bool
returnErrorOnFirstReq bool
responses []splunkStatus
reqCount int
}

func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.reqCount++
if h.returnErrorOnFirstReq && h.reqCount == 1 {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("splunk internal error"))
return
}

authHeaderValue := r.Header.Get("authorization")
if authHeaderValue == "" {
h.test.Fatal("Auth header is empty")
@@ -79,6 +87,41 @@ func TestSplunkInit(t *testing.T) {
}
}

func Test_SplunkBackoffRetry(t *testing.T) {
handler := &testHandler{test: t, batched: false, returnErrorOnFirstReq: true}
server := httptest.NewServer(handler)
defer server.Close()

pmp := SplunkPump{}

cfg := make(map[string]interface{})
cfg["collector_token"] = testToken
cfg["max_retries"] = 1
cfg["collector_url"] = server.URL
cfg["ssl_insecure_skip_verify"] = true

if errInit := pmp.Init(cfg); errInit != nil {
t.Error("Error initializing pump")
return
}

keys := make([]interface{}, 1)

keys[0] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}

if errWrite := pmp.WriteData(context.TODO(), keys); errWrite != nil {
t.Error("Error writing to splunk pump:", errWrite.Error())
return
}

assert.Equal(t, 1, len(handler.responses))

response := handler.responses[0]

assert.Equal(t, "Success", response.Text)
assert.Equal(t, int32(0), response.Code)
}

func Test_SplunkWriteData(t *testing.T) {
handler := &testHandler{test: t, batched: false}
server := httptest.NewServer(handler)
@@ -112,6 +155,7 @@ func Test_SplunkWriteData(t *testing.T) {
assert.Equal(t, "Success", response.Text)
assert.Equal(t, int32(0), response.Code)
}

func Test_SplunkWriteDataBatch(t *testing.T) {
handler := &testHandler{test: t, batched: true}
server := httptest.NewServer(handler)
