diff --git a/cache/constants/constants.go b/cache/constants/constants.go index bf5631d..df328af 100644 --- a/cache/constants/constants.go +++ b/cache/constants/constants.go @@ -53,4 +53,8 @@ func (c ConfigerType) String() string { } } -const DefaultRouteGroup = "default" +const ( + DefaultRouteGroup = "default" + + DefaultRedisWrapper = "" +) diff --git a/cache/redis/instance.go b/cache/redis/instance.go index 04e0ae4..fd9d1a5 100644 --- a/cache/redis/instance.go +++ b/cache/redis/instance.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "runtime" + "strconv" "strings" "sync" @@ -25,22 +26,27 @@ type InstanceConf struct { Group string Namespace string Wrapper string + NoFixKey bool } func (m *InstanceConf) String() string { - return fmt.Sprintf("group:%s namespace:%s wrapper:%s", m.Group, m.Namespace, m.Wrapper) + return fmt.Sprintf("group:%s namespace:%s wrapper:%s no_fix:%v", m.Group, m.Namespace, m.Wrapper, m.NoFixKey) } func instanceConfFromString(s string) (conf *InstanceConf, err error) { items := strings.Split(s, keySep) - if len(items) != 3 { + if len(items) != 4 { return nil, fmt.Errorf("invalid instance conf string:%s", s) } - + noFix, err := strconv.ParseBool(items[3]) + if err != nil { + return nil, fmt.Errorf("instance conf parse no_prefix:%s, err: %v", s, err) + } conf = &InstanceConf{ Group: items[0], Namespace: items[1], Wrapper: items[2], + NoFixKey: noFix, } return conf, nil } @@ -61,6 +67,7 @@ func (m *InstanceManager) buildKey(conf *InstanceConf) string { conf.Group, conf.Namespace, conf.Wrapper, + fmt.Sprint(conf.NoFixKey), }, keySep) } @@ -69,7 +76,7 @@ func (m *InstanceManager) add(key string, client *Client) { } func (m *InstanceManager) newInstance(ctx context.Context, conf *InstanceConf) (*Client, error) { - return NewClient(ctx, conf.Namespace, conf.Wrapper) + return NewClientWithOptions(ctx, conf.Namespace, WithWrapper(conf.Wrapper), WithNoFixKey(conf.NoFixKey)) } func (m *InstanceManager) GetInstance(ctx context.Context, conf *InstanceConf) (*Client, error) { diff --git a/cache/redis/instance_test.go b/cache/redis/instance_test.go new file mode 100644 index 0000000..a30111d --- /dev/null +++ b/cache/redis/instance_test.go @@ -0,0 +1,44 @@ +package redis + +import ( + "reflect" + "testing" +) + +func Test_instanceConfFromString(t *testing.T) { + type args struct { + s string + } + tests := []struct { + name string + args args + wantConf *InstanceConf + wantErr bool + }{ + { + name: "test no prefix key", + args: args{ + s: "default-base/test-e-false", + }, + wantConf: &InstanceConf{ + Group: "default", + Namespace: "base/test", + Wrapper: "e", + NoFixKey: false, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotConf, err := instanceConfFromString(tt.args.s) + if (err != nil) != tt.wantErr { + t.Errorf("instanceConfFromString() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotConf, tt.wantConf) { + t.Errorf("instanceConfFromString() gotConf = %v, want %v", gotConf, tt.wantConf) + } + }) + } +} diff --git a/cache/redis/options.go b/cache/redis/options.go new file mode 100644 index 0000000..83c247a --- /dev/null +++ b/cache/redis/options.go @@ -0,0 +1,45 @@ +package redis + +// redis Client options +type options struct { + // fix key #{namespace.wrapper.key} + wrapper string + // if true no fix key + noFixKey bool + // if true key => #{namespace.wrapper.key} else key => #{namespace.key} + useWrapper bool +} + +type Option interface { + apply(*options) +} + +type wrapperOption string + +func (c wrapperOption) apply(opts *options) { + opts.wrapper = string(c) +} + +func WithWrapper(w string) Option { + return wrapperOption(w) +} + +type noFixKeyOption bool + +func (c noFixKeyOption) apply(opts *options) { + opts.noFixKey = bool(c) +} + +func WithNoFixKey(n bool) Option { + return noFixKeyOption(n) +} + +type useWrapperOption bool + +func (c useWrapperOption) apply(opts *options) { + opts.useWrapper = bool(c) +} + +func WithUseWrapper(n bool) Option { + return useWrapperOption(n) +} diff --git a/cache/redis/pipeline.go b/cache/redis/pipeline.go new file mode 100644 index 0000000..bf5fda8 --- /dev/null +++ b/cache/redis/pipeline.go @@ -0,0 +1,456 @@ +package redis + +import ( + "context" + "fmt" + "github.com/go-redis/redis" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" + "github.com/shawnfeng/sutil/cache/constants" + "strings" + "time" +) + +type Pipeline struct { + namespace string + pipeline redis.Pipeliner + opts *options +} + +func (m *Pipeline) fixKey(key string) string { + if m.opts.noFixKey { + return key + } + parts := []string{ + m.namespace, + m.opts.wrapper, + key, + } + if !m.opts.useWrapper { + parts = []string{ + m.namespace, + key, + } + } + return strings.Join(parts, ".") +} + +func (m *Pipeline) logSpan(ctx context.Context, op, key string) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span.LogFields( + log.String(constants.SpanLogOp, op), + log.String(constants.SpanLogKeyKey, key), + log.String(constants.SpanLogCacheType, fmt.Sprint(constants.CacheTypeRedis))) + } +} + + +func (m *Pipeline) Get(ctx context.Context, key string) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.Get", k) + return m.pipeline.Get(k) +} + +func (m *Pipeline) MGet(ctx context.Context, keys ...string) *redis.SliceCmd { + var fixKeys = make([]string, len(keys)) + for k, v := range keys { + key := m.fixKey(v) + fixKeys[k] = key + } + m.logSpan(ctx, "Pipeline.MGet", strings.Join(fixKeys, "||")) + return m.pipeline.MGet(fixKeys...) +} + +func (m *Pipeline) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.Set", k) + return m.pipeline.Set(k, value, expiration) +} + +func (m *Pipeline) MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd { + var fixPairs = make([]interface{}, len(pairs)) + var keys []string + for k, v := range pairs { + if (k & 1) == 0 { + key := m.fixKey(v.(string)) + keys = append(keys, key) + fixPairs[k] = key + } else { + fixPairs[k] = v + } + } + m.logSpan(ctx, "Pipeline.MSet", strings.Join(keys, "||")) + return m.pipeline.MSet(fixPairs...) +} + +func (m *Pipeline) GetBit(ctx context.Context, key string, offset int64) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.GetBit", k) + return m.pipeline.GetBit(k, offset) +} + +func (m *Pipeline) SetBit(ctx context.Context, key string, offset int64, value int) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.SetBit", k) + return m.pipeline.SetBit(k, offset, value) +} + +func (m *Pipeline) Exists(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Exists", k) + return m.pipeline.Exists(k) +} + +func (m *Pipeline) Del(ctx context.Context, keys ...string) *redis.IntCmd { + var tkeys []string + for _, key := range keys { + tkeys = append(tkeys, m.fixKey(key)) + } + + m.logSpan(ctx, "Pipeline.Del", strings.Join(tkeys, ",")) + return m.pipeline.Del(tkeys...) +} + +func (m *Pipeline) Expire(ctx context.Context, key string, expiration time.Duration) *redis.BoolCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.Expire", k) + return m.pipeline.Expire(k, expiration) +} + +func (m *Pipeline) Incr(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.Incr", k) + return m.pipeline.Incr(k) +} + +func (m *Pipeline) IncrBy(ctx context.Context, key string, value int64) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.IncrBy", k) + return m.pipeline.IncrBy(k, value) +} + +func (m *Pipeline) Decr(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.Decr", k) + return m.pipeline.Decr(k) +} + +func (m *Pipeline) DecrBy(ctx context.Context, key string, value int64) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.DecrBy", k) + return m.pipeline.DecrBy(k, value) +} + +func (m *Pipeline) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.SetNX", k) + return m.pipeline.SetNX(k, value, expiration) +} + +func (m *Pipeline) HSet(ctx context.Context, key string, field string, value interface{}) *redis.BoolCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HSet", k) + return m.pipeline.HSet(k, field, value) +} + +func (m *Pipeline) HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HDel", k) + return m.pipeline.HDel(k, fields...) +} + +func (m *Pipeline) HExists(ctx context.Context, key string, field string) *redis.BoolCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HExists", k) + return m.pipeline.HExists(k, field) +} + +func (m *Pipeline) HGet(ctx context.Context, key string, field string) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HGet", k) + return m.pipeline.HGet(k, field) +} + +func (m *Pipeline) HGetAll(ctx context.Context, key string) *redis.StringStringMapCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HGetAll", k) + return m.pipeline.HGetAll(k) +} + +func (m *Pipeline) HIncrBy(ctx context.Context, key string, field string, incr int64) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HIncrBy", k) + return m.pipeline.HIncrBy(k, field, incr) +} + +func (m *Pipeline) HIncrByFloat(ctx context.Context, key string, field string, incr float64) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HIncrByFloat", k) + return m.pipeline.HIncrByFloat(k, field, incr) +} + +func (m *Pipeline) HKeys(ctx context.Context, key string) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HKeys", k) + return m.pipeline.HKeys(k) +} + +func (m *Pipeline) HLen(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HLen", k) + return m.pipeline.HLen(k) +} + +func (m *Pipeline) HMGet(ctx context.Context, key string, fields ...string) *redis.SliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HMGet", k) + return m.pipeline.HMGet(k, fields...) +} + +func (m *Pipeline) HMSet(ctx context.Context, key string, fields map[string]interface{}) *redis.StatusCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HMSet", k) + return m.pipeline.HMSet(k, fields) +} + +func (m *Pipeline) HSetNX(ctx context.Context, key string, field string, val interface{}) *redis.BoolCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HSetNX", k) + return m.pipeline.HSetNX(k, field, val) +} + +func (m *Pipeline) HVals(ctx context.Context, key string) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.HVals", k) + return m.pipeline.HVals(k) +} + +func (m *Pipeline) ZAdd(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAdd", k) + return m.pipeline.ZAdd(k, members...) +} + +func (m *Pipeline) ZAddNX(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAddNX", k) + return m.pipeline.ZAddNX(k, members...) +} + +func (m *Pipeline) ZAddNXCh(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAddNXCh", k) + return m.pipeline.ZAddNXCh(k, members...) +} + +func (m *Pipeline) ZAddXX(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAddXX", k) + return m.pipeline.ZAddXX(k, members...) +} + +func (m *Pipeline) ZAddXXCh(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAddXXCh", k) + return m.pipeline.ZAddXXCh(k, members...) +} + +func (m *Pipeline) ZAddCh(ctx context.Context, key string, members ...redis.Z) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZAddCh", k) + return m.pipeline.ZAddCh(k, members...) +} + +func (m *Pipeline) ZCard(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZCard", k) + return m.pipeline.ZCard(k) +} + +func (m *Pipeline) ZCount(ctx context.Context, key, min, max string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZCount", k) + return m.pipeline.ZCount(k, min, max) +} + +func (m *Pipeline) ZRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRange", k) + return m.pipeline.ZRange(k, start, stop) +} + +func (m *Pipeline) ZRangeByLex(ctx context.Context, key string, by redis.ZRangeBy) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRangeByLex", k) + return m.pipeline.ZRangeByLex(k, by) +} + +func (m *Pipeline) ZRangeByScore(ctx context.Context, key string, by redis.ZRangeBy) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRangeByScore", k) + return m.pipeline.ZRangeByScore(k, by) +} + +func (m *Pipeline) ZRangeWithScores(ctx context.Context, key string, start, stop int64) *redis.ZSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRangeWithScores", k) + return m.pipeline.ZRangeWithScores(k, start, stop) +} + +func (m *Pipeline) ZRevRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRevRange", k) + return m.pipeline.ZRevRange(k, start, stop) +} + +func (m *Pipeline) ZRevRangeWithScores(ctx context.Context, key string, start, stop int64) *redis.ZSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRevRangeWithScores", k) + return m.pipeline.ZRevRangeWithScores(k, start, stop) +} + +func (m *Pipeline) ZRank(ctx context.Context, key string, member string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRank", k) + return m.pipeline.ZRank(k, member) +} + +func (m *Pipeline) ZRevRank(ctx context.Context, key string, member string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRevRank", k) + return m.pipeline.ZRevRank(k, member) +} + +func (m *Pipeline) ZRem(ctx context.Context, key string, members []interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZRem", k) + return m.pipeline.ZRem(k, members...) +} + +func (m *Pipeline) ZIncr(ctx context.Context, key string, member redis.Z) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZIncr", k) + return m.pipeline.ZIncr(k, member) +} + +func (m *Pipeline) ZIncrNX(ctx context.Context, key string, member redis.Z) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZIncrNX", k) + return m.pipeline.ZIncrNX(k, member) +} + +func (m *Pipeline) ZIncrXX(ctx context.Context, key string, member redis.Z) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZIncrXX", k) + return m.pipeline.ZIncrXX(k, member) +} + +func (m *Pipeline) ZIncrBy(ctx context.Context, key string, increment float64, member string) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZIncrBy", k) + return m.pipeline.ZIncrBy(k, increment, member) +} + +func (m *Pipeline) ZScore(ctx context.Context, key string, member string) *redis.FloatCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.ZScore", k) + return m.pipeline.ZScore(k, member) +} + +func (m *Pipeline) LIndex(ctx context.Context, key string, index int64) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LIndex", k) + return m.pipeline.LIndex(k, index) +} + +func (m *Pipeline) LInsert(ctx context.Context, key, op string, pivot, value interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LInsert", k) + return m.pipeline.LInsert(k, op, pivot, value) +} + +func (m *Pipeline) LLen(ctx context.Context, key string) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LLen", k) + return m.pipeline.LLen(k) +} + +func (m *Pipeline) LPop(ctx context.Context, key string) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LPop", k) + return m.pipeline.LPop(k) +} + +func (m *Pipeline) LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LPush", k) + return m.pipeline.LPush(k, values...) +} + +func (m *Pipeline) LPushX(ctx context.Context, key string, value interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LPushX", k) + return m.pipeline.LPushX(k, value) +} + +func (m *Pipeline) LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LRange", k) + return m.pipeline.LRange(k, start, stop) +} + +func (m *Pipeline) LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LRem", k) + return m.pipeline.LRem(k, count, value) +} + +func (m *Pipeline) LSet(ctx context.Context, key string, index int64, value interface{}) *redis.StatusCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LSet", k) + return m.pipeline.LSet(k, index, value) +} + +func (m *Pipeline) LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.LTrim", k) + return m.pipeline.LTrim(k, start, stop) +} + +func (m *Pipeline) RPop(ctx context.Context, key string) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.RPop", k) + return m.pipeline.RPop(k) +} + +func (m *Pipeline) RPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.RPush", k) + return m.pipeline.RPush(k, values...) +} + +func (m *Pipeline) RPushX(ctx context.Context, key string, value interface{}) *redis.IntCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.RPushX", k) + return m.pipeline.RPushX(k, value) +} + +func (m *Pipeline) TTL(ctx context.Context, key string) *redis.DurationCmd { + k := m.fixKey(key) + m.logSpan(ctx, "Pipeline.TTL", k) + return m.pipeline.TTL(k) +} + +func (m *Pipeline) Exec(ctx context.Context) ([]redis.Cmder, error){ + return m.pipeline.Exec() +} + +func (m *Pipeline) Discard(ctx context.Context) error { + return m.pipeline.Discard() +} + +func (m *Pipeline) Close() error { + return m.pipeline.Close() +} + + diff --git a/cache/redis/redis.go b/cache/redis/redis.go index 8b06dd0..b4889ba 100644 --- a/cache/redis/redis.go +++ b/cache/redis/redis.go @@ -16,10 +16,9 @@ import ( var RedisNil = fmt.Sprintf("redis: nil") type Client struct { - client *redis.Client - namespace string - wrapper string - useWrapper bool + client *redis.Client + namespace string + opts *options } func NewClient(ctx context.Context, namespace string, wrapper string) (*Client, error) { @@ -43,12 +42,51 @@ func NewClient(ctx context.Context, namespace string, wrapper string) (*Client, if err != nil { slog.Errorf(ctx, "%s ping:%s err:%s", fun, pong, err) } - - return &Client{ - client: client, - namespace: namespace, + opts := &options{ wrapper: wrapper, + noFixKey: false, + useWrapper: config.useWrapper, + } + return &Client{ + client: client, + namespace: namespace, + opts: opts, + }, err +} + +func NewClientWithOptions(ctx context.Context, namespace string, opts ...Option) (*Client, error) { + fun := "NewClient -->" + + config, err := DefaultConfiger.GetConfig(ctx, namespace) + if err != nil { + return nil, err + } + + client := redis.NewClient(&redis.Options{ + Addr: config.addr, + DialTimeout: 3 * config.timeout, + ReadTimeout: config.timeout, + WriteTimeout: config.timeout, + PoolSize: config.poolSize, + PoolTimeout: 2 * config.timeout, + }) + + pong, err := client.Ping().Result() + if err != nil { + slog.Errorf(ctx, "%s ping:%s err:%s", fun, pong, err) + } + opt := &options{ + wrapper: constants.DefaultRedisWrapper, + noFixKey: false, useWrapper: config.useWrapper, + } + for _, o := range opts { + o.apply(opt) + } + return &Client{ + namespace: namespace, + opts: opt, + client: client, }, err } @@ -70,20 +108,26 @@ func NewDefaultClient(ctx context.Context, namespace, addr, wrapper string, pool } return &Client{ - client: client, - namespace: namespace, - wrapper: wrapper, - useWrapper: useWrapper, + client: client, + namespace: namespace, + opts: &options{ + wrapper: wrapper, + noFixKey: false, + useWrapper: useWrapper, + }, }, err } func (m *Client) fixKey(key string) string { + if m.opts.noFixKey { + return key + } parts := []string{ m.namespace, - m.wrapper, + m.opts.wrapper, key, } - if !m.useWrapper { + if !m.opts.useWrapper { parts = []string{ m.namespace, key, @@ -526,3 +570,11 @@ func (m *Client) EvalSha(ctx context.Context, scriptHash string, keys []string, func (m *Client) Close(ctx context.Context) error { return m.client.Close() } + +func (m *Client) Pipeline() *Pipeline { + return &Pipeline{ + namespace: m.namespace, + pipeline: m.client.Pipeline(), + opts: m.opts, + } +} diff --git a/cache/redisext/list.go b/cache/redisext/list.go index c93aa21..018ea6b 100644 --- a/cache/redisext/list.go +++ b/cache/redisext/list.go @@ -2,6 +2,7 @@ package redisext import ( "context" + go_redis "github.com/go-redis/redis" "github.com/opentracing/opentracing-go" "github.com/shawnfeng/sutil/stime" @@ -23,6 +24,10 @@ func (m *RedisExt) LIndex(ctx context.Context, key string, index int64) (element return } +func (p *PipelineExt) LIndex(ctx context.Context, key string, index int64) *go_redis.StringCmd { + return p.pipe.LIndex(ctx, p.prefixKey(key), index) +} + func (m *RedisExt) LInsert(ctx context.Context, key, op string, pivot, value interface{}) (n int64, err error) { command := "redisext.LInsert" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -39,6 +44,10 @@ func (m *RedisExt) LInsert(ctx context.Context, key, op string, pivot, value int return } +func (p *PipelineExt) LInsert(ctx context.Context, key, op string, pivot, value interface{}) *go_redis.IntCmd { + return p.pipe.LInsert(ctx, p.prefixKey(key), op, pivot, value) +} + func (m *RedisExt) LLen(ctx context.Context, key string) (n int64, err error) { command := "redisext.LLen" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -55,6 +64,10 @@ func (m *RedisExt) LLen(ctx context.Context, key string) (n int64, err error) { return } +func (p *PipelineExt) LLen(ctx context.Context, key string) *go_redis.IntCmd { + return p.pipe.LLen(ctx, p.prefixKey(key)) +} + func (m *RedisExt) LPop(ctx context.Context, key string) (element string, err error) { command := "redisext.LPop" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -71,6 +84,10 @@ func (m *RedisExt) LPop(ctx context.Context, key string) (element string, err er return } +func (p *PipelineExt) LPop(ctx context.Context, key string) *go_redis.StringCmd { + return p.pipe.LPop(ctx, p.prefixKey(key)) +} + func (m *RedisExt) LPush(ctx context.Context, key string, values ...interface{}) (n int64, err error) { command := "rdisext.LPush" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -87,6 +104,10 @@ func (m *RedisExt) LPush(ctx context.Context, key string, values ...interface{}) return } +func (p *PipelineExt) LPush(ctx context.Context, key string, values ...interface{}) *go_redis.IntCmd { + return p.pipe.LPush(ctx, p.prefixKey(key), values) +} + func (m *RedisExt) LPushX(ctx context.Context, key string, value interface{}) (n int64, err error) { command := "redisext.LPushX" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -103,6 +124,10 @@ func (m *RedisExt) LPushX(ctx context.Context, key string, value interface{}) (n return } +func (p *PipelineExt) LPushX(ctx context.Context, key string, value interface{}) *go_redis.IntCmd { + return p.pipe.LPushX(ctx, p.prefixKey(key), value) +} + func (m *RedisExt) LRange(ctx context.Context, key string, start, stop int64) (r []string, err error) { command := "redisext.LRange" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -119,6 +144,10 @@ func (m *RedisExt) LRange(ctx context.Context, key string, start, stop int64) (r return } +func (p *PipelineExt) LRange(ctx context.Context, key string, start, stop int64) *go_redis.StringSliceCmd { + return p.pipe.LRange(ctx, p.prefixKey(key), start, stop) +} + func (m *RedisExt) LRem(ctx context.Context, key string, count int64, value interface{}) (n int64, err error) { command := "redisext.LRem" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -135,6 +164,10 @@ func (m *RedisExt) LRem(ctx context.Context, key string, count int64, value inte return } +func (p *PipelineExt) LRem(ctx context.Context, key string, count int64, value interface{}) *go_redis.IntCmd { + return p.pipe.LRem(ctx, p.prefixKey(key), count, value) +} + func (m *RedisExt) LSet(ctx context.Context, key string, index int64, value interface{}) (r string, err error) { command := "redisext.LSet" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -151,6 +184,10 @@ func (m *RedisExt) LSet(ctx context.Context, key string, index int64, value inte return } +func (p *PipelineExt) LSet(ctx context.Context, key string, count int64, value interface{}) *go_redis.StatusCmd { + return p.pipe.LSet(ctx, p.prefixKey(key), count, value) +} + func (m *RedisExt) LTrim(ctx context.Context, key string, start, stop int64) (r string, err error) { command := "redisext.LTrim" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -167,6 +204,10 @@ func (m *RedisExt) LTrim(ctx context.Context, key string, start, stop int64) (r return } +func (p *PipelineExt) LTrim(ctx context.Context, key string, start, stop int64) *go_redis.StatusCmd { + return p.pipe.LTrim(ctx, p.prefixKey(key), start, stop) +} + func (m *RedisExt) RPop(ctx context.Context, key string) (element string, err error) { command := "redisext.RPop" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -183,6 +224,10 @@ func (m *RedisExt) RPop(ctx context.Context, key string) (element string, err er return } +func (p *PipelineExt) RPop(ctx context.Context, key string) *go_redis.StringCmd { + return p.pipe.RPop(ctx, p.prefixKey(key)) +} + func (m *RedisExt) RPush(ctx context.Context, key string, values ...interface{}) (n int64, err error) { command := "redisext.RPush" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -199,6 +244,10 @@ func (m *RedisExt) RPush(ctx context.Context, key string, values ...interface{}) return } +func (p *PipelineExt) RPush(ctx context.Context, key string, values ...interface{}) *go_redis.IntCmd { + return p.pipe.RPush(ctx, p.prefixKey(key), values) +} + func (m *RedisExt) RPushX(ctx context.Context, key string, value interface{}) (n int64, err error) { command := "redisext.RPushX" span, ctx := opentracing.StartSpanFromContext(ctx, command) @@ -214,3 +263,7 @@ func (m *RedisExt) RPushX(ctx context.Context, key string, value interface{}) (n statReqErr(m.namespace, command, err) return } + +func (p *PipelineExt) RPushX(ctx context.Context, key string, value interface{}) *go_redis.IntCmd { + return p.pipe.RPushX(ctx, p.prefixKey(key), value) +} diff --git a/cache/redisext/metrics.go b/cache/redisext/metrics.go index ac9d2ad..6471765 100644 --- a/cache/redisext/metrics.go +++ b/cache/redisext/metrics.go @@ -1,6 +1,7 @@ package redisext import ( + go_redis "github.com/go-redis/redis" "gitlab.pri.ibanyu.com/middleware/seaweed/xstat/xmetric/xprometheus" ) @@ -35,7 +36,7 @@ func statReqDuration(namespace, command string, durationMS int64) { } func statReqErr(namespace, command string, err error) { - if err != nil { + if err != nil && err != go_redis.Nil { _metricReqErr.With("namespace", namespace, "command", command).Inc() } return diff --git a/cache/redisext/pipelinext.go b/cache/redisext/pipelinext.go new file mode 100644 index 0000000..5ac8c35 --- /dev/null +++ b/cache/redisext/pipelinext.go @@ -0,0 +1,270 @@ +package redisext + +import ( + "context" + "fmt" + go_redis "github.com/go-redis/redis" + "github.com/opentracing/opentracing-go" + "github.com/shawnfeng/sutil/cache/redis" + "github.com/shawnfeng/sutil/stime" + "time" +) + +// PipelineExt by RedisExt get pipeline +type PipelineExt struct { + namespace string + prefix string + pipe *redis.Pipeline +} + +func (m *PipelineExt) prefixKey(key string) string { + if len(m.prefix) > 0 { + key = fmt.Sprintf("%s.%s", m.prefix, key) + } + return key +} + +func (m *PipelineExt) Get(ctx context.Context, key string) (strCmd *go_redis.StringCmd) { + return m.pipe.Get(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) MGet(ctx context.Context, keys ...string) (sliceCmd *go_redis.SliceCmd) { + var prefixKey = make([]string, len(keys)) + for k, v := range keys { + prefixKey[k] = m.prefixKey(v) + } + return m.pipe.MGet(ctx, prefixKey...) +} + +func (m *PipelineExt) Set(ctx context.Context, key string, val interface{}, exp time.Duration) (statusCmd *go_redis.StatusCmd) { + return m.pipe.Set(ctx, m.prefixKey(key), val, exp) +} + +func (m *PipelineExt) MSet(ctx context.Context, pairs ...interface{}) (s *go_redis.StatusCmd) { + var prefixPairs = make([]interface{}, len(pairs)) + for k, v := range pairs { + if (k & 1) == 0 { + prefixPairs[k] = m.prefixKey(v.(string)) + } else { + prefixPairs[k] = v + } + } + return m.pipe.MSet(ctx, prefixPairs...) +} + +func (m *PipelineExt) GetBit(ctx context.Context, key string, offset int64) (n *go_redis.IntCmd) { + return m.pipe.GetBit(ctx, m.prefixKey(key), offset) +} + +func (m *PipelineExt) SetBit(ctx context.Context, key string, offset int64, value int) (n *go_redis.IntCmd) { + return m.pipe.SetBit(ctx, m.prefixKey(key), offset, value) +} + +func (m *PipelineExt) Incr(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.Incr(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) IncrBy(ctx context.Context, key string, val int64) (n *go_redis.IntCmd) { + return m.pipe.IncrBy(ctx, m.prefixKey(key), val) +} + +func (m *PipelineExt) Decr(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.Decr(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) DecrBy(ctx context.Context, key string, val int64) (n *go_redis.IntCmd) { + return m.pipe.DecrBy(ctx, m.prefixKey(key), val) +} + +func (m *PipelineExt) SetNX(ctx context.Context, key string, val interface{}, exp time.Duration) (b *go_redis.BoolCmd) { + return m.pipe.SetNX(ctx, m.prefixKey(key), val, exp) +} + +func (m *PipelineExt) Exists(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.Exists(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) Del(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.Del(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) Expire(ctx context.Context, key string, expiration time.Duration) (b *go_redis.BoolCmd) { + return m.pipe.Expire(ctx, m.prefixKey(key), expiration) +} + +// hashes apis +func (m *PipelineExt) HSet(ctx context.Context, key string, field string, value interface{}) (b *go_redis.BoolCmd) { + return m.pipe.HSet(ctx, m.prefixKey(key), field, value) +} + +func (m *PipelineExt) HDel(ctx context.Context, key string, fields ...string) (n *go_redis.IntCmd) { + return m.pipe.HDel(ctx, m.prefixKey(key), fields...) +} + +func (m *PipelineExt) HExists(ctx context.Context, key string, field string) (b *go_redis.BoolCmd) { + return m.pipe.HExists(ctx, m.prefixKey(key), field) +} + +func (m *PipelineExt) HGet(ctx context.Context, key string, field string) (s *go_redis.StringCmd) { + return m.pipe.HGet(ctx, m.prefixKey(key), field) +} + +func (m *PipelineExt) HGetAll(ctx context.Context, key string) (sm *go_redis.StringStringMapCmd) { + return m.pipe.HGetAll(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) HIncrBy(ctx context.Context, key string, field string, incr int64) (n *go_redis.IntCmd) { + return m.pipe.HIncrBy(ctx, m.prefixKey(key), field, incr) +} + +func (m *PipelineExt) HIncrByFloat(ctx context.Context, key string, field string, incr float64) (f *go_redis.FloatCmd) { + return m.pipe.HIncrByFloat(ctx, m.prefixKey(key), field, incr) +} + +func (m *PipelineExt) HKeys(ctx context.Context, key string) (ss *go_redis.StringSliceCmd) { + return m.pipe.HKeys(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) HLen(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.HLen(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) HMGet(ctx context.Context, key string, fields ...string) (vs *go_redis.SliceCmd) { + return m.pipe.HMGet(ctx, m.prefixKey(key), fields...) +} + +func (m *PipelineExt) HMSet(ctx context.Context, key string, fields map[string]interface{}) (s *go_redis.StatusCmd) { + return m.pipe.HMSet(ctx, m.prefixKey(key), fields) +} + +func (m *PipelineExt) HSetNX(ctx context.Context, key string, field string, val interface{}) (b *go_redis.BoolCmd) { + return m.pipe.HSetNX(ctx, m.prefixKey(key), field, val) +} + +func (m *PipelineExt) HVals(ctx context.Context, key string) (ss *go_redis.StringSliceCmd) { + return m.pipe.HVals(ctx, m.prefixKey(key)) +} + +// sorted set apis +func (m *PipelineExt) ZAdd(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAdd(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZAddNX(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAddNX(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZAddNXCh(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAddNXCh(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZAddXX(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAddXX(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZAddXXCh(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAddXXCh(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZAddCh(ctx context.Context, key string, members []Z) (n *go_redis.IntCmd) { + return m.pipe.ZAddCh(ctx, m.prefixKey(key), toRedisZSlice(members)...) +} + +func (m *PipelineExt) ZCard(ctx context.Context, key string) (n *go_redis.IntCmd) { + return m.pipe.ZCard(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) ZCount(ctx context.Context, key, min, max string) (n *go_redis.IntCmd) { + return m.pipe.ZCount(ctx, m.prefixKey(key), min, max) +} + +func (m *PipelineExt) ZRange(ctx context.Context, key string, start, stop int64) (ss *go_redis.StringSliceCmd) { + return m.pipe.ZRange(ctx, m.prefixKey(key), start, stop) +} + +func (m *PipelineExt) ZRangeByLex(ctx context.Context, key string, by ZRangeBy) (ss *go_redis.StringSliceCmd) { + return m.pipe.ZRangeByLex(ctx, m.prefixKey(key), toRedisZRangeBy(by)) +} + +func (m *PipelineExt) ZRangeByScore(ctx context.Context, key string, by ZRangeBy) (ss *go_redis.StringSliceCmd) { + return m.pipe.ZRangeByScore(ctx, m.prefixKey(key), toRedisZRangeBy(by)) +} + +func (m *PipelineExt) ZRangeWithScores(ctx context.Context, key string, start, stop int64) (zs *go_redis.ZSliceCmd) { + return m.pipe.ZRangeWithScores(ctx, m.prefixKey(key), start, stop) +} + +func (m *PipelineExt) ZRevRange(ctx context.Context, key string, start, stop int64) (ss *go_redis.StringSliceCmd) { + return m.pipe.ZRevRange(ctx, m.prefixKey(key), start, stop) +} + +func (m *PipelineExt) ZRevRangeWithScores(ctx context.Context, key string, start, stop int64) (zs *go_redis.ZSliceCmd) { + return m.pipe.ZRevRangeWithScores(ctx, m.prefixKey(key), start, stop) +} + +func (m *PipelineExt) ZRank(ctx context.Context, key string, member string) (n *go_redis.IntCmd) { + return m.pipe.ZRank(ctx, m.prefixKey(key), member) +} + +func (m *PipelineExt) ZRevRank(ctx context.Context, key string, member string) (n *go_redis.IntCmd) { + return m.pipe.ZRevRank(ctx, m.prefixKey(key), member) +} + +func (m *PipelineExt) ZRem(ctx context.Context, key string, members []interface{}) (n *go_redis.IntCmd) { + return m.pipe.ZRem(ctx, m.prefixKey(key), members) +} + +func (m *PipelineExt) ZIncr(ctx context.Context, key string, member Z) (f *go_redis.FloatCmd) { + return m.pipe.ZIncr(ctx, m.prefixKey(key), member.toRedisZ()) +} + +func (m *PipelineExt) ZIncrNX(ctx context.Context, key string, member Z) (f *go_redis.FloatCmd) { + return m.pipe.ZIncrNX(ctx, m.prefixKey(key), member.toRedisZ()) +} + +func (m *PipelineExt) ZIncrXX(ctx context.Context, key string, member Z) (f *go_redis.FloatCmd) { + return m.pipe.ZIncrXX(ctx, m.prefixKey(key), member.toRedisZ()) +} + +func (m *PipelineExt) ZIncrBy(ctx context.Context, key string, increment float64, member string) (f *go_redis.FloatCmd) { + return m.pipe.ZIncrBy(ctx, m.prefixKey(key), increment, member) +} + +func (m *PipelineExt) ZScore(ctx context.Context, key string, member string) (f *go_redis.FloatCmd) { + return m.pipe.ZScore(ctx, m.prefixKey(key), member) +} + +func (m *PipelineExt) TTL(ctx context.Context, key string) (d *go_redis.DurationCmd) { + return m.pipe.TTL(ctx, m.prefixKey(key)) +} + +func (m *PipelineExt) Exec(ctx context.Context) (cmds []go_redis.Cmder, err error) { + command := "PipelineExt.Exec" + span, ctx := opentracing.StartSpanFromContext(ctx, command) + st := stime.NewTimeStat() + defer func() { + span.Finish() + statReqDuration(m.namespace, command, st.Millisecond()) + }() + + cmds, err = m.pipe.Exec(ctx) + statReqErr(m.namespace, command, err) + return +} + +func (m *PipelineExt) Discard(ctx context.Context) (err error) { + command := "PipelineExt.Discard" + span, ctx := opentracing.StartSpanFromContext(ctx, command) + st := stime.NewTimeStat() + defer func() { + span.Finish() + statReqDuration(m.namespace, command, st.Millisecond()) + }() + err = m.pipe.Discard(ctx) + statReqErr(m.namespace, command, err) + return +} + +func (m *PipelineExt) Close(ctx context.Context) error { + return m.pipe.Close() +} diff --git a/cache/redisext/pipelinext_test.go b/cache/redisext/pipelinext_test.go new file mode 100644 index 0000000..928f7e7 --- /dev/null +++ b/cache/redisext/pipelinext_test.go @@ -0,0 +1,147 @@ +package redisext + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPipelineExt_Get(t *testing.T) { + ctx := context.Background() + redis := NewRedisExt("base/test", "test") + redis.Set(ctx, "testPipeline", "success", 15 * time.Second) + pipe, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + s := pipe.Get(ctx, "testPipeline") + str, err := s.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + cmds, err := pipe.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 1, len(cmds)) + str, err = s.Result() + assert.Equal(t, "success", str) +} + +func TestPipelineExt_TTL(t *testing.T) { + ctx := context.Background() + redis := NewRedisExt("base/test", "test") + _, err := redis.Set(ctx, "testPipeline", "success", 15 * time.Second) + re, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + s := re.Get(ctx, "testPipeline") + str, err := s.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + tr := re.TTL(ctx, "testPipeline") + dur, err := tr.Result() + assert.NoError(t, err) + assert.Equal(t, "0s", dur.String()) + + cmds, err := re.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, len(cmds)) + str, err = s.Result() + assert.Equal(t, "success", str) + dur, err = tr.Result() + assert.Equal(t, "15s", dur.String()) +} + +func TestPipelineExt_MGet(t *testing.T) { + ctx := context.Background() + redis := NewRedisExt("base/test", "test") + _, err := redis.MSet(ctx, "testPipeline", "success", "testPipeline2", "success") + re, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + r := re.MGet(ctx, "testPipeline", "testPipeline2") + arr, err := r.Result() + assert.NoError(t, err) + assert.Equal(t, []interface {}(nil), arr) + + cmds, err := re.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 1, len(cmds)) + arr, err = r.Result() + assert.Equal(t, "success", arr[0].(string)) + assert.Equal(t, "success", arr[1].(string)) +} + +func TestPipelineExt_NoPrefix(t *testing.T) { + ctx := context.Background() + + redis := NewRedisExtNoPrefix("base/test") + redis.Set(ctx, "testPipeline", "success", 15 * time.Second) + pipe, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + s := pipe.Get(ctx, "testPipeline") + str, err := s.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + cmds, err := pipe.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 1, len(cmds)) + str, err = s.Result() + assert.Equal(t, "success", str) +} + +func TestPipelineExt_HGet(t *testing.T) { + ctx := context.Background() + + redis := NewRedisExt("base/test", "test") + redis.HSet(ctx, "testPipeline", "key", "val") + pipe, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + strCmd := pipe.HGet(ctx, "testPipeline", "key") + str, err := strCmd.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + cmds, err := pipe.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 1, len(cmds)) + str, err = strCmd.Result() + assert.Equal(t, "val", str) +} + +func TestPipelineExt_LPop(t *testing.T) { + ctx := context.Background() + + redis := NewRedisExt("base/test", "test") + redis.LPush(ctx, "testPipelineLPop", "val1", "val2", "val3") + pipe, err := redis.Pipeline(ctx) + assert.NoError(t, err) + + strCmd1 := pipe.LPop(ctx, "testPipelineLPop") + str, err := strCmd1.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + strCmd2 := pipe.LPop(ctx, "testPipelineLPop") + str, err = strCmd2.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + strCmd3 := pipe.LPop(ctx, "testPipelineLPop") + str, err = strCmd3.Result() + assert.NoError(t, err) + assert.Equal(t, "", str) + + cmds, err := pipe.Exec(ctx) + assert.NoError(t, err) + assert.Equal(t, 3, len(cmds)) + str, err = strCmd1.Result() + assert.Equal(t, "val3", str) + str, err = strCmd2.Result() + assert.Equal(t, "val2", str) + str, err = strCmd3.Result() + assert.Equal(t, "val1", str) +} diff --git a/cache/redisext/redisext.go b/cache/redisext/redisext.go index e08e0cd..9e65a80 100644 --- a/cache/redisext/redisext.go +++ b/cache/redisext/redisext.go @@ -18,10 +18,20 @@ import ( type RedisExt struct { namespace string prefix string + noFixKey bool // redis client no fix key } func NewRedisExt(namespace, prefix string) *RedisExt { - return &RedisExt{namespace, prefix} + return &RedisExt{ + namespace: namespace, + prefix: prefix} +} + +// NewRedisExtNoPrefix not fix redis key +func NewRedisExtNoPrefix(namespace string) *RedisExt { + return &RedisExt{ + namespace: namespace, + noFixKey: true} } type Z struct { @@ -88,6 +98,7 @@ func (m *RedisExt) getInstanceConf(ctx context.Context) *redis.InstanceConf { Group: scontext.GetControlRouteGroupWithDefault(ctx, constants.DefaultRouteGroup), Namespace: m.namespace, Wrapper: cache.WrapperTypeRedisExt, + NoFixKey: m.noFixKey, } } @@ -903,7 +914,24 @@ func (m *RedisExt) TTL(ctx context.Context, key string) (d time.Duration, err er }() client, err := m.getRedisInstance(ctx) if err == nil { - d, err = client.TTL(ctx, m.prefixKey(key)).Result() + d, err = client.TTL(ctx, m.prefixKey(key)).Result() + } + statReqErr(m.namespace, command, err) + return +} + +func (m *RedisExt) Pipeline(ctx context.Context) (pipe *PipelineExt, err error) { + command := "redisExt.Pipeline" + span, ctx := opentracing.StartSpanFromContext(ctx, command) + st := stime.NewTimeStat() + defer func() { + span.Finish() + statReqDuration(m.namespace, command, st.Millisecond()) + }() + client, err := m.getRedisInstance(ctx) + if err == nil { + p := client.Pipeline() + pipe = &PipelineExt{namespace: m.namespace, prefix: m.prefix, pipe: p} } statReqErr(m.namespace, command, err) return diff --git a/cache/redisext/redisext_test.go b/cache/redisext/redisext_test.go index d94e458..03db306 100644 --- a/cache/redisext/redisext_test.go +++ b/cache/redisext/redisext_test.go @@ -279,3 +279,24 @@ func TestRedisExt_TTL(t *testing.T) { assert.Equal(t, -1 * time.Second, d) re.Del(ctx, "getttl3") } + +func TestNewRedisExtNoPrefix(t *testing.T) { + ctx := context.Background() + val := "val" + re := NewRedisExtNoPrefix("base/report") + preRedis := NewRedisExt("base/report", "test") + + _, err := re.Set(ctx, "set", val, 10 * time.Second) + assert.NoError(t, err) + + _, err = preRedis.Set(ctx, "set", val+"prefix", 10 * time.Second) + assert.NoError(t, err) + + s, err := re.Get(ctx, "set") + assert.NoError(t, err) + assert.Equal(t, s, val) + + s, err = preRedis.Get(ctx, "set") + assert.NoError(t, err) + assert.Equal(t, s, val+"prefix") +} diff --git a/go.mod b/go.mod index 2a01194..57b22dd 100644 --- a/go.mod +++ b/go.mod @@ -27,9 +27,8 @@ require ( github.com/ugorji/go v1.1.7 // indirect github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec gitlab.pri.ibanyu.com/middleware/delayqueue v0.0.0-20200213090847-cd24af2bd1f2 - gitlab.pri.ibanyu.com/middleware/seaweed v1.1.6 + gitlab.pri.ibanyu.com/middleware/seaweed v1.1.11 golang.org/x/tools v0.0.0-20191120001058-ad01d5993d97 // indirect - gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 ) diff --git a/go.sum b/go.sum index cc3d3c2..2348a52 100644 --- a/go.sum +++ b/go.sum @@ -300,6 +300,10 @@ gitlab.pri.ibanyu.com/middleware/seaweed v1.0.5/go.mod h1:iCFyLPlxZOk9Z6sJ+b92gN gitlab.pri.ibanyu.com/middleware/seaweed v1.0.20/go.mod h1:MpSbv4ZwqbtESMEils0BcjET3+Qp+XeWtAwLqbk07fg= gitlab.pri.ibanyu.com/middleware/seaweed v1.1.5 h1:uqli1CC8D/iVm98Jw/FS9lpxAB7ghkKZ0IT2uqjFLXo= gitlab.pri.ibanyu.com/middleware/seaweed v1.1.5/go.mod h1:/jdJeuMOKlxO0+fC18Lrv4xJEchrzyVJdXi1oc5D9oA= +gitlab.pri.ibanyu.com/middleware/seaweed v1.1.6 h1:8MCogx3rtxV8fDOHulK0cwFcd6fEwmTQgYYRvnPKOsA= +gitlab.pri.ibanyu.com/middleware/seaweed v1.1.6/go.mod h1:/jdJeuMOKlxO0+fC18Lrv4xJEchrzyVJdXi1oc5D9oA= +gitlab.pri.ibanyu.com/middleware/seaweed v1.1.11 h1:GWDbAxLwy88meUOvRqRIsEs7tU2uwEYVqU7FHCFKr/g= +gitlab.pri.ibanyu.com/middleware/seaweed v1.1.11/go.mod h1:/jdJeuMOKlxO0+fC18Lrv4xJEchrzyVJdXi1oc5D9oA= gitlab.pri.ibanyu.com/middleware/util v1.0.27 h1:9cFCEPpxvUDfM2ZCzLhaf/Qe4ytgSlVniObjuY2uzVo= gitlab.pri.ibanyu.com/middleware/util v1.0.27/go.mod h1:quXdc6vUclNPa1AB1z5eTdlegRmXL+tOOfkMvezxk8k= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= diff --git a/slog/statlog/log.go b/slog/statlog/log.go index 6997835..d94632d 100644 --- a/slog/statlog/log.go +++ b/slog/statlog/log.go @@ -30,6 +30,11 @@ func LogKV(ctx context.Context, name string, keysAndValues ...interface{}) { atomic.AddInt64(&counter, 1) } +func LogKVWithMap(ctx context.Context, name string, kvMap map[string]interface{}) { + xlog.StatLogWithMap(ctx, name, kvMap) + atomic.AddInt64(&counter, 1) +} + func init() { atomic.StoreInt64(&fromTimeStamp, time.Now().Unix()) } diff --git a/slog/statlog/log_test.go b/slog/statlog/log_test.go index 8ab8140..ab8de5e 100644 --- a/slog/statlog/log_test.go +++ b/slog/statlog/log_test.go @@ -73,3 +73,44 @@ func TestLog(t *testing.T) { "k2", 1) }) } + +func TestLogKVWithMap(t *testing.T) { + Init("", "", "testservice") + t.Run("context without head", func(t *testing.T) { + LogKVWithMap(context.TODO(), "method1", map[string]interface{}{ + "k1": 0, + "k2": "hello", + "k3": true, + "k4": []int{1, 2, 3}, + "k5": &sa{"world", 0}, + }) + }) + + t.Run("context without head LogKV", func(t *testing.T) { + LogKV(context.TODO(), "method1", + "k1", 0, + "k2", "hello", + "k3", true, + "k4", []int{1, 2, 3}, + "k5", &sa{"world", 0}) + }) + + t.Run("context with head", func(t *testing.T) { + ctx := context.TODO() + ctx = context.WithValue(ctx, scontext.ContextKeyHead, &testHead{ + uid: 1234, + source: 0, + ip: "192.168.0.1", + region: "asia", + dt: 0, + unionid: "unionid", + }) + LogKVWithMap(ctx, "method2", map[string]interface{}{ + "k1": 0, + "k2": "hello", + "k3": true, + "k4": []int{1, 2, 3}, + "k5": &sa{"world", 0}, + }) + }) +}