From 453607cd3e55ab6b774babb83c7589150057ef95 Mon Sep 17 00:00:00 2001 From: dominicriordan Date: Mon, 20 Nov 2023 08:38:37 +0000 Subject: [PATCH] move to separate pkg outside of pumps --- {pumps => http-retry}/http-retry.go | 29 ++++++++++------------ pumps/splunk.go | 37 ++++++++++++++++------------- 2 files changed, 32 insertions(+), 34 deletions(-) rename {pumps => http-retry}/http-retry.go (60%) diff --git a/pumps/http-retry.go b/http-retry/http-retry.go similarity index 60% rename from pumps/http-retry.go rename to http-retry/http-retry.go index 79251ee6a..d1d01220e 100644 --- a/pumps/http-retry.go +++ b/http-retry/http-retry.go @@ -1,8 +1,6 @@ -package pumps +package httpretry import ( - "context" - "errors" "fmt" "io" "net/http" @@ -13,24 +11,22 @@ import ( "github.com/sirupsen/logrus" ) -var errPerm = errors.New("bad request - not retrying") - -type httpSender func(ctx context.Context, data []byte) (*http.Response, error) - -type backoffRetry struct { +type BackoffHTTPRetry struct { errMsg string maxRetries uint64 logger *logrus.Entry - httpsend httpSender + httpclient *http.Client } -func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry { - return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger} +// NewBackoffRetry creates an exponential backoff retry that uses httpClient for connections. It will retry on a temporary error or a +// 5xx or 429 status code in response. 
+func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, logger *logrus.Entry) *BackoffHTTPRetry { + return &BackoffHTTPRetry{errMsg: errMsg, maxRetries: maxRetries, httpclient: httpClient, logger: logger} } -func (s *backoffRetry) send(ctx context.Context, data []byte) error { +func (s *BackoffHTTPRetry) Send(req *http.Request) error { opFn := func() error { - resp, err := s.httpsend(ctx, data) + resp, err := s.httpclient.Do(req) if err != nil { return s.handleErr(err) } @@ -53,13 +49,11 @@ func (s *backoffRetry) send(ctx context.Context, data []byte) error { } return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) { - if err != nil { - s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) - } + s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t) }) } -func (s *backoffRetry) handleErr(err error) error { +func (s *BackoffHTTPRetry) handleErr(err error) error { if e, ok := err.(*url.Error); ok { if e.Temporary() { // temp error, attempt retry @@ -68,5 +62,6 @@ func (s *backoffRetry) handleErr(err error) error { // permanent error - don't retry return backoff.Permanent(err) } + // anything else - retry return err } diff --git a/pumps/splunk.go b/pumps/splunk.go index 87894389b..c19482006 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -6,13 +6,13 @@ import ( "crypto/tls" "encoding/json" "errors" - "github.com/cenkalti/backoff/v4" - "github.com/mitchellh/mapstructure" "net/http" "net/url" "strings" "github.com/TykTechnologies/tyk-pump/analytics" + retry "github.com/TykTechnologies/tyk-pump/http-retry" + "github.com/mitchellh/mapstructure" ) const ( @@ -36,7 +36,7 @@ type SplunkClient struct { CollectorURL string TLSSkipVerify bool httpClient *http.Client - retry *backoffRetry + retry *retry.BackoffHTTPRetry } // SplunkPump is a Tyk Pump driver for Splunk. 
@@ -126,19 +126,11 @@ func (p *SplunkPump) Init(config interface{}) error { p.config.BatchMaxContentLength = maxContentLength } - sender := func(ctx context.Context, data []byte) (*http.Response, error) { - reader := bytes.NewReader(data) - req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) - if err != nil { - // invalid req, don't retry - return nil, backoff.Permanent(err) - } - req = req.WithContext(ctx) - req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) - return p.client.httpClient.Do(req) + if p.config.MaxRetries > 0 { + p.log.Infof("%d max retries", p.config.MaxRetries) } - p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log) + p.client.retry = retry.NewBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, p.client.httpClient, p.log) p.log.Info(p.GetName() + " Initialized") return nil @@ -259,14 +251,14 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { if p.config.EnableBatch { //if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength { - if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { + if err := p.send(ctx, batchBuffer.Bytes()); err != nil { return err } batchBuffer.Reset() } batchBuffer.Write(data) } else { - if err := p.client.retry.send(ctx, data); err != nil { + if err := p.send(ctx, data); err != nil { return err } } @@ -274,7 +266,7 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error { //this if is for data remaining in the buffer when len(buffer) is lower than max_content_length if p.config.EnableBatch && batchBuffer.Len() > 0 { - if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil { + if err := p.send(ctx, batchBuffer.Bytes()); err != nil { return err } batchBuffer.Reset() @@ -316,3 +308,14 @@ func NewSplunkClient(token 
string, collectorURL string, skipVerify bool, certFil } return c, nil } + +func (p *SplunkPump) send(ctx context.Context, data []byte) error { + reader := bytes.NewReader(data) + req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token) + return p.client.retry.Send(req) +}