diff --git a/go.mod b/go.mod index 06f34b5af..491dfb0d7 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 - github.com/resurfaceio/logger-go/v3 v3.2.1 + github.com/resurfaceio/logger-go/v3 v3.3.2 github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99 github.com/segmentio/kafka-go v0.3.6 @@ -55,6 +55,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect diff --git a/go.sum b/go.sum index 6e46923c1..7d781a4d0 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= @@ -449,8 +451,8 @@ github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQK github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0= github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY= -github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8= +github.com/resurfaceio/logger-go/v3 v3.3.2 h1:bUjzvNM/g7Qcy8r3ctA71lCFf9gmpEghovmN125XXRs= +github.com/resurfaceio/logger-go/v3 v3.3.2/go.mod h1:ZswfY9nja+SfeAZiFnbFR1aHk7WCDR0is/NWvId7oT0= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY= @@ -495,7 +497,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -505,9 +506,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo= diff --git a/pumps/resurface.go b/pumps/resurface.go index e820e9cd3..fcf974e73 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -10,15 +10,19 @@ import ( "reflect" "strconv" "strings" + "sync" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" - "github.com/resurfaceio/logger-go/v3" + logger "github.com/resurfaceio/logger-go/v3" ) type ResurfacePump struct { - logger *logger.HttpLogger - config *ResurfacePumpConfig + logger *logger.HttpLogger + config *ResurfacePumpConfig + data chan []interface{} + wg sync.WaitGroup + enabled bool CommonPumpConfig } @@ -49,6 +53,7 @@ func (rp *ResurfacePump) GetEnvPrefix() string { } func (rp *ResurfacePump) Init(config interface{}) error { + rp.wg = sync.WaitGroup{} rp.config = &ResurfacePumpConfig{} rp.log = log.WithField("prefix", resurfacePrefix) @@ -74,10 +79,26 @@ func (rp *ResurfacePump) Init(config interface{}) error { rp.log.Info(rp.GetName() + " Initialized (Logger disabled)") return errors.New("logger is not enabled") } + rp.initWorker() rp.log.Info(rp.GetName() + " Initialized") return nil } +func (rp *ResurfacePump) initWorker() { + rp.data = make(chan []interface{}, 5) + rp.wg.Add(1) + go rp.writeData() + rp.enable() +} + +func (rp *ResurfacePump) disable() { + rp.enabled = false +} + +func (rp *ResurfacePump) enable() { + rp.enabled = true +} + func parseHeaders(headersString string, existingHeaders http.Header) (headers http.Header) { if existingHeaders != nil { headers = http.Header.Clone(existingHeaders) @@ -210,30 +231,79 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp return } +func (rp *ResurfacePump) writeData() { + defer rp.wg.Done() + for data := range rp.data { + for _, v := range data { + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + rp.log.Error("Error decoding analytic record") + continue + } + if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { + rp.log.Warn("Record dropped. Please enable Detailed Logging.") + continue + } + + req, resp, customFields, err := mapRawData(&decoded) + if err != nil { + rp.log.Error(err) + continue + } + + logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) + } + rp.log.Info("Wrote ", len(data), " records...") + } +} + func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { rp.log.Debug("Writing ", len(data), " records") - - for _, v := range data { - decoded, ok := v.(analytics.AnalyticsRecord) - if !ok { - rp.log.Error("Error decoding analytic record") - continue + if rp.enabled { + select { + case rp.data <- data: + rp.log.Info("Purged ", len(data), " records...") + case <-ctx.Done(): + // Context has been cancelled or timed out + return ctx.Err() } - if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { - rp.log.Warn("Record dropped. Please enable Detailed Logging.") - continue - } - - req, resp, customFields, err := mapRawData(&decoded) - if err != nil { - rp.log.Error(err) - continue + } else { + select { + case peek, open := <-rp.data: + if open { + rp.data <- peek + close(rp.data) + } + case <-ctx.Done(): + // Context has been cancelled or timed out + close(rp.data) + return ctx.Err() + default: + close(rp.data) } + } + return nil +} - logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) +func (rp *ResurfacePump) Flush() error { + rp.disable() + err := rp.WriteData(context.Background(), []interface{}{}) + if err != nil { + return err } + rp.wg.Wait() + rp.initWorker() - rp.log.Info("Purged ", len(data), " records...") + return nil +} + +func (rp *ResurfacePump) Shutdown() error { + rp.logger.Stop() + + err := rp.Flush() + if err != nil { + return err + } return nil } diff --git a/pumps/resurface_test.go b/pumps/resurface_test.go index 7e6f8e861..4e2b9ca74 100644 --- a/pumps/resurface_test.go +++ b/pumps/resurface_test.go @@ -121,7 +121,10 @@ func TestResurfaceWriteData(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -162,7 +165,10 @@ func TestResurfaceWriteData(t *testing.T) { TimeStamp: time.Now(), }, }) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue = pmp.logger.Queue() assert.Equal(t, len(recs)+1, len(queue)) @@ -231,7 +237,10 @@ func TestResurfaceWriteCustomFields(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -284,9 +293,10 @@ func TestResurfaceWriteChunkedResponse(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - if err != nil { - t.Fatal(pmp.GetName()+"couldn't write records with err:", err) - } + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -319,7 +329,10 @@ func TestResurfaceSkipWrite(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, 0, len(queue))