From 102bd97aa1b68d5b7598f0a43ede9fb0a7f32083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20M=C3=A1rquez?= Date: Thu, 5 Oct 2023 15:37:24 -0500 Subject: [PATCH 1/6] feat(resurface): upgrade ResurfacePump backend --- go.mod | 3 ++- go.sum | 6 ++++-- pumps/resurface.go | 53 +++++++++++++++++++++++++++++----------------- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 774492a02..5c5f412b2 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 - github.com/resurfaceio/logger-go/v3 v3.2.1 + github.com/resurfaceio/logger-go/v3 v3.3.1 github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99 github.com/segmentio/kafka-go v0.3.6 @@ -58,6 +58,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect diff --git a/go.sum b/go.sum index 9cc9c15b9..239e0cbd8 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -626,8 +628,8 @@ github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMi github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o= github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY= -github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8= +github.com/resurfaceio/logger-go/v3 v3.3.1 h1:txfI96jAi0DbQAwREKJTJjimXhUyDCRlTuo1nrmXE3A= +github.com/resurfaceio/logger-go/v3 v3.3.1/go.mod h1:ZswfY9nja+SfeAZiFnbFR1aHk7WCDR0is/NWvId7oT0= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY= diff --git a/pumps/resurface.go b/pumps/resurface.go index e820e9cd3..5fb4d9f63 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -19,6 +19,7 @@ import ( type ResurfacePump struct { logger *logger.HttpLogger config *ResurfacePumpConfig + data chan []interface{} CommonPumpConfig } @@ -74,6 +75,8 @@ func (rp *ResurfacePump) Init(config interface{}) error { rp.log.Info(rp.GetName() + " Initialized (Logger disabled)") return errors.New("logger is not enabled") } + rp.data = make(chan []interface{}, 5) + go rp.writeData() rp.log.Info(rp.GetName() + " Initialized") return nil } @@ -210,30 +213,40 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp return } -func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { - rp.log.Debug("Writing ", len(data), " records") - - for _, v := range data { - decoded, ok := v.(analytics.AnalyticsRecord) - if !ok { - rp.log.Error("Error decoding analytic record") - continue +func (rp *ResurfacePump) writeData() { + for data := range rp.data { + for _, v := range data { + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + rp.log.Error("Error decoding analytic record") + continue + } + if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { + rp.log.Warn("Record dropped. Please enable Detailed Logging.") + continue + } + + req, resp, customFields, err := mapRawData(&decoded) + if err != nil { + rp.log.Error(err) + continue + } + + logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) } - if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { - rp.log.Warn("Record dropped. Please enable Detailed Logging.") - continue - } - - req, resp, customFields, err := mapRawData(&decoded) - if err != nil { - rp.log.Error(err) - continue - } - - logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) + rp.log.Info("Wrote ", len(data), " records...") } +} +func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { + rp.log.Debug("Writing ", len(data), " records") + rp.data <- data rp.log.Info("Purged ", len(data), " records...") return nil } + +func (rp *ResurfacePump) Shutdown() error { + rp.logger.Stop() + return nil +} From 97f69dea3e8f2ec8f57322bb9984e1a77fc1c225 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20M=C3=A1rquez?= Date: Thu, 12 Oct 2023 14:05:22 -0500 Subject: [PATCH 2/6] fix(resurface): flush messages before checks --- pumps/resurface.go | 62 ++++++++++++++++++++++++++++++++++++----- pumps/resurface_test.go | 27 +++++++++++++----- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/pumps/resurface.go b/pumps/resurface.go index 5fb4d9f63..51d44dd80 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -10,6 +10,7 @@ import ( "reflect" "strconv" "strings" + "sync" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" @@ -17,9 +18,11 @@ import ( ) type ResurfacePump struct { - logger *logger.HttpLogger - config *ResurfacePumpConfig - data chan []interface{} + logger *logger.HttpLogger + config *ResurfacePumpConfig + data chan []interface{} + wg sync.WaitGroup + enabled bool CommonPumpConfig } @@ -50,6 +53,7 @@ func (rp *ResurfacePump) GetEnvPrefix() string { } func (rp *ResurfacePump) Init(config interface{}) error { + rp.wg = sync.WaitGroup{} rp.config = &ResurfacePumpConfig{} rp.log = log.WithField("prefix", resurfacePrefix) @@ -75,12 +79,26 @@ func (rp *ResurfacePump) Init(config interface{}) error { rp.log.Info(rp.GetName() + " Initialized (Logger disabled)") return errors.New("logger is not enabled") } - rp.data = make(chan []interface{}, 5) - go rp.writeData() + rp.initWorker() rp.log.Info(rp.GetName() + " Initialized") return nil } +func (rp *ResurfacePump) initWorker() { + rp.data = make(chan []interface{}, 5) + rp.wg.Add(1) + go rp.writeData() + rp.enable() +} + +func (rp *ResurfacePump) disable() { + rp.enabled = false +} + +func (rp *ResurfacePump) enable() { + rp.enabled = true +} + func parseHeaders(headersString string, existingHeaders http.Header) (headers http.Header) { if existingHeaders != nil { headers = http.Header.Clone(existingHeaders) @@ -214,6 +232,7 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp } func (rp *ResurfacePump) writeData() { + defer rp.wg.Done() for data := range rp.data { for _, v := range data { decoded, ok := v.(analytics.AnalyticsRecord) @@ -240,13 +259,42 @@ func (rp *ResurfacePump) writeData() { func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { rp.log.Debug("Writing ", len(data), " records") - rp.data <- data - rp.log.Info("Purged ", len(data), " records...") + if rp.enabled { + rp.data <- data + rp.log.Info("Purged ", len(data), " records...") + } else { + select { + case peek, open := <-rp.data: + if open { + rp.data <- peek + close(rp.data) + } + default: + close(rp.data) + } + } + return nil +} + +func (rp *ResurfacePump) Flush() error { + rp.disable() + err := rp.WriteData(context.TODO(), []interface{}{}) + if err != nil { + return err + } + rp.wg.Wait() + rp.initWorker() return nil } func (rp *ResurfacePump) Shutdown() error { rp.logger.Stop() + + err := rp.Flush() + if err != nil { + return err + } + return nil } diff --git a/pumps/resurface_test.go b/pumps/resurface_test.go index 7e6f8e861..4e2b9ca74 100644 --- a/pumps/resurface_test.go +++ b/pumps/resurface_test.go @@ -121,7 +121,10 @@ func TestResurfaceWriteData(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -162,7 +165,10 @@ func TestResurfaceWriteData(t *testing.T) { TimeStamp: time.Now(), }, }) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue = pmp.logger.Queue() assert.Equal(t, len(recs)+1, len(queue)) @@ -231,7 +237,10 @@ func TestResurfaceWriteCustomFields(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -284,9 +293,10 @@ func TestResurfaceWriteChunkedResponse(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - if err != nil { - t.Fatal(pmp.GetName()+"couldn't write records with err:", err) - } + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -319,7 +329,10 @@ func TestResurfaceSkipWrite(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, 0, len(queue)) From 888d6b5ba5b7efce3ff4acebdd4a123ad06fc249 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20M=C3=A1rquez?= Date: Thu, 5 Oct 2023 15:37:24 -0500 Subject: [PATCH 3/6] feat(resurface): upgrade ResurfacePump backend --- go.mod | 3 +- go.sum | 6 ++- pumps/resurface.go | 101 ++++++++++++++++++++++++++++++++-------- pumps/resurface_test.go | 27 ++++++++--- 4 files changed, 107 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 9f4c59a0d..e876ec7a1 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 - github.com/resurfaceio/logger-go/v3 v3.2.1 + github.com/resurfaceio/logger-go/v3 v3.3.2 github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99 github.com/segmentio/kafka-go v0.3.6 @@ -53,6 +53,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect diff --git a/go.sum b/go.sum index 959530c88..3cd1d7b9e 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= @@ -451,8 +453,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQKmLAjWztpKDCAaRifiRMdGzWk0= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0= -github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY= -github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8= +github.com/resurfaceio/logger-go/v3 v3.3.2 h1:bUjzvNM/g7Qcy8r3ctA71lCFf9gmpEghovmN125XXRs= +github.com/resurfaceio/logger-go/v3 v3.3.2/go.mod h1:ZswfY9nja+SfeAZiFnbFR1aHk7WCDR0is/NWvId7oT0= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0= github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY= diff --git a/pumps/resurface.go b/pumps/resurface.go index e820e9cd3..51d44dd80 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -10,6 +10,7 @@ import ( "reflect" "strconv" "strings" + "sync" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" @@ -17,8 +18,11 @@ import ( ) type ResurfacePump struct { - logger *logger.HttpLogger - config *ResurfacePumpConfig + logger *logger.HttpLogger + config *ResurfacePumpConfig + data chan []interface{} + wg sync.WaitGroup + enabled bool CommonPumpConfig } @@ -49,6 +53,7 @@ func (rp *ResurfacePump) GetEnvPrefix() string { } func (rp *ResurfacePump) Init(config interface{}) error { + rp.wg = sync.WaitGroup{} rp.config = &ResurfacePumpConfig{} rp.log = log.WithField("prefix", resurfacePrefix) @@ -74,10 +79,26 @@ func (rp *ResurfacePump) Init(config interface{}) error { rp.log.Info(rp.GetName() + " Initialized (Logger disabled)") return errors.New("logger is not enabled") } + rp.initWorker() rp.log.Info(rp.GetName() + " Initialized") return nil } +func (rp *ResurfacePump) initWorker() { + rp.data = make(chan []interface{}, 5) + rp.wg.Add(1) + go rp.writeData() + rp.enable() +} + +func (rp *ResurfacePump) disable() { + rp.enabled = false +} + +func (rp *ResurfacePump) enable() { + rp.enabled = true +} + func parseHeaders(headersString string, existingHeaders http.Header) (headers http.Header) { if existingHeaders != nil { headers = http.Header.Clone(existingHeaders) @@ -210,30 +231,70 @@ func mapRawData(rec *analytics.AnalyticsRecord) (httpReq http.Request, httpResp return } -func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { - rp.log.Debug("Writing ", len(data), " records") - - for _, v := range data { - decoded, ok := v.(analytics.AnalyticsRecord) - if !ok { - rp.log.Error("Error decoding analytic record") - continue - } - if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { - rp.log.Warn("Record dropped. Please enable Detailed Logging.") - continue +func (rp *ResurfacePump) writeData() { + defer rp.wg.Done() + for data := range rp.data { + for _, v := range data { + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + rp.log.Error("Error decoding analytic record") + continue + } + if len(decoded.RawRequest) == 0 && len(decoded.RawResponse) == 0 { + rp.log.Warn("Record dropped. Please enable Detailed Logging.") + continue + } + + req, resp, customFields, err := mapRawData(&decoded) + if err != nil { + rp.log.Error(err) + continue + } + + logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) } + rp.log.Info("Wrote ", len(data), " records...") + } +} - req, resp, customFields, err := mapRawData(&decoded) - if err != nil { - rp.log.Error(err) - continue +func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { + rp.log.Debug("Writing ", len(data), " records") + if rp.enabled { + rp.data <- data + rp.log.Info("Purged ", len(data), " records...") + } else { + select { + case peek, open := <-rp.data: + if open { + rp.data <- peek + close(rp.data) + } + default: + close(rp.data) } + } + return nil +} - logger.SendHttpMessage(rp.logger, &resp, &req, decoded.TimeStamp.Unix()*1000, decoded.RequestTime, customFields) +func (rp *ResurfacePump) Flush() error { + rp.disable() + err := rp.WriteData(context.TODO(), []interface{}{}) + if err != nil { + return err } + rp.wg.Wait() + rp.initWorker() - rp.log.Info("Purged ", len(data), " records...") + return nil +} + +func (rp *ResurfacePump) Shutdown() error { + rp.logger.Stop() + + err := rp.Flush() + if err != nil { + return err + } return nil } diff --git a/pumps/resurface_test.go b/pumps/resurface_test.go index 7e6f8e861..4e2b9ca74 100644 --- a/pumps/resurface_test.go +++ b/pumps/resurface_test.go @@ -121,7 +121,10 @@ func TestResurfaceWriteData(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -162,7 +165,10 @@ func TestResurfaceWriteData(t *testing.T) { TimeStamp: time.Now(), }, }) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue = pmp.logger.Queue() assert.Equal(t, len(recs)+1, len(queue)) @@ -231,7 +237,10 @@ func TestResurfaceWriteCustomFields(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -284,9 +293,10 @@ func TestResurfaceWriteChunkedResponse(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - if err != nil { - t.Fatal(pmp.GetName()+"couldn't write records with err:", err) - } + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, len(recs), len(queue)) @@ -319,7 +329,10 @@ func TestResurfaceSkipWrite(t *testing.T) { } err := pmp.WriteData(context.TODO(), recs) - assert.Nil(t, err, pmp.GetName()+"couldn't write records") + assert.Nil(t, err, pmp.GetName()+" couldn't write records") + + err = pmp.Flush() + assert.Nil(t, err, pmp.GetName()+" couldn't flush records") queue := pmp.logger.Queue() assert.Equal(t, 0, len(queue)) From 839113558af475b224edd76cddbabdf25ba64f49 Mon Sep 17 00:00:00 2001 From: Matias <83959431+mativm02@users.noreply.github.com> Date: Mon, 22 Jan 2024 13:28:55 -0300 Subject: [PATCH 4/6] linting --- pumps/resurface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pumps/resurface.go b/pumps/resurface.go index 51d44dd80..041eb3920 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -257,7 +257,7 @@ func (rp *ResurfacePump) writeData() { } } -func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { +func (rp *ResurfacePump) WriteData(_ context.Context, data []interface{}) error { rp.log.Debug("Writing ", len(data), " records") if rp.enabled { rp.data <- data From f8c3765213f23b30a543f4eb5f723398d6247853 Mon Sep 17 00:00:00 2001 From: Matias <83959431+mativm02@users.noreply.github.com> Date: Wed, 24 Jan 2024 09:14:17 -0300 Subject: [PATCH 5/6] improving context handling --- pumps/resurface.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pumps/resurface.go b/pumps/resurface.go index 041eb3920..8cfbf1e95 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -257,11 +257,16 @@ func (rp *ResurfacePump) writeData() { } } -func (rp *ResurfacePump) WriteData(_ context.Context, data []interface{}) error { +func (rp *ResurfacePump) WriteData(ctx context.Context, data []interface{}) error { rp.log.Debug("Writing ", len(data), " records") if rp.enabled { - rp.data <- data - rp.log.Info("Purged ", len(data), " records...") + select { + case rp.data <- data: + rp.log.Info("Purged ", len(data), " records...") + case <-ctx.Done(): + // Context has been cancelled or timed out + return ctx.Err() + } } else { select { case peek, open := <-rp.data: @@ -269,6 +274,10 @@ func (rp *ResurfacePump) WriteData(_ context.Context, data []interface{}) error rp.data <- peek close(rp.data) } + case <-ctx.Done(): + // Context has been cancelled or timed out + close(rp.data) + return ctx.Err() default: close(rp.data) } @@ -278,7 +287,7 @@ func (rp *ResurfacePump) WriteData(_ context.Context, data []interface{}) error func (rp *ResurfacePump) Flush() error { rp.disable() - err := rp.WriteData(context.TODO(), []interface{}{}) + err := rp.WriteData(context.Background(), []interface{}{}) if err != nil { return err } From 5cc6dac7d8eb14fa0b73b1ceacd8a626df9c738c Mon Sep 17 00:00:00 2001 From: Matias <83959431+mativm02@users.noreply.github.com> Date: Wed, 24 Jan 2024 09:15:36 -0300 Subject: [PATCH 6/6] linting --- pumps/resurface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pumps/resurface.go b/pumps/resurface.go index 8cfbf1e95..fcf974e73 100644 --- a/pumps/resurface.go +++ b/pumps/resurface.go @@ -14,7 +14,7 @@ import ( "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" - "github.com/resurfaceio/logger-go/v3" + logger "github.com/resurfaceio/logger-go/v3" ) type ResurfacePump struct {