Skip to content

Commit

Permalink
migrating from go-redis to storage library
Browse files Browse the repository at this point in the history
  • Loading branch information
mativm02 committed Jan 8, 2024
1 parent 35e1e74 commit 547aaa8
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 80 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/datadog-go v4.7.0+incompatible
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20
github.com/aws/aws-sdk-go-v2 v1.16.14
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
Expand Down Expand Up @@ -105,6 +105,7 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/redis/go-redis/v9 v9.3.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 h1:fbxHiuw/2
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9/go.mod h1:v6v7Mlj08+EmEcXOfpuTxGt2qYU9yhqqtv4QF9Wf50E=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 h1:T5NWziFusj8au5nxAqMMh/bZyX9CAyYnBkaMSsfH6BA=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632/go.mod h1:UsPYgOFBpNzDXLEti7MKOwHLpVSqdzuNGkVFPspQmnQ=
github.com/TykTechnologies/storage v1.0.8 h1:MBs6hk5oLOmr2qK5/rl+dYO6iDMez6u3QkwOCL6K8n8=
github.com/TykTechnologies/storage v1.0.8/go.mod h1:+0S3KuNlLGBTMTSFREuZFm315zzXjuuCO4QSAPy+d3M=
github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20 h1:puEgTIpj2A6c5z7TqIQB3bly2ielrhE4ubxOZTUND2o=
github.com/TykTechnologies/storage v1.0.11-0.20231229145029-6fde14b1aa20/go.mod h1:zcANqpIsDL/l/1zsMMERmjBeJYpER9XMi/dw2Gqa7m4=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
Expand Down Expand Up @@ -96,6 +96,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
Expand Down Expand Up @@ -451,6 +453,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQKmLAjWztpKDCAaRifiRMdGzWk0=
github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY=
github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
Expand Down
114 changes: 62 additions & 52 deletions storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package storage

