Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-10564 Splunk backoff retry #758

Merged
merged 22 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7b600b6
splunk backoff retry mechanism to reduce chance of losing logs
Nov 16, 2023
9a66a92
only retry for temporary errors
Nov 16, 2023
453607c
move to separate pkg outside of pumps
Nov 20, 2023
1c12e46
Merge branch 'master' into splunk-retry
domsolutions Nov 23, 2023
e2fa0d6
set correct collector url
Nov 23, 2023
740798e
Merge branch 'splunk-retry' of github.com:domsolutions/tyk-pump into …
Nov 23, 2023
5412712
Merge branch 'master' into splunk-retry
domsolutions Nov 26, 2023
ab2da87
ensure connection can be reused by reading and closing body stream
Dec 2, 2023
99d9285
Merge branch 'splunk-retry' of github.com:domsolutions/tyk-pump into …
Dec 2, 2023
4761a3a
Merge branch 'master' into splunk-retry
domsolutions Dec 4, 2023
5de8341
Merge branch 'master' into splunk-retry
domsolutions Dec 8, 2023
f37a00f
reset req body to fix error when req body has already been read from …
Dec 27, 2023
3bf653b
Merge branch 'splunk-retry' of github.com:domsolutions/tyk-pump into …
Dec 27, 2023
995a659
Merge branch 'master' into splunk-retry
domsolutions Dec 27, 2023
249a151
tidy up, change retry log to warning as not yet error
Dec 27, 2023
f0a1c76
Merge branch 'splunk-retry' of github.com:domsolutions/tyk-pump into …
Dec 27, 2023
bbd7859
add debugging for body bytes sent to splunk & RTT to aid in fine tuni…
Dec 28, 2023
e88d913
PR requested changes
Dec 28, 2023
deb17c4
fix go linter issue
Dec 28, 2023
7b08c79
applying suggested changes
mativm02 Dec 29, 2023
ee8bfcd
linting
mativm02 Dec 29, 2023
444ddde
Merge branch 'master' into domsolutions-splunk-retry
mativm02 Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ Setting up Splunk with a _HTTP Event Collector_
- `ignore_tag_prefix_list`: (optional) Choose which tags to be ignored by the Splunk Pump. Keep in mind that the tag name and value are hyphenated. Type: String Array `[]string`. Default value is `[]`
- `enable_batch`: If this is set to `true`, pump is going to send the analytics records in batch to Splunk. Type: Boolean. Default value is `false`.
- `max_content_length`: Max content length in bytes to be sent in batch requests. It should match the `max_content_length` configured in Splunk. If the purged analytics records' size doesn't reach that number of bytes, they're sent anyway in each `purge_loop`. Type: Integer. Default value is 838860800 (~ 800 MB), the same default value as Splunk config.
- `max_retries`: Max number of retries if requests to the Splunk HEC fail. Default value is `0` (no retries after failure). Connection, network, timeout, temporary, too-many-requests, and internal server errors are all considered retryable.

###### JSON / Conf File

Expand All @@ -791,6 +792,7 @@ Setting up Splunk with a _HTTP Event Collector_
"obfuscate_api_keys": true,
"obfuscate_api_keys_length": 10,
"enable_batch":true,
"max_retries": 2,
"fields": [
"method",
"host",
Expand Down
150 changes: 150 additions & 0 deletions http-retry/http-retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package httpretry

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
)

// BackoffHTTPRetry sends HTTP requests through an http.Client and retries
// failed attempts with exponential backoff, logging each retry.
type BackoffHTTPRetry struct {
	logger     *logrus.Entry // destination for debug/warning/error output
	httpclient *http.Client  // client used to execute every attempt
	errMsg     string        // message prefixed to the warning logged before each retry
	maxRetries uint64        // maximum number of retries after the first attempt (0 = no retries)
}

// Minimal interfaces used with errors.As to detect retryable
// characteristics on errors returned by the HTTP client, without
// depending on the concrete error types that implement them.
type (
	conError     interface{ ConnectionError() bool } // connection-level failure indicator
	tempError    interface{ Temporary() bool }       // transient-condition indicator (e.g. net errors)
	timeoutError interface{ Timeout() bool }         // timeout indicator (e.g. net/context errors)
)

