Skip to content

Commit

Permalink
move to separate pkg outside of pumps
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicriordan committed Nov 20, 2023
1 parent 9a66a92 commit 453607c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
29 changes: 12 additions & 17 deletions pumps/http-retry.go → http-retry/http-retry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package pumps
package httpretry

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -13,24 +11,22 @@ import (
"github.com/sirupsen/logrus"
)

var errPerm = errors.New("bad request - not retrying")

type httpSender func(ctx context.Context, data []byte) (*http.Response, error)

type backoffRetry struct {
type BackoffHTTPRetry struct {
errMsg string
maxRetries uint64
logger *logrus.Entry
httpsend httpSender
httpclient *http.Client
}

func newBackoffRetry(errMsg string, maxRetries uint64, httpSend httpSender, logger *logrus.Entry) *backoffRetry {
return &backoffRetry{errMsg: errMsg, maxRetries: maxRetries, httpsend: httpSend, logger: logger}
// NewBackoffRetry Creates an exponential backoff retry to use httpClient for connections. Will retry if a temporary error or
// 5xx or 429 status code in response.
func NewBackoffRetry(errMsg string, maxRetries uint64, httpClient *http.Client, logger *logrus.Entry) *BackoffHTTPRetry {
return &BackoffHTTPRetry{errMsg: errMsg, maxRetries: maxRetries, httpclient: httpClient, logger: logger}
}

func (s *backoffRetry) send(ctx context.Context, data []byte) error {
func (s *BackoffHTTPRetry) Send(req *http.Request) error {
opFn := func() error {
resp, err := s.httpsend(ctx, data)
resp, err := s.httpclient.Do(req)
if err != nil {
return s.handleErr(err)
}
Expand All @@ -53,13 +49,11 @@ func (s *backoffRetry) send(ctx context.Context, data []byte) error {
}

return backoff.RetryNotify(opFn, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), s.maxRetries), func(err error, t time.Duration) {
if err != nil {
s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t)
}
s.logger.WithError(err).Errorf("%s retrying in %s", s.errMsg, t)
})
}

func (s *backoffRetry) handleErr(err error) error {
func (s *BackoffHTTPRetry) handleErr(err error) error {
if e, ok := err.(*url.Error); ok {
if e.Temporary() {
// temp error, attempt retry
Expand All @@ -68,5 +62,6 @@ func (s *backoffRetry) handleErr(err error) error {
// permanent error - don't retry
return backoff.Permanent(err)
}
// anything else - retry
return err
}
37 changes: 20 additions & 17 deletions pumps/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"github.com/cenkalti/backoff/v4"
"github.com/mitchellh/mapstructure"
"net/http"
"net/url"
"strings"

"github.com/TykTechnologies/tyk-pump/analytics"
retry "github.com/TykTechnologies/tyk-pump/http-retry"
"github.com/mitchellh/mapstructure"
)

const (
Expand All @@ -36,7 +36,7 @@ type SplunkClient struct {
CollectorURL string
TLSSkipVerify bool
httpClient *http.Client
retry *backoffRetry
retry *retry.BackoffHTTPRetry
}

// SplunkPump is a Tyk Pump driver for Splunk.
Expand Down Expand Up @@ -126,19 +126,11 @@ func (p *SplunkPump) Init(config interface{}) error {
p.config.BatchMaxContentLength = maxContentLength
}

sender := func(ctx context.Context, data []byte) (*http.Response, error) {
reader := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader)
if err != nil {
// invalid req, don't retry
return nil, backoff.Permanent(err)
}
req = req.WithContext(ctx)
req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token)
return p.client.httpClient.Do(req)
if p.config.MaxRetries > 0 {
p.log.Infof("%d max retries", p.config.MaxRetries)
}

p.client.retry = newBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, sender, p.log)
p.client.retry = retry.NewBackoffRetry("Failed writing data to Splunk", p.config.MaxRetries, p.client.httpClient, p.log)
p.log.Info(p.GetName() + " Initialized")

return nil
Expand Down Expand Up @@ -259,22 +251,22 @@ func (p *SplunkPump) WriteData(ctx context.Context, data []interface{}) error {
if p.config.EnableBatch {
//if we're batching and the len of our data is already bigger than max_content_length, we send the data and reset the buffer
if batchBuffer.Len()+len(data) > p.config.BatchMaxContentLength {
if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
if err := p.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
}
batchBuffer.Write(data)
} else {
if err := p.client.retry.send(ctx, data); err != nil {
if err := p.send(ctx, data); err != nil {
return err
}
}
}

//this if is for data remaining in the buffer when len(buffer) is lower than max_content_length
if p.config.EnableBatch && batchBuffer.Len() > 0 {
if err := p.client.retry.send(ctx, batchBuffer.Bytes()); err != nil {
if err := p.send(ctx, batchBuffer.Bytes()); err != nil {
return err
}
batchBuffer.Reset()
Expand Down Expand Up @@ -316,3 +308,14 @@ func NewSplunkClient(token string, collectorURL string, skipVerify bool, certFil
}
return c, nil
}

func (p *SplunkPump) send(ctx context.Context, data []byte) error {
reader := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, p.config.CollectorURL, reader)
if err != nil {
return err
}
req = req.WithContext(ctx)
req.Header.Add(authHeaderName, authHeaderPrefix+p.client.Token)
return p.client.retry.Send(req)
}

0 comments on commit 453607c

Please sign in to comment.