Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-10520] Migrating from go-redis to storage library #765

Merged
merged 27 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions analytics/aggregate_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package analytics

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -326,7 +325,6 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
r.Len(aggregated, 1)
aggre := aggregated["test-api"]
dimensions := aggre.Dimensions()
fmt.Println(dimensions)
for d, values := range responsesCheck {
for _, v := range values {
found := false
Expand Down
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type TykPumpConfiguration struct {
// Sets the analytics storage type. Where the pump will be fetching data from. Currently, only
// the `redis` option is supported.
AnalyticsStorageType string `json:"analytics_storage_type"`
// Example Redis storage configuration:
// Example Temporal storage configuration:
// ```{.json}
// "analytics_storage_config": {
// "type": "redis",
Expand All @@ -188,11 +188,11 @@ type TykPumpConfiguration struct {
// "optimisation_max_idle": 100,
// "optimisation_max_active": 0,
// "enable_cluster": false,
// "redis_use_ssl": false,
// "redis_ssl_insecure_skip_verify": false
// "use_ssl": false,
// "ssl_insecure_skip_verify": false
// },
// ```
AnalyticsStorageConfig storage.RedisStorageConfig `json:"analytics_storage_config"`
AnalyticsStorageConfig storage.TemporalStorageConfig `json:"analytics_storage_config"`
// Connection string for StatsD monitoring for information please see the
// [Instrumentation docs](https://tyk.io/docs/basic-config-and-security/report-monitor-trigger-events/instrumentation/).
StatsdConnectionString string `json:"statsd_connection_string"`
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ require (
github.com/DataDog/datadog-go v4.7.0+incompatible
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/TykTechnologies/storage v1.1.1
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/fatih/structs v1.1.0
github.com/go-redis/redis/v8 v8.3.1
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golang/protobuf v1.5.3
Expand Down Expand Up @@ -100,13 +99,13 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/olivere/elastic v6.2.31+incompatible // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.20.0 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/redis/go-redis/v9 v9.3.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
Expand All @@ -120,7 +119,6 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.11.2 // indirect
go.opentelemetry.io/otel v0.13.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sync v0.2.0 // indirect
Expand Down
44 changes: 15 additions & 29 deletions go.sum

Large diffs are not rendered by default.

111 changes: 77 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var SystemConfig TykPumpConfiguration
var AnalyticsStore storage.AnalyticsStorage
var UptimeStorage storage.AnalyticsStorage
var Pumps []pumps.Pump
var UptimePump pumps.UptimePump
var AnalyticsSerializers []serializer.AnalyticsSerializer
var (
SystemConfig TykPumpConfiguration
AnalyticsStore storage.AnalyticsStorage
UptimeStorage storage.AnalyticsStorage
Pumps []pumps.Pump
UptimePump pumps.UptimePump
AnalyticsSerializers []serializer.AnalyticsSerializer
)

var log = logger.GetLogger()

Expand Down Expand Up @@ -66,7 +68,7 @@ func Init() {
demoMode = &envDemo
}

//Serializer init
// Serializer init
AnalyticsSerializers = []serializer.AnalyticsSerializer{serializer.NewAnalyticsSerializer(serializer.MSGP_SERIALIZER), serializer.NewAnalyticsSerializer(serializer.PROTOBUF_SERIALIZER)}

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -96,36 +98,69 @@ func Init() {
if *debugMode {
log.Level = logrus.DebugLevel
}

}

func setupAnalyticsStore() {
switch SystemConfig.AnalyticsStorageType {
case "redis":
AnalyticsStore = &storage.RedisClusterStorageManager{}
UptimeStorage = &storage.RedisClusterStorageManager{}
default:
AnalyticsStore = &storage.RedisClusterStorageManager{}
UptimeStorage = &storage.RedisClusterStorageManager{}
}
case "redis", "":
var err error
AnalyticsStore, err = storage.NewTemporalStorageHandler(SystemConfig.AnalyticsStorageConfig, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}
err = AnalyticsStore.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

// Copy across the redis configuration
uptimeConf := SystemConfig.AnalyticsStorageConfig
// Swap key prefixes for uptime purger
uptimeConf.KeyPrefix = "host-checker:"

AnalyticsStore.Init(SystemConfig.AnalyticsStorageConfig)
UptimeStorage, err = storage.NewTemporalStorageHandler(uptimeConf, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

// Copy across the redis configuration
uptimeConf := SystemConfig.AnalyticsStorageConfig
err = UptimeStorage.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Redis: ", err)
}

// Swap key prefixes for uptime purger
uptimeConf.RedisKeyPrefix = "host-checker:"
UptimeStorage.Init(uptimeConf)
default:
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Invalid analytics storage type: ", SystemConfig.AnalyticsStorageType)
}
}

func storeVersion() {
var versionStore = &storage.RedisClusterStorageManager{}
versionConf := SystemConfig.AnalyticsStorageConfig
versionStore.KeyPrefix = "version-check-"
versionStore.Config = versionConf
versionStore.Connect()
err := versionStore.SetKey("pump", pumps.Version, 0)
versionConf := &SystemConfig.AnalyticsStorageConfig
versionConf.KeyPrefix = "version-check-"
versionStore, err := storage.NewTemporalStorageHandler(versionConf, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

err = versionStore.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

err = versionStore.SetKey("pump", pumps.Version, 0)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
Expand Down Expand Up @@ -177,7 +212,6 @@ func initialisePumps() {
if !SystemConfig.DontPurgeUptimeData {
initialiseUptimePump()
}

}

func initialiseUptimePump() {
Expand Down Expand Up @@ -210,15 +244,20 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch
for i := -1; i < 10; i++ {
var analyticsKeyName string
if i == -1 {
//if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
// if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
analyticsKeyName = storage.ANALYTICS_KEYNAME
} else {
analyticsKeyName = fmt.Sprintf("%v_%v", storage.ANALYTICS_KEYNAME, i)
}

for _, serializerMethod := range AnalyticsSerializers {
analyticsKeyName += serializerMethod.GetSuffix()
AnalyticsValues := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire)
AnalyticsValues, err := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error())
}
if len(AnalyticsValues) > 0 {
PreprocessAnalyticsValues(AnalyticsValues, serializerMethod, analyticsKeyName, omitDetails, job, startTime, secInterval)
}
Expand All @@ -229,7 +268,12 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch
job.Timing("purge_time_all", time.Since(startTime).Nanoseconds())

if !SystemConfig.DontPurgeUptimeData {
UptimeValues := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire)
UptimeValues, err := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error())
}
UptimePump.WriteUptimeData(UptimeValues)
}

Expand Down Expand Up @@ -305,7 +349,6 @@ func writeToPumps(keys []interface{}, job *health.Job, startTime time.Time, purg
}

func filterData(pump pumps.Pump, keys []interface{}) []interface{} {

shouldTrim := SystemConfig.MaxRecordSize != 0 || pump.GetMaxRecordSize() != 0
filters := pump.GetFilters()
ignoreFields := pump.GetIgnoreFields()
Expand Down Expand Up @@ -381,11 +424,11 @@ func execPumpWriting(wg *sync.WaitGroup, pmp pumps.Pump, keys *[]interface{}, pu
}).Debug("Writing to: ", pmp.GetName())

ch := make(chan error, 1)
//Load pump timeout
// Load pump timeout
timeout := pmp.GetTimeout()
var ctx context.Context
var cancel context.CancelFunc
//Initialize context depending if the pump has a configured timeout
// Initialize context depending if the pump has a configured timeout
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
} else {
Expand Down
3 changes: 2 additions & 1 deletion pumps/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"strings"

"github.com/TykTechnologies/tyk-pump/analytics"
retry "github.com/TykTechnologies/tyk-pump/http-retry"
"github.com/TykTechnologies/tyk-pump/retry"

"github.com/mitchellh/mapstructure"
)

Expand Down
4 changes: 0 additions & 4 deletions pumps/splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pumps
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -54,7 +53,6 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body.Close()

if h.returnErrors >= h.reqCount {
fmt.Println("returning err.......")
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte("splunk internal error"))
if err != nil {
Expand Down Expand Up @@ -249,8 +247,6 @@ func Test_SplunkWriteDataBatch(t *testing.T) {
keys[1] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}
keys[2] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}

fmt.Println(maxContentLength)

pmp := SplunkPump{}

cfg := make(map[string]interface{})
Expand Down
2 changes: 1 addition & 1 deletion pumps/version.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pumps

var (
Version = "v1.8.0"
Version = "v1.9.0"
BuiltBy string
Commit string
BuildDate string
Expand Down
2 changes: 1 addition & 1 deletion http-retry/http-retry.go → retry/http-retry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package httpretry
package retry

import (
"bytes"
Expand Down
16 changes: 16 additions & 0 deletions retry/storage-retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package retry

import (
"time"

"github.com/cenkalti/backoff/v4"
)

func GetTemporalStorageExponentialBackoff() *backoff.ExponentialBackOff {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.Multiplier = 2
exponentialBackoff.MaxInterval = 10 * time.Second
exponentialBackoff.MaxElapsedTime = 0

return exponentialBackoff
}
15 changes: 0 additions & 15 deletions storage/init.go

This file was deleted.

Loading
Loading