// NewBackoffRetry creates an exponential-backoff retrier that uses
// httpClient for connections. Requests are retried on temporary errors and
// on 5xx or 429 responses, at most maxRetries times; errMsg is prefixed to
// the warning logged before each retry.
func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, logger *logrus.Entry) *BackoffHTTPRetry {
	retrier := &BackoffHTTPRetry{
		logger:     logger,
		httpclient: httpClient,
		errMsg:     errMsg,
		maxRetries: maxRetries,
	}
	return retrier
}

// Send executes req with exponential-backoff retries. The request body is
// buffered in memory up front so it can be replayed on every attempt.
// A 200 OK response returns nil; 5xx and 429 responses and retryable
// transport errors trigger a retry (up to maxRetries); any other response
// status or error is treated as permanent and returned immediately.
func (s *BackoffHTTPRetry) Send(req *http.Request) error {
	var reqBody []byte
	if req.Body != nil {
		var err error
		// buffer the whole body so each retry can resend identical bytes
		reqBody, err = io.ReadAll(req.Body)
		if err != nil {
			s.logger.WithError(err).Error("Failed to read req body")
			return err
		}
		req.Body.Close() // closing the original body
	}

	// opFn is one attempt; its return value tells backoff whether to retry
	// (plain error) or stop (nil or backoff.Permanent).
	opFn := func() error {
		// recreating the request body from the buffer for each retry as if first attempt fails and
		// a new conn is created (keep alive disabled on server for example) the req body has already been read,
		// resulting in "http: ContentLength=X with Body length Y" error
		req.Body = io.NopCloser(bytes.NewBuffer(reqBody))

		t := time.Now()
		resp, err := s.httpclient.Do(req)
		// RTT debug log runs on both success and failure; resp may be nil here
		s.logger.Debugf("Req %s took %s", req.URL, time.Since(t))

		if err != nil {
			// transport-level failure: classify as retryable or permanent
			return s.handleErr(err)
		}
		defer func() {
			// read all response and discard so http client can
			// reuse connection as per doc on Response.Body
			_, err := io.Copy(io.Discard, resp.Body)
			if err != nil {
				s.logger.WithError(err).Error("Failed to read and discard resp body")
			}
			resp.Body.Close()
		}()

		if resp.StatusCode == http.StatusOK {
			return nil
		}

		// non-200: read the body to include it in the error message
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			s.logger.WithError(err).Error("Failed to read resp body")
			// attempt retry
			return err
		}

		err = fmt.Errorf("got status code %d and response '%s'", resp.StatusCode, body)

		// server error or rate limit hit - attempt retry
		if resp.StatusCode >= http.StatusInternalServerError || resp.StatusCode == http.StatusTooManyRequests {
			return err
		}

		// any other error treat as permanent (i.e. auth error, invalid request) and don't retry
		return backoff.Permanent(err)
	}

	// run opFn under exponential backoff capped at maxRetries, logging a
	// warning with the configured message before each retry
	return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) {
		s.logger.WithError(err).Warningf("%s retrying in %s", s.errMsg, t)
	})
}

// handleErr translates a transport error for the backoff loop: retryable
// errors are returned unchanged (so the loop retries), everything else is
// wrapped as permanent so retrying stops immediately.
func (s *BackoffHTTPRetry) handleErr(err error) error {
	if !isErrorRetryable(err) {
		// permanent error - don't retry
		return backoff.Permanent(err)
	}
	return err
}

func isErrorRetryable(err error) bool {
if err == nil {
return false
}

var (
conErr conError
tempErr tempError
timeoutErr timeoutError
urlErr *url.Error
netOpErr *net.OpError
)

switch {
case errors.As(err, &conErr) && conErr.ConnectionError():
return true
case strings.Contains(err.Error(), "connection reset"):
return true
case errors.As(err, &urlErr):
// Refused connections should be retried as the service may not yet be
// running on the port. Go TCP dial considers refused connections as
// not temporary.
if strings.Contains(urlErr.Error(), "connection refused") {
return true
}
return isErrorRetryable(errors.Unwrap(urlErr))
case errors.As(err, &netOpErr):
// Network dial, or temporary network errors are always retryable.
if strings.EqualFold(netOpErr.Op, "dial") || netOpErr.Temporary() {
return true
}
return isErrorRetryable(errors.Unwrap(netOpErr))
case errors.As(err, &tempErr) && tempErr.Temporary():
// Fallback to the generic temporary check, with temporary errors
// retryable.
return true
case errors.As(err, &timeoutErr) && timeoutErr.Timeout():
// Fallback to the generic timeout check, with timeout errors
// retryable.
return true
}

return false
}
44 changes: 21 additions & 23 deletions pumps/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"net/url"
"strings"

"github.com/mitchellh/mapstructure"

"github.com/TykTechnologies/tyk-pump/analytics"
retry "github.com/TykTechnologies/tyk-pump/http-retry"
"github.com/mitchellh/mapstructure"
)

const (
Expand All @@ -35,8 +35,8 @@ type SplunkClient struct {
Token string
CollectorURL string
TLSSkipVerify bool

httpClient *http.Client
httpClient *http.Client
retry *retry.BackoffHTTPRetry
}

// SplunkPump is a Tyk Pump driver for Splunk.
Expand Down Expand Up @@ -85,6 +85,8 @@ type SplunkPumpConfig struct {
// the amount of bytes, they're send anyways in each `purge_loop`. Default value is 838860800
// (~ 800 MB), the same default value as Splunk config.
BatchMaxContentLength int `json:"batch_max_content_length" mapstructure:"batch_max_content_length"`
// MaxRetries the maximum amount of retries if failed to send requests to splunk HEC. Default value is `0`
MaxRetries uint64 `json:"max_retries" mapstructure:"max_retries"`
}

// New initializes a new pump.
Expand Down Expand Up @@ -124,6 +126,11 @@ func (p *SplunkPump) Init(config interface{}) error {
p.config.BatchMaxContentLength = maxContentLength
}

if p.config.MaxRetries > 0 {
p.log.Infof("%d max retries", p.config.MaxRetries)
}

p.client.retry = retry.NewBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, p.client.httpClient, p.log)
p.log.Info(p.GetName() + " Initialized")

return nil
Expand Down Expand Up @@ -153,15 +160,6 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {

var batchBuffer bytes.Buffer

fnSendBytes := func(data []byte) error {
_, errSend := p.client.Send(ctx, data)
if errSend != nil {
p.log.Error("Error writing data to Splunk ", errSend)
return errSend
}
return nil
}

for _, v := range data {
decoded := v.(analytics.AnalyticsRecord)
apiKey := decoded.APIKey
Expand Down Expand Up @@ -253,22 +251,22 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
if p.config.EnableBatch {
//if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer
if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength {
if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
if err := p.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
}
batchBuffer.Write(data)
} else {
if err := fnSendBytes(data); err != nil {
if err := p.send(ctx, data); err != nil {
return err
}
}
}

//this if is for data remaining in the buffer when len(buffer) is lower than max_content_length
if p.config.EnableBatch && batchBuffer.Len() > 0 {
if err := fnSendBytes(batchBuffer.Bytes()); err != nil {
if err := p.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
Expand Down Expand Up @@ -311,15 +309,15 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil
return c, nil
}

// Send sends an event to the Splunk HTTP Event Collector interface.
func (c *SplunkClient) Send(ctx context.Context, data []byte) (*http.Response, error) {

func (p *SplunkPump) send(ctx context.Context, data []byte) error {
reader := bytes.NewReader(data)
req, err := http.NewRequest("POST", c.CollectorURL, reader)
req, err := http.NewRequest(http.MethodPost, p.client.CollectorURL, reader)
if err != nil {
return nil, err
return err
}
req = req.WithContext(ctx)
req.Header.Add(authHeaderName, authHeaderPrefix+c.Token)
return c.httpClient.Do(req)
req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token)

p.log.Debugf("Sending %d bytes to splunk", len(data))
return p.client.retry.Send(req)
}
Loading
Loading