Skip to content

Commit

Permalink
Merging to release-1.9: TT-10676 Upgrade Resurface Pump backend (#773)
Browse files Browse the repository at this point in the history
TT-10676 Upgrade Resurface Pump backend (#773)

* feat(resurface): upgrade ResurfacePump backend

* fix(resurface): flush messages before checks

* feat(resurface): upgrade ResurfacePump backend

* linting

* improving context handling

* linting

---------

Co-authored-by: Ramón Márquez <[email protected]>
  • Loading branch information
buger and monrax authored Jan 29, 2024
1 parent 344473a commit eed47a4
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 34 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2
github.com/resurfaceio/logger-go/v3 v3.2.1
github.com/resurfaceio/logger-go/v3 v3.3.2
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827
github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99
github.com/segmentio/kafka-go v0.3.6
Expand All @@ -55,6 +55,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ=
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
Expand Down Expand Up @@ -449,8 +451,8 @@ github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQK
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY=
github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8=
github.com/resurfaceio/logger-go/v3 v3.3.2 h1:bUjzvNM/g7Qcy8r3ctA71lCFf9gmpEghovmN125XXRs=
github.com/resurfaceio/logger-go/v3 v3.3.2/go.mod h1:ZswfY9nja+SfeAZiFnbFR1aHk7WCDR0is/NWvId7oT0=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY=
Expand Down Expand Up @@ -495,7 +497,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand All @@ -505,9 +506,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo=
Expand Down
110 changes: 90 additions & 20 deletions pumps/resurface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
"reflect"
"strconv"
"strings"
"sync"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
"github.com/resurfaceio/logger-go/v3"
logger "github.com/resurfaceio/logger-go/v3"
)

type ResurfacePump struct {
logger *logger.HttpLogger
config *ResurfacePumpConfig
logger *logger.HttpLogger
config *ResurfacePumpConfig
data chan []interface{}
wg sync.WaitGroup
enabled bool
CommonPumpConfig
}

Expand Down Expand Up @@ -49,6 +53,7 @@ func (rp *ResurfacePump) GetEnvPrefix() string {
}

func (rp *ResurfacePump) Init(config interface{}) error {
rp.wg = sync.WaitGroup{}
rp.config = &ResurfacePumpConfig{}
rp.log = log.WithField("prefix", resurfacePrefix)

Expand All @@ -74,10 +79,26 @@ func (rp *ResurfacePump) Init(config interface{}) error {
rp.log.Info(rp.GetName() + " Initialized (Logger disabled)")
return errors.New("logger is not enabled")
}
rp.initWorker()
rp.log.Info(rp.GetName() + " Initialized")
return nil
}

func (rp *ResurfacePump) initWorker() {
rp.data = make(chan []interface{}, 5)
rp.wg.Add(1)
go rp.writeData()
rp.enable()
}

func (rp *ResurfacePump) disable() {
rp.enabled = false
}

func (rp *ResurfacePump) enable() {
rp.enabled = true
}

func parseHeaders(headersString string, existingHeaders http.Header) (headers http.Header) {
if existingHeaders != nil {
headers = http.Header.Clone(existingHeaders)
Expand Down Expand Up @@ -210,30 +231,79 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp
return
}

func (rp *ResurfacePump) writeData() {
defer rp.wg.Done()
for data := range rp.data {
for _, v := range data {
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
rp.log.Error("Error decoding analytic record")
continue
}
if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 {
rp.log.Warn("Record dropped. Please enable Detailed Logging.")
continue
}

req, resp, customFields, err := mapRawData(&decoded)
if err != nil {
rp.log.Error(err)
continue
}

logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields)
}
rp.log.Info("Wrote ", len(data), " records...")
}
}

func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error {
rp.log.Debug("Writing ", len(data), " records")

for _, v := range data {
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
rp.log.Error("Error decoding analytic record")
continue
if rp.enabled {
select {
case rp.data <- data:
rp.log.Info("Purged ", len(data), " records...")
case <-ctx.Done():
// Context has been cancelled or timed out
return ctx.Err()
}
if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 {
rp.log.Warn("Record dropped. Please enable Detailed Logging.")
continue
}

req, resp, customFields, err := mapRawData(&decoded)
if err != nil {
rp.log.Error(err)
continue
} else {
select {
case peek, open := <-rp.data:
if open {
rp.data <- peek
close(rp.data)
}
case <-ctx.Done():
// Context has been cancelled or timed out
close(rp.data)
return ctx.Err()
default:
close(rp.data)
}
}
return nil
}

logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields)
func (rp *ResurfacePump) Flush() error {
rp.disable()
err := rp.WriteData(context.Background(), []interface{}{})
if err != nil {
return err
}
rp.wg.Wait()
rp.initWorker()

rp.log.Info("Purged ", len(data), " records...")
return nil
}

func (rp *ResurfacePump) Shutdown() error {
rp.logger.Stop()

err := rp.Flush()
if err != nil {
return err
}

return nil
}
27 changes: 20 additions & 7 deletions pumps/resurface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func TestResurfaceWriteData(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -162,7 +165,10 @@ func TestResurfaceWriteData(t *testing.T) {
TimeStamp: time.Now(),
},
})
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue = pmp.logger.Queue()
assert.Equal(t, len(recs)+1, len(queue))
Expand Down Expand Up @@ -231,7 +237,10 @@ func TestResurfaceWriteCustomFields(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -284,9 +293,10 @@ func TestResurfaceWriteChunkedResponse(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
if err != nil {
t.Fatal(pmp.GetName()+"couldn't write records with err:", err)
}
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, len(recs), len(queue))
Expand Down Expand Up @@ -319,7 +329,10 @@ func TestResurfaceSkipWrite(t *testing.T) {
}

err := pmp.WriteData(context.TODO(), recs)
assert.Nil(t, err, pmp.GetName()+"couldn't write records")
assert.Nil(t, err, pmp.GetName()+" couldn't write records")

err = pmp.Flush()
assert.Nil(t, err, pmp.GetName()+" couldn't flush records")

queue := pmp.logger.Queue()
assert.Equal(t, 0, len(queue))
Expand Down

0 comments on commit eed47a4

Please sign in to comment.