Skip to content

Commit

Permalink
Merge pull request #175 from brzyangg/feature-mq-log
Browse files Browse the repository at this point in the history
mq-log
  • Loading branch information
niubell authored Jul 8, 2020
2 parents b1a9f62 + 9ab8091 commit fb0baed
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 85 deletions.
12 changes: 12 additions & 0 deletions cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,18 @@ func (m *Client) SMembers(ctx context.Context, key string) *redis.StringSliceCmd
return m.client.SMembers(k)
}

func (m *Client) SRandMember(ctx context.Context, key string) *redis.StringCmd {
k := m.fixKey(key)
m.logSpan(ctx, "SRandMember", k)
return m.client.SRandMember(k)
}

func (m *Client) SRandMemberN(ctx context.Context, key string, count int64) *redis.StringSliceCmd {
k := m.fixKey(key)
m.logSpan(ctx, "SRandMemberN", k)
return m.client.SRandMemberN(k, count)
}

func (m *Client) Close(ctx context.Context) error {
return m.client.Close()
}
Expand Down
32 changes: 32 additions & 0 deletions cache/redisext/redisext.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,38 @@ func (m *RedisExt) SMembers(ctx context.Context, key string) (s []string, err er
return
}

func (m *RedisExt) SRandMember(ctx context.Context, key string) (s string, err error) {
command := "redisext.SRandMember"
span, ctx := opentracing.StartSpanFromContext(ctx, command)
st := stime.NewTimeStat()
defer func() {
span.Finish()
statReqDuration(m.namespace, command, st.Millisecond())
}()
client, err := m.getRedisInstance(ctx)
if err == nil {
s, err = client.SRandMember(ctx, m.prefixKey(key)).Result()
}
statReqErr(m.namespace, command, err)
return
}

func (m *RedisExt) SRandMemberN(ctx context.Context, key string, count int64) (s []string, err error) {
command := "redisext.SRandMemberN"
span, ctx := opentracing.StartSpanFromContext(ctx, command)
st := stime.NewTimeStat()
defer func() {
span.Finish()
statReqDuration(m.namespace, command, st.Millisecond())
}()
client, err := m.getRedisInstance(ctx)
if err == nil {
s, err = client.SRandMemberN(ctx, m.prefixKey(key), count).Result()
}
statReqErr(m.namespace, command, err)
return
}

func (m *RedisExt) ZScan(ctx context.Context, key string, cursor uint64, match string, count int64) (keys []string, rcursor uint64, err error) {
command := "redisext.ZScan"
span, ctx := opentracing.StartSpanFromContext(ctx, command)
Expand Down
49 changes: 49 additions & 0 deletions cache/redisext/redisext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,52 @@ func TestRedisExt_SMembers(t *testing.T) {
assert.NoError(t, err)
assert.True(t, b)
}

func TestRedisExt_SRandMembers(t *testing.T) {
ctx := context.Background()
re := NewRedisExt("base/report", "test")
key := "smemberstest"
members := []string{"m1", "m2", "m3"}

i, err := re.SAdd(ctx, key, members)
assert.NoError(t, err)
assert.Equal(t, int64(3), i)

s, err := re.SRandMember(ctx, key)
assert.NoError(t, err)
assert.Equal(t, true, isContain(members, s))

b, err := re.Expire(ctx, key, 5 * time.Second)
assert.NoError(t, err)
assert.True(t, b)
}

func TestRedisExt_SRandMembersN(t *testing.T) {
ctx := context.Background()
re := NewRedisExt("base/report", "test")
key := "smemberstest"
members := []string{"m1", "m2", "m3"}

i, err := re.SAdd(ctx, key, members)
assert.NoError(t, err)
assert.Equal(t, int64(3), i)

arr, err := re.SRandMemberN(ctx, key, 2)
assert.NoError(t, err)
assert.Equal(t, 2, len(arr))
assert.Equal(t, true, isContain(members, arr[0]))
assert.Equal(t, true, isContain(members, arr[1]))

b, err := re.Expire(ctx, key, 5 * time.Second)
assert.NoError(t, err)
assert.True(t, b)
}

func isContain(items []string, item string) bool {
for _, eachItem := range items {
if eachItem == item {
return true
}
}
return false
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/common v0.7.0
github.com/sdming/gosnow v0.0.0-20130403030620-3a05c415e886
github.com/segmentio/kafka-go v0.3.4
github.com/segmentio/kafka-go v0.3.7
github.com/shawnfeng/lumberjack.v2 v0.0.0-20181226094728-63d76296ede8
github.com/stretchr/testify v1.4.0
github.com/uber/jaeger-client-go v2.20.1+incompatible
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/ZhengHe-MD/agollo v0.0.0-20190709010021-166900f9bd56 h1:y5QtREe6LiwDv8oGRNp0pag0bfcQocX1JNr4DxDAOZA=
github.com/ZhengHe-MD/agollo v0.0.0-20190709010021-166900f9bd56/go.mod h1:Hs1VYbuOoVlDPOMlZawvwpN7eAqDOQUFG3Lxcne0SBo=
github.com/ZhengHe-MD/agollo/v4 v4.1.3/go.mod h1:Cze1dOFujMfuqY5mZWzz1vvJ0ZdpDQ77PC5m2J/qv+4=
github.com/ZhengHe-MD/agollo/v4 v4.1.4 h1:59qfwJXnb+MRbzXhn1rsPArEsxhddO8o5DMfXbhAFW4=
Expand Down Expand Up @@ -159,6 +160,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/kaneshin/go-pkg v0.0.0-20150919125626-a8e1479186cf h1:kBpksZbhb1xNFDTPbxh5H5G0o0MJZKrH004ELvq/zWI=
github.com/kaneshin/go-pkg v0.0.0-20150919125626-a8e1479186cf/go.mod h1:XI2QuPvM6xUXWcpwlqaMB+SxxV7XNr3aO/DrUK8lRZo=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -229,6 +231,8 @@ github.com/sdming/gosnow v0.0.0-20130403030620-3a05c415e886/go.mod h1:xucuMeiX1T
github.com/segmentio/kafka-go v0.2.4/go.mod h1:MyX8oKJCSypBXY66FgANfFbqN8aFXAGoLlnR3eKCzoU=
github.com/segmentio/kafka-go v0.3.4 h1:Mv9AcnCgU14/cU6Vd0wuRdG1FBO0HzXQLnjBduDLy70=
github.com/segmentio/kafka-go v0.3.4/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
github.com/segmentio/kafka-go v0.3.7 h1:UCFPJw6KoVkmrilA2LbWVuybJojHzj6gDDFdV7H7IBs=
github.com/segmentio/kafka-go v0.3.7/go.mod h1:8rEphJEczp+yDE/R5vwmaqZgF1wllrl4ioQcNKB8wVA=
github.com/serialx/hashring v0.0.0-20160507062712-75d57fa264ad/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc=
github.com/shawnfeng/consistent v1.0.2/go.mod h1:TpBWBTkEGx/yt72C1fvM9zfHfKAKXfeMEIxrmdI34xg=
github.com/shawnfeng/consistent v1.0.3 h1:ilrK9gACa6S14E7otsWkgIF+2VPz9oATOWaUnT5OxHE=
Expand Down
56 changes: 36 additions & 20 deletions mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"context"
"errors"
"fmt"
"github.com/shawnfeng/sutil/sconf/center"
"github.com/shawnfeng/sutil/scontext"
"github.com/shawnfeng/sutil/slog/slog"
"strconv"
"strings"
"sync"
"time"

"github.com/shawnfeng/sutil/sconf/center"
"github.com/shawnfeng/sutil/scontext"
"github.com/shawnfeng/sutil/slog/slog"
)

type MQType int
Expand Down Expand Up @@ -59,10 +60,11 @@ func (c ConfigerType) String() string {
const (
defaultTimeout = 3 * time.Second
//默认1000毫秒
defaultBatchTimeoutMs = 1000
defaultTTR = 3600 // 1 hour
defaultTTL = 3600 * 24 // 1 day
defaultTries = 1
defaultBatchTimeoutMs = 1000
defaultTTR = 3600 // 1 hour
defaultTTL = 3600 * 24 // 1 day
defaultTries = 1
defaultRequestIntervalMS = 300
)

type Config struct {
Expand All @@ -79,6 +81,8 @@ type Config struct {
TTL uint32 // time to live
Tries uint16 // delay tries
BatchSize int

RequestInterval time.Duration
}

type KeyParts struct {
Expand Down Expand Up @@ -197,6 +201,7 @@ const (
apolloTriesKey = "tries"
apolloBatchSizeKey = "batchsize"
apolloBatchTimeoutMsKey = "batchtimeoutms"
apolloRequestIntervalMS = "interval"
)

type ApolloConfig struct {
Expand Down Expand Up @@ -327,21 +332,32 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp
if err != nil {
batchTimeoutMsVal = defaultBatchTimeoutMs
}
slog.Infof(ctx, "%s got config batchTimeout:%d", fun, ttl)
slog.Infof(ctx, "%s got config batchTimeout:%d", fun, batchTimeoutMsVal)

requestIntervalMs, ok := m.getConfigItemWithFallback(ctx, topic, apolloRequestIntervalMS, mqType)
if !ok {
slog.Infof(ctx, "%s no requestSleep config founds", fun)
}
requestIntervalMsVal, err := strconv.ParseUint(requestIntervalMs, 10, 32)
if err != nil {
requestIntervalMsVal = defaultRequestIntervalMS
}
slog.Infof(ctx, "%s got config requestSleepMs:%d", fun, requestIntervalMsVal)

return &Config{
MQType: mqType,
MQAddr: brokers,
Topic: topic,
TimeOut: defaultTimeout,
CommitInterval: 1 * time.Second,
BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond,
Offset: FirstOffset,
OffsetAt: offsetAtVal,
TTR: uint32(ttr),
TTL: uint32(ttl),
Tries: uint16(tries),
BatchSize: batchSize,
MQType: mqType,
MQAddr: brokers,
Topic: topic,
TimeOut: defaultTimeout,
CommitInterval: 1 * time.Second,
BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond,
Offset: FirstOffset,
OffsetAt: offsetAtVal,
TTR: uint32(ttr),
TTL: uint32(ttl),
Tries: uint16(tries),
BatchSize: batchSize,
RequestInterval: time.Duration(requestIntervalMsVal) * time.Millisecond,
}, nil
}

Expand Down
33 changes: 18 additions & 15 deletions mq/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/shawnfeng/sutil/snetutil"
"gitlab.pri.ibanyu.com/middleware/delayqueue/model"
"gitlab.pri.ibanyu.com/middleware/delayqueue/processor"
"net/http"
"strings"
"time"
)

const (
defaultToken = "01E0SSK0DJ9XX4PDFJCD3DN7WX"
defaultRequestSleep = 300 * time.Millisecond
defaultToken = "01E0SSK0DJ9XX4PDFJCD3DN7WX"
)

type AckHandler interface {
Expand Down Expand Up @@ -51,6 +51,8 @@ type DelayClient struct {
ttlSeconds uint32
tries uint16
ttrSeconds uint32

requestInterval time.Duration
}

// 延迟队列任务
Expand Down Expand Up @@ -88,14 +90,15 @@ type ackRes struct {
Data struct{} `json:"data,omitempty"`
}

func NewDelayClient(endpoint, namespace, queue string, ttlSeconds, ttrSeconds uint32, tries uint16) *DelayClient {
func NewDelayClient(endpoint, namespace, queue string, ttlSeconds, ttrSeconds uint32, tries uint16, requestInterval time.Duration) *DelayClient {
return &DelayClient{
endpoint: endpoint,
namespace: namespace,
queue: queue,
ttlSeconds: ttlSeconds,
ttrSeconds: ttrSeconds,
tries: tries,
endpoint: endpoint,
namespace: namespace,
queue: queue,
ttlSeconds: ttlSeconds,
ttrSeconds: ttrSeconds,
tries: tries,
requestInterval: requestInterval,
}
}

Expand All @@ -109,7 +112,7 @@ func NewDefaultDelayClient(ctx context.Context, topic string) (*DelayClient, err
if err != nil {
return nil, err
}
client := NewDelayClient(Config.MQAddr[0], namespace, queue, Config.TTL, Config.TTR, Config.Tries)
client := NewDelayClient(Config.MQAddr[0], namespace, queue, Config.TTL, Config.TTR, Config.Tries, Config.RequestInterval)
return client, nil
}

Expand Down Expand Up @@ -161,7 +164,7 @@ func (p *DelayClient) Read(ctx context.Context, ttrSeconds uint32) (job *Job, er
}
path := fmt.Sprintf("/base/delayqueue/%s/job/consume", p.namespace)
for {
time.Sleep(defaultRequestSleep)
time.Sleep(p.requestInterval)
err = p.httpInvoke(ctx, path, req, res)
if err != nil {
break
Expand Down Expand Up @@ -242,7 +245,7 @@ func parseTopic(topic string) (namespace, queue string, err error) {
return
}

func init() {
func init() {
setHttpDefaultClient()
}

Expand Down
20 changes: 16 additions & 4 deletions mq/delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

var delayCli *DelayClient

func init() {
delayCli = NewDelayClient("http://0.0.0.0:7777", "base.test", "test", 100, 50, 1)
delayCli = NewDelayClient("http://0.0.0.0:7777", "base.test", "test", 100, 50, 1, 100*time.Second)
}

func Test_parseTopic(t *testing.T) {
Expand Down Expand Up @@ -45,8 +48,8 @@ func Test_parseTopic(t *testing.T) {
wantErr: true,
},
{
name: "group topic",
args:args{topic: "base.changeboard.event_t1"},
name: "group topic",
args: args{topic: "base.changeboard.event_t1"},
wantNamespace: "base.changeboard",
wantQueue: "event_t1",
wantErr: false,
Expand Down Expand Up @@ -79,7 +82,6 @@ func TestDelayClient_Write(t *testing.T) {

}

// 01E0WTH8TCWT6Q2J2HQ0500000 01E0WTPYNE33SHMHSYW0500000 01E0WTT12VCRFWXXWNS4500000 01E0WVCSVE7TZZHFGRWC000000
func TestDelayClient_Read(t *testing.T) {
ctx := context.Background()
job, err := delayCli.Read(ctx, 5)
Expand All @@ -92,3 +94,13 @@ func TestDelayClient_Read(t *testing.T) {
//return

}

func TestNewDefaultDelayClient(t *testing.T) {
client, err := NewDefaultDelayClient(context.Background(), "palfish.test.test")
assert.NoError(t, err)
assert.Equal(t, int64(100), client.requestInterval.Milliseconds())

client, err = NewDefaultDelayClient(context.Background(), "delay.test.test")
assert.NoError(t, err)
assert.Equal(t, int64(300), client.requestInterval.Milliseconds())
}
Loading

0 comments on commit fb0baed

Please sign in to comment.