diff --git a/go.mod b/go.mod index 9f4c59a0d..83b3e81a7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go v4.7.0+incompatible github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 - github.com/TykTechnologies/storage v1.0.8 + github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20 github.com/aws/aws-sdk-go-v2 v1.16.14 github.com/aws/aws-sdk-go-v2/config v1.9.0 github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 @@ -105,6 +105,7 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/redis/go-redis/v9 v9.3.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect diff --git a/go.sum b/go.sum index 959530c88..adfcf50e5 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 h1:fbxHiuw/2 github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9/go.mod h1:v6v7Mlj08+EmEcXOfpuTxGt2qYU9yhqqtv4QF9Wf50E= github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 h1:T5NWziFusj8au5nxAqMMh/bZyX9CAyYnBkaMSsfH6BA= github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632/go.mod h1:UsPYgOFBpNzDXLEti7MKOwHLpVSqdzuNGkVFPspQmnQ= -github.com/TykTechnologies/storage v1.0.8 h1:MBs6hk5oLOmr2qK5/rl+dYO6iDMez6u3QkwOCL6K8n8= -github.com/TykTechnologies/storage v1.0.8/go.mod h1:+0S3KuNlLGBTMTSFREuZFm315zzXjuuCO4QSAPy+d3M= +github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20 h1:puEgTIpj2A6c5z7TqIQB3bly2ielrhE4ubxOZTUND2o= +github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20/go.mod h1:zcANqpIsDL/l/1zsMMERmjBeJYpER9XMi/dw2Gqa7m4= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= @@ -96,6 +96,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= @@ -451,6 +453,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQKmLAjWztpKDCAaRifiRMdGzWk0= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0= +github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= +github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY= github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= diff --git a/storage/redis.go b/storage/redis.go index 70accc0b9..1a39b9bea 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -2,12 +2,16 @@ package storage import ( "context" - "crypto/tls" + "fmt" "strconv" "strings" "time" - "github.com/go-redis/redis/v8" + "github.com/TykTechnologies/storage/temporal/connector" + keyvalue "github.com/TykTechnologies/storage/temporal/keyvalue" + "github.com/TykTechnologies/storage/temporal/list" + "github.com/TykTechnologies/storage/temporal/model" + "github.com/sirupsen/logrus" "github.com/kelseyhightower/envconfig" @@ -16,11 +20,16 @@ import ( // ------------------- REDIS CLUSTER STORAGE MANAGER ------------------------------- -var redisClusterSingleton redis.UniversalClient +var redisClusterSingleton *redisManager var redisLogPrefix = "redis" var ENV_REDIS_PREFIX = "TYK_PMP_REDIS" var ctx = context.Background() +type redisManager struct { + list list.List + kv keyvalue.KeyValue +} + type EnvMapString map[string]string func (e *EnvMapString) Decode(value string) error { @@ -78,13 +87,13 @@ type RedisStorageConfig struct { // RedisClusterStorageManager is a storage manager that uses the redis database. type RedisClusterStorageManager struct { - db redis.UniversalClient + db *redisManager KeyPrefix string HashKeys bool Config RedisStorageConfig } -func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.UniversalClient { +func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) *redisManager { if !forceReconnect { if redisClusterSingleton != nil { log.WithFields(logrus.Fields{ @@ -94,7 +103,7 @@ func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.U } } else { if redisClusterSingleton != nil { - redisClusterSingleton.Close() + // redisClusterSingleton.Close() } } @@ -107,49 +116,52 @@ func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.U maxActive = config.MaxActive } - timeout := 5 * time.Second + timeout := 5 if config.Timeout > 0 { - timeout = time.Duration(config.Timeout) * time.Second - } - - var tlsConfig *tls.Config - if config.RedisUseSSL { - tlsConfig = &tls.Config{ - InsecureSkipVerify: config.RedisSSLInsecureSkipVerify, - } + timeout = config.Timeout } - var client redis.UniversalClient - opts := &redis.UniversalOptions{ + opts := &model.RedisOptions{ MasterName: config.MasterName, SentinelPassword: config.SentinelPassword, Addrs: getRedisAddrs(config), - DB: config.Database, + Database: config.Database, Username: config.Username, Password: config.Password, - PoolSize: maxActive, - IdleTimeout: 240 * time.Second, - ReadTimeout: timeout, - WriteTimeout: timeout, - DialTimeout: timeout, - TLSConfig: tlsConfig, + MaxActive: maxActive, + Timeout: timeout, + EnableCluster: config.EnableCluster, } - if opts.MasterName != "" { - log.Info("--> [REDIS] Creating sentinel-backed failover client") - client = redis.NewFailoverClient(opts.Failover()) - } else if config.EnableCluster { - log.Info("--> [REDIS] Creating cluster client") - client = redis.NewClusterClient(opts.Cluster()) - } else { - log.Info("--> [REDIS] Creating single-node client") - client = redis.NewClient(opts.Simple()) + tlsOptions := &model.TLS{ + Enable: config.RedisUseSSL, + InsecureSkipVerify: config.RedisSSLInsecureSkipVerify, + } + + conn, err := connector.NewConnector(model.RedisV9Type, model.WithRedisConfig(opts), model.WithTLS(tlsOptions)) + if err != nil { + log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err) + return nil } - redisClusterSingleton = client + kv, err := keyvalue.NewKeyValue(conn) + if err != nil { + log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err) + return nil + } - return client + l, err := list.NewList(conn) + if err != nil { + log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err) + return nil + } + + redisClusterSingleton = &redisManager{} + redisClusterSingleton.kv = kv + redisClusterSingleton.list = l + + return redisClusterSingleton } func getRedisAddrs(config RedisStorageConfig) (addrs []string) { @@ -252,30 +264,28 @@ func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string, chunkSize i "prefix": redisLogPrefix, }).Debug("Fixed keyname is: ", fixedKey) - var lrange *redis.StringSliceCmd - _, err := r.db.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - lrange = pipe.LRange(ctx, fixedKey, 0, chunkSize-1) - - if chunkSize == 0 { - pipe.Del(ctx, fixedKey) - } else { - pipe.LTrim(ctx, fixedKey, chunkSize, -1) - - // extend expiry after successful LTRIM - pipe.Expire(ctx, fixedKey, expire) - } + vals, err := r.db.list.Pop(ctx, fixedKey, chunkSize-1) + if err != nil { + fmt.Println("FAILED 1") + log.WithFields(logrus.Fields{ + "prefix": redisLogPrefix, + }).Error("Multi command failed: ", err) + r.Connect() return nil - }) + } + fmt.Println("vals:", vals) + + err = r.db.kv.Expire(ctx, fixedKey, expire) if err != nil { + fmt.Println("FAILED 2") log.WithFields(logrus.Fields{ "prefix": redisLogPrefix, }).Error("Multi command failed: ", err) r.Connect() + return nil } - vals := lrange.Val() - result := make([]interface{}, len(vals)) for i, v := range vals { result[i] = v @@ -294,7 +304,7 @@ func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int log.Debug("[STORE] Setting key: ", r.fixKey(keyName)) r.ensureConnection() - err := r.db.Set(ctx, r.fixKey(keyName), session, 0).Err() + err := r.db.kv.Set(ctx, r.fixKey(keyName), session, 0) if timeout > 0 { if err := r.SetExp(keyName, timeout); err != nil { return err @@ -308,7 +318,7 @@ func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int } func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error { - err := r.db.Expire(ctx, r.fixKey(keyName), time.Duration(timeout)*time.Second).Err() + err := r.db.kv.Expire(ctx, r.fixKey(keyName), time.Duration(timeout)*time.Second) if err != nil { log.Error("Could not EXPIRE key: ", err) } diff --git a/storage/redis_test.go b/storage/redis_test.go index 13bffa07b..dfa36d392 100644 --- a/storage/redis_test.go +++ b/storage/redis_test.go @@ -71,30 +71,30 @@ var testData = []struct { in []string chunk int64 }{ - {in: nil, chunk: int64(0)}, - {in: []string{"one"}, chunk: int64(0)}, - {in: []string{"one", "two"}, chunk: int64(0)}, - {in: []string{"one", "two", "three"}, chunk: int64(0)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(0)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)}, - {in: nil, chunk: int64(1)}, - {in: []string{"one"}, chunk: int64(1)}, - {in: []string{"one", "two"}, chunk: int64(1)}, - {in: []string{"one", "two", "three"}, chunk: int64(1)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(1)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)}, - {in: nil, chunk: int64(2)}, - {in: []string{"one"}, chunk: int64(2)}, + // {in: nil, chunk: int64(0)}, + // {in: []string{"one"}, chunk: int64(0)}, + // {in: []string{"one", "two"}, chunk: int64(0)}, + // {in: []string{"one", "two", "three"}, chunk: int64(0)}, + // {in: []string{"one", "two", "three", "four"}, chunk: int64(0)}, + // {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)}, + // {in: nil, chunk: int64(1)}, + // {in: []string{"one"}, chunk: int64(1)}, + // {in: []string{"one", "two"}, chunk: int64(1)}, + // {in: []string{"one", "two", "three"}, chunk: int64(1)}, + // {in: []string{"one", "two", "three", "four"}, chunk: int64(1)}, + // {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)}, + // {in: nil, chunk: int64(2)}, + // {in: []string{"one"}, chunk: int64(2)}, {in: []string{"one", "two"}, chunk: int64(2)}, - {in: []string{"one", "two", "three"}, chunk: int64(2)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(2)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)}, - {in: nil, chunk: int64(3)}, - {in: []string{"one"}, chunk: int64(3)}, - {in: []string{"one", "two"}, chunk: int64(3)}, - {in: []string{"one", "two", "three"}, chunk: int64(3)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(3)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)}, + // {in: []string{"one", "two", "three"}, chunk: int64(2)}, + // {in: []string{"one", "two", "three", "four"}, chunk: int64(2)}, + // {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)}, + // {in: nil, chunk: int64(3)}, + // {in: []string{"one"}, chunk: int64(3)}, + // {in: []string{"one", "two"}, chunk: int64(3)}, + // {in: []string{"one", "two", "three"}, chunk: int64(3)}, + // {in: []string{"one", "two", "three", "four"}, chunk: int64(3)}, + // {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)}, } func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) { @@ -107,13 +107,29 @@ func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) { t.Fatal("unable to connect", err.Error()) } + connected := r.Connect() + if !connected { + t.Fatal("failed to connect") + } + + if r.db == nil { + t.Fatal("db is empty") + } + mockKeyName := "testanalytics" for _, tt := range testData { t.Run(fmt.Sprintf("in: %v", tt), func(t *testing.T) { ctx := context.Background() if tt.in != nil { - r.db.RPush(ctx, r.fixKey(mockKeyName), tt.in) + in := [][]byte{} + for _, v := range tt.in { + in = append(in, []byte(v)) + } + err := r.db.list.Append(ctx, false, r.fixKey(mockKeyName), in...) + if err != nil { + t.Fatal(err) + } } iterations := 1 @@ -129,12 +145,12 @@ func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) { count := 0 for i := 0; i < iterations; i++ { res := r.GetAndDeleteSet(mockKeyName, tt.chunk, 60*time.Second) - count += len(res) t.Logf("---> %d: %v", i, res) } if count != len(tt.in) { + fmt.Println("count:", count, "(tt.in):", tt.in) t.Fatal() } })