Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-10676 Upgrade Resurface Pump backend #773

Merged
merged 11 commits into from
Jan 26, 2024
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2
github.com/resurfaceio/logger-go/v3 v3.2.1
github.com/resurfaceio/logger-go/v3 v3.3.2
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827
github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99
github.com/segmentio/kafka-go v0.3.6
Expand All @@ -55,6 +55,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ=
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
Expand Down Expand Up @@ -449,8 +451,8 @@ github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQK
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY=
github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8=
github.com/resurfaceio/logger-go/v3 v3.3.2 h1:bUjzvNM/g7Qcy8r3ctA71lCFf9gmpEghovmN125XXRs=
github.com/resurfaceio/logger-go/v3 v3.3.2/go.mod h1:ZswfY9nja+SfeAZiFnbFR1aHk7WCDR0is/NWvId7oT0=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY=
Expand Down Expand Up @@ -495,7 +497,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand All @@ -505,9 +506,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo=
Expand Down
110 changes: 90 additions & 20 deletions pumps/resurface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
"reflect"
"strconv"
"strings"
"sync"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
"github.com/resurfaceio/logger-go/v3"
logger "github.com/resurfaceio/logger-go/v3"
)

type ResurfacePump struct {
logger *logger.HttpLogger
config *ResurfacePumpConfig
logger *logger.HttpLogger
config *ResurfacePumpConfig
data chan []interface{}
wg sync.WaitGroup
enabled bool
CommonPumpConfig
}

Expand Down Expand Up @@ -49,6 +53,7 @@ func (rp *ResurfacePump) GetEnvPrefix() string {
}

func (rp *ResurfacePump) Init(config interface{}) error {
rp.wg = sync.WaitGroup{}
rp.config = &ResurfacePumpConfig{}
rp.log = log.WithField("prefix", resurfacePrefix)

Expand All @@ -74,10 +79,26 @@ func (rp *ResurfacePump) Init(config interface{}) error {
rp.log.Info(rp.GetName() + " Initialized (Logger disabled)")
return errors.New("logger is not enabled")
}
rp.initWorker()
rp.log.Info(rp.GetName() + " Initialized")
return nil
}

func (rp *ResurfacePump) initWorker() {
rp.data = make(chan []interface{}, 5)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@monrax could I ask you again what's the reason why the size of this channel is hardcoded to 5?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @mativm02 , sorry for the late reply -- Good catch! Ideally, it would be 1 instead of 5. Although, after some load testing I found this to be a good queue depth. I do agree it would be better to make it configurable by defining a new parameter for this pump backend. Please, let me know if this is a priority for y'all and I'll make sure to submit a new PR with this change. Thanks!

rp.wg.Add(1)
go rp.writeData()
rp.enable()
}

func (rp *ResurfacePump) disable() {
rp.enabled = false
}

func (rp *ResurfacePump) enable() {
rp.enabled = true
}

func parseHeaders(headersString string, existingHeaders http.Header) (headers http.Header) {
if existingHeaders != nil {
headers = http.Header.Clone(existingHeaders)
Expand Down Expand Up @@ -210,30 +231,79 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp
return
}

func (rp *ResurfacePump) writeData() {
defer rp.wg.Done()
for data := range rp.data {
for _, v := range data {
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
rp.log.Error("Error decoding analytic record")
continue
}
if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 {
rp.log.Warn("Record dropped. Please enable Detailed Logging.")
continue
}

req, resp, customFields, err := mapRawData(&decoded)
if err != nil {
rp.log.Error(err)
continue
}

logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields)
}
rp.log.Info("Wrote ", len(data), " records...")
}
}

func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error {
rp.log.Debug("Writing ", len(data), " records")

for _, v := range data {
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
rp.log.Error("Error decoding analytic record")
continue
if rp.enabled {
select {
case rp.data <- data:
rp.log.Info("Purged ", len(data), " records...")
case <-ctx.Done():
// Context has been cancelled or timed out
return ctx.Err()
}
if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 {
rp.log.Warn("Record dropped. Please enable Detailed Logging.")
continue
}

req, resp, customFields, err := mapRawData(&decoded)
if err != nil {
rp.log.Error(err)
continue
} else {
select {
case peek, open := <-rp.data:
if open {
rp.data <- peek
close(rp.data)
}
case <-ctx.Done():
// Context has been cancelled or timed out
close(rp.data)
return ctx.Err()
default:
close(rp.data)
}
}
return nil
}

logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields)
func (rp *ResurfacePump) Flush() error {
rp.disable()
err := rp.WriteData(context.Background(), []interface{}{})
if err != nil {
return err
}
rp.wg.Wait()
rp.initWorker()

rp.log.Info("Purged ", len(data), " records...")
return nil
}

func (rp *ResurfacePump) Shutdown() error {
rp.logger.Stop()

err := rp.Flush()
if err != nil {
return err
}

return nil
}
27 changes: 20 additions & 7 deletions pumps/resurface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func TestResurfaceWriteData(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -162,7 +165,10 @@ func TestResurfaceWriteData(t *testing.T) {
TimeStamp: time.Now(),
},
})
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue = pmp.logger.Queue()
assert.Equal(t, len(recs)+1, len(queue))
Expand Down Expand Up @@ -231,7 +237,10 @@ func TestResurfaceWriteCustomFields(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -284,9 +293,10 @@ func TestResurfaceWriteChunkedResponse(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
if err != nil {
t.Fatal(pmp.GetName()+"couldn't write records with err:", err)
}
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -319,7 +329,10 @@ func TestResurfaceSkipWrite(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, 0, len(queue))
Expand Down
Loading