import (
"context"
"crypto/tls"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-redis/redis/v8"
"github.com/TykTechnologies/storage/temporal/connector"
keyvalue "github.com/TykTechnologies/storage/temporal/keyvalue"
"github.com/TykTechnologies/storage/temporal/list"
"github.com/TykTechnologies/storage/temporal/model"

"github.com/sirupsen/logrus"

"github.com/kelseyhightower/envconfig"
Expand All @@ -16,11 +20,16 @@ import (

// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------

var redisClusterSingleton redis.UniversalClient
var redisClusterSingleton *redisManager
var redisLogPrefix = "redis"
var ENV_REDIS_PREFIX = "TYK_PMP_REDIS"
var ctx = context.Background()

type redisManager struct {
list list.List
kv keyvalue.KeyValue
}

type EnvMapString map[string]string

func (e *EnvMapString) Decode(value string) error {
Expand Down Expand Up @@ -78,13 +87,13 @@ type RedisStorageConfig struct {

// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {
db redis.UniversalClient
db *redisManager
KeyPrefix string
HashKeys bool
Config RedisStorageConfig
}

func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.UniversalClient {
func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) *redisManager {
if !forceReconnect {
if redisClusterSingleton != nil {
log.WithFields(logrus.Fields{
Expand All @@ -94,7 +103,7 @@ func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.U
}
} else {
if redisClusterSingleton != nil {
redisClusterSingleton.Close()
// redisClusterSingleton.Close()
}
}

Expand All @@ -107,49 +116,52 @@ func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.U
maxActive = config.MaxActive
}

timeout := 5 * time.Second
timeout := 5

if config.Timeout > 0 {
timeout = time.Duration(config.Timeout) * time.Second
}

var tlsConfig *tls.Config
if config.RedisUseSSL {
tlsConfig = &tls.Config{
InsecureSkipVerify: config.RedisSSLInsecureSkipVerify,
}
timeout = config.Timeout
}

var client redis.UniversalClient
opts := &redis.UniversalOptions{
opts := &model.RedisOptions{
MasterName: config.MasterName,
SentinelPassword: config.SentinelPassword,
Addrs: getRedisAddrs(config),
DB: config.Database,
Database: config.Database,
Username: config.Username,
Password: config.Password,
PoolSize: maxActive,
IdleTimeout: 240 * time.Second,
ReadTimeout: timeout,
WriteTimeout: timeout,
DialTimeout: timeout,
TLSConfig: tlsConfig,
MaxActive: maxActive,
Timeout: timeout,
EnableCluster: config.EnableCluster,
}

if opts.MasterName != "" {
log.Info("--> [REDIS] Creating sentinel-backed failover client")
client = redis.NewFailoverClient(opts.Failover())
} else if config.EnableCluster {
log.Info("--> [REDIS] Creating cluster client")
client = redis.NewClusterClient(opts.Cluster())
} else {
log.Info("--> [REDIS] Creating single-node client")
client = redis.NewClient(opts.Simple())
tlsOptions := &model.TLS{
Enable: config.RedisUseSSL,
InsecureSkipVerify: config.RedisSSLInsecureSkipVerify,
}

conn, err := connector.NewConnector(model.RedisV9Type, model.WithRedisConfig(opts), model.WithTLS(tlsOptions))
if err != nil {
log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err)
return nil
}

redisClusterSingleton = client
kv, err := keyvalue.NewKeyValue(conn)
if err != nil {
log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err)
return nil
}

return client
l, err := list.NewList(conn)
if err != nil {
log.WithFields(logrus.Fields{"prefix": redisLogPrefix}).Error(err)
return nil
}

redisClusterSingleton = &redisManager{}
redisClusterSingleton.kv = kv
redisClusterSingleton.list = l

return redisClusterSingleton
}

func getRedisAddrs(config RedisStorageConfig) (addrs []string) {
Expand Down Expand Up @@ -252,30 +264,28 @@ func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string, chunkSize i
"prefix": redisLogPrefix,
}).Debug("Fixed keyname is: ", fixedKey)

var lrange *redis.StringSliceCmd
_, err := r.db.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
lrange = pipe.LRange(ctx, fixedKey, 0, chunkSize-1)

if chunkSize == 0 {
pipe.Del(ctx, fixedKey)
} else {
pipe.LTrim(ctx, fixedKey, chunkSize, -1)

// extend expiry after successful LTRIM
pipe.Expire(ctx, fixedKey, expire)
}
vals, err := r.db.list.Pop(ctx, fixedKey, chunkSize-1)
if err != nil {
fmt.Println("FAILED 1")
log.WithFields(logrus.Fields{
"prefix": redisLogPrefix,
}).Error("Multi command failed: ", err)
r.Connect()
return nil
})
}

fmt.Println("vals:", vals)

err = r.db.kv.Expire(ctx, fixedKey, expire)
if err != nil {
fmt.Println("FAILED 2")
log.WithFields(logrus.Fields{
"prefix": redisLogPrefix,
}).Error("Multi command failed: ", err)
r.Connect()
return nil
}

vals := lrange.Val()

result := make([]interface{}, len(vals))
for i, v := range vals {
result[i] = v
Expand All @@ -294,7 +304,7 @@ func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int
log.Debug("[STORE] Setting key: ", r.fixKey(keyName))

r.ensureConnection()
err := r.db.Set(ctx, r.fixKey(keyName), session, 0).Err()
err := r.db.kv.Set(ctx, r.fixKey(keyName), session, 0)
if timeout > 0 {
if err := r.SetExp(keyName, timeout); err != nil {
return err
Expand All @@ -308,7 +318,7 @@ func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int
}

func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {
err := r.db.Expire(ctx, r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()
err := r.db.kv.Expire(ctx, r.fixKey(keyName), time.Duration(timeout)*time.Second)
if err != nil {
log.Error("Could not EXPIRE key: ", err)
}
Expand Down
66 changes: 41 additions & 25 deletions storage/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,30 @@ var testData = []struct {
in []string
chunk int64
}{
{in: nil, chunk: int64(0)},
{in: []string{"one"}, chunk: int64(0)},
{in: []string{"one", "two"}, chunk: int64(0)},
{in: []string{"one", "two", "three"}, chunk: int64(0)},
{in: []string{"one", "two", "three", "four"}, chunk: int64(0)},
{in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)},
{in: nil, chunk: int64(1)},
{in: []string{"one"}, chunk: int64(1)},
{in: []string{"one", "two"}, chunk: int64(1)},
{in: []string{"one", "two", "three"}, chunk: int64(1)},
{in: []string{"one", "two", "three", "four"}, chunk: int64(1)},
{in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)},
{in: nil, chunk: int64(2)},
{in: []string{"one"}, chunk: int64(2)},
// {in: nil, chunk: int64(0)},
// {in: []string{"one"}, chunk: int64(0)},
// {in: []string{"one", "two"}, chunk: int64(0)},
// {in: []string{"one", "two", "three"}, chunk: int64(0)},
// {in: []string{"one", "two", "three", "four"}, chunk: int64(0)},
// {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)},
// {in: nil, chunk: int64(1)},
// {in: []string{"one"}, chunk: int64(1)},
// {in: []string{"one", "two"}, chunk: int64(1)},
// {in: []string{"one", "two", "three"}, chunk: int64(1)},
// {in: []string{"one", "two", "three", "four"}, chunk: int64(1)},
// {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)},
// {in: nil, chunk: int64(2)},
// {in: []string{"one"}, chunk: int64(2)},
{in: []string{"one", "two"}, chunk: int64(2)},
{in: []string{"one", "two", "three"}, chunk: int64(2)},
{in: []string{"one", "two", "three", "four"}, chunk: int64(2)},
{in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)},
{in: nil, chunk: int64(3)},
{in: []string{"one"}, chunk: int64(3)},
{in: []string{"one", "two"}, chunk: int64(3)},
{in: []string{"one", "two", "three"}, chunk: int64(3)},
{in: []string{"one", "two", "three", "four"}, chunk: int64(3)},
{in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)},
// {in: []string{"one", "two", "three"}, chunk: int64(2)},
// {in: []string{"one", "two", "three", "four"}, chunk: int64(2)},
// {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)},
// {in: nil, chunk: int64(3)},
// {in: []string{"one"}, chunk: int64(3)},
// {in: []string{"one", "two"}, chunk: int64(3)},
// {in: []string{"one", "two", "three"}, chunk: int64(3)},
// {in: []string{"one", "two", "three", "four"}, chunk: int64(3)},
// {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)},
}

func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) {
Expand All @@ -107,13 +107,29 @@ func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) {
t.Fatal("unable to connect", err.Error())
}

connected := r.Connect()
if !connected {
t.Fatal("failed to connect")
}

if r.db == nil {
t.Fatal("db is empty")
}

mockKeyName := "testanalytics"

for _, tt := range testData {
t.Run(fmt.Sprintf("in: %v", tt), func(t *testing.T) {
ctx := context.Background()
if tt.in != nil {
r.db.RPush(ctx, r.fixKey(mockKeyName), tt.in)
in := [][]byte{}
for _, v := range tt.in {
in = append(in, []byte(v))
}
err := r.db.list.Append(ctx, false, r.fixKey(mockKeyName), in...)
if err != nil {
t.Fatal(err)
}
}

iterations := 1
Expand All @@ -129,12 +145,12 @@ func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) {
count := 0
for i := 0; i < iterations; i++ {
res := r.GetAndDeleteSet(mockKeyName, tt.chunk, 60*time.Second)

count += len(res)
t.Logf("---> %d: %v", i, res)
}

if count != len(tt.in) {
fmt.Println("count:", count, "(tt.in):", tt.in)
t.Fatal()
}
})
Expand Down

0 comments on commit 547aaa8

Please sign in to comment.