diff --git a/cache/redis/redis.go b/cache/redis/redis.go index 30fa0bf..8b363d7 100644 --- a/cache/redis/redis.go +++ b/cache/redis/redis.go @@ -646,6 +646,18 @@ func (m *Client) SMembers(ctx context.Context, key string) *redis.StringSliceCmd return m.client.SMembers(k) } +func (m *Client) SRandMember(ctx context.Context, key string) *redis.StringCmd { + k := m.fixKey(key) + m.logSpan(ctx, "SRandMember", k) + return m.client.SRandMember(k) +} + +func (m *Client) SRandMemberN(ctx context.Context, key string, count int64) *redis.StringSliceCmd { + k := m.fixKey(key) + m.logSpan(ctx, "SRandMemberN", k) + return m.client.SRandMemberN(k, count) +} + func (m *Client) Close(ctx context.Context) error { return m.client.Close() } diff --git a/cache/redisext/redisext.go b/cache/redisext/redisext.go index a55d1da..868dc0b 100644 --- a/cache/redisext/redisext.go +++ b/cache/redisext/redisext.go @@ -1113,6 +1113,38 @@ func (m *RedisExt) SMembers(ctx context.Context, key string) (s []string, err er return } +func (m *RedisExt) SRandMember(ctx context.Context, key string) (s string, err error) { + command := "redisext.SRandMember" + span, ctx := opentracing.StartSpanFromContext(ctx, command) + st := stime.NewTimeStat() + defer func() { + span.Finish() + statReqDuration(m.namespace, command, st.Millisecond()) + }() + client, err := m.getRedisInstance(ctx) + if err == nil { + s, err = client.SRandMember(ctx, m.prefixKey(key)).Result() + } + statReqErr(m.namespace, command, err) + return +} + +func (m *RedisExt) SRandMemberN(ctx context.Context, key string, count int64) (s []string, err error) { + command := "redisext.SRandMemberN" + span, ctx := opentracing.StartSpanFromContext(ctx, command) + st := stime.NewTimeStat() + defer func() { + span.Finish() + statReqDuration(m.namespace, command, st.Millisecond()) + }() + client, err := m.getRedisInstance(ctx) + if err == nil { + s, err = client.SRandMemberN(ctx, m.prefixKey(key), count).Result() + } + statReqErr(m.namespace, command, err) + return +} + func (m *RedisExt) ZScan(ctx context.Context, key string, cursor uint64, match string, count int64) (keys []string, rcursor uint64, err error) { command := "redisext.ZScan" span, ctx := opentracing.StartSpanFromContext(ctx, command) diff --git a/cache/redisext/redisext_test.go b/cache/redisext/redisext_test.go index 6f94957..6237d04 100644 --- a/cache/redisext/redisext_test.go +++ b/cache/redisext/redisext_test.go @@ -447,3 +447,52 @@ func TestRedisExt_SMembers(t *testing.T) { assert.NoError(t, err) assert.True(t, b) } + +func TestRedisExt_SRandMembers(t *testing.T) { + ctx := context.Background() + re := NewRedisExt("base/report", "test") + key := "smemberstest" + members := []string{"m1", "m2", "m3"} + + i, err := re.SAdd(ctx, key, members) + assert.NoError(t, err) + assert.Equal(t, int64(3), i) + + s, err := re.SRandMember(ctx, key) + assert.NoError(t, err) + assert.Equal(t, true, isContain(members, s)) + + b, err := re.Expire(ctx, key, 5 * time.Second) + assert.NoError(t, err) + assert.True(t, b) +} + +func TestRedisExt_SRandMembersN(t *testing.T) { + ctx := context.Background() + re := NewRedisExt("base/report", "test") + key := "smemberstest" + members := []string{"m1", "m2", "m3"} + + i, err := re.SAdd(ctx, key, members) + assert.NoError(t, err) + assert.Equal(t, int64(3), i) + + arr, err := re.SRandMemberN(ctx, key, 2) + assert.NoError(t, err) + assert.Equal(t, 2, len(arr)) + assert.Equal(t, true, isContain(members, arr[0])) + assert.Equal(t, true, isContain(members, arr[1])) + + b, err := re.Expire(ctx, key, 5 * time.Second) + assert.NoError(t, err) + assert.True(t, b) +} + +func isContain(items []string, item string) bool { + for _, eachItem := range items { + if eachItem == item { + return true + } + } + return false +} diff --git a/go.mod b/go.mod index 57b22dd..2c9bc6d 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/prometheus/client_golang v1.2.1 github.com/prometheus/common v0.7.0 github.com/sdming/gosnow v0.0.0-20130403030620-3a05c415e886 - github.com/segmentio/kafka-go v0.3.4 + github.com/segmentio/kafka-go v0.3.7 github.com/shawnfeng/lumberjack.v2 v0.0.0-20181226094728-63d76296ede8 github.com/stretchr/testify v1.4.0 github.com/uber/jaeger-client-go v2.20.1+incompatible diff --git a/go.sum b/go.sum index 2348a52..475fb3c 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,7 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/ZhengHe-MD/agollo v0.0.0-20190709010021-166900f9bd56 h1:y5QtREe6LiwDv8oGRNp0pag0bfcQocX1JNr4DxDAOZA= github.com/ZhengHe-MD/agollo v0.0.0-20190709010021-166900f9bd56/go.mod h1:Hs1VYbuOoVlDPOMlZawvwpN7eAqDOQUFG3Lxcne0SBo= github.com/ZhengHe-MD/agollo/v4 v4.1.3/go.mod h1:Cze1dOFujMfuqY5mZWzz1vvJ0ZdpDQ77PC5m2J/qv+4= github.com/ZhengHe-MD/agollo/v4 v4.1.4 h1:59qfwJXnb+MRbzXhn1rsPArEsxhddO8o5DMfXbhAFW4= @@ -159,6 +160,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kaneshin/go-pkg v0.0.0-20150919125626-a8e1479186cf h1:kBpksZbhb1xNFDTPbxh5H5G0o0MJZKrH004ELvq/zWI= github.com/kaneshin/go-pkg v0.0.0-20150919125626-a8e1479186cf/go.mod h1:XI2QuPvM6xUXWcpwlqaMB+SxxV7XNr3aO/DrUK8lRZo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -229,6 +231,8 @@ github.com/sdming/gosnow v0.0.0-20130403030620-3a05c415e886/go.mod h1:xucuMeiX1T github.com/segmentio/kafka-go v0.2.4/go.mod h1:MyX8oKJCSypBXY66FgANfFbqN8aFXAGoLlnR3eKCzoU= github.com/segmentio/kafka-go v0.3.4 h1:Mv9AcnCgU14/cU6Vd0wuRdG1FBO0HzXQLnjBduDLy70= github.com/segmentio/kafka-go v0.3.4/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4= +github.com/segmentio/kafka-go v0.3.7 h1:UCFPJw6KoVkmrilA2LbWVuybJojHzj6gDDFdV7H7IBs= +github.com/segmentio/kafka-go v0.3.7/go.mod h1:8rEphJEczp+yDE/R5vwmaqZgF1wllrl4ioQcNKB8wVA= github.com/serialx/hashring v0.0.0-20160507062712-75d57fa264ad/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= github.com/shawnfeng/consistent v1.0.2/go.mod h1:TpBWBTkEGx/yt72C1fvM9zfHfKAKXfeMEIxrmdI34xg= github.com/shawnfeng/consistent v1.0.3 h1:ilrK9gACa6S14E7otsWkgIF+2VPz9oATOWaUnT5OxHE= diff --git a/mq/config.go b/mq/config.go index e1c9246..5952768 100644 --- a/mq/config.go +++ b/mq/config.go @@ -8,13 +8,14 @@ import ( "context" "errors" "fmt" - "github.com/shawnfeng/sutil/sconf/center" - "github.com/shawnfeng/sutil/scontext" - "github.com/shawnfeng/sutil/slog/slog" "strconv" "strings" "sync" "time" + + "github.com/shawnfeng/sutil/sconf/center" + "github.com/shawnfeng/sutil/scontext" + "github.com/shawnfeng/sutil/slog/slog" ) type MQType int @@ -59,10 +60,11 @@ func (c ConfigerType) String() string { const ( defaultTimeout = 3 * time.Second //默认1000毫秒 - defaultBatchTimeoutMs = 1000 - defaultTTR = 3600 // 1 hour - defaultTTL = 3600 * 24 // 1 day - defaultTries = 1 + defaultBatchTimeoutMs = 1000 + defaultTTR = 3600 // 1 hour + defaultTTL = 3600 * 24 // 1 day + defaultTries = 1 + defaultRequestIntervalMS = 300 ) type Config struct { @@ -79,6 +81,8 @@ type Config struct { TTL uint32 // time to live Tries uint16 // delay tries BatchSize int + + RequestInterval time.Duration } type KeyParts struct { @@ -197,6 +201,7 @@ const ( apolloTriesKey = "tries" apolloBatchSizeKey = "batchsize" apolloBatchTimeoutMsKey = "batchtimeoutms" + apolloRequestIntervalMS = "interval" ) type ApolloConfig struct { @@ -327,21 +332,32 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQTyp if err != nil { batchTimeoutMsVal = defaultBatchTimeoutMs } - slog.Infof(ctx, "%s got config batchTimeout:%d", fun, ttl) + slog.Infof(ctx, "%s got config batchTimeout:%d", fun, batchTimeoutMsVal) + + requestIntervalMs, ok := m.getConfigItemWithFallback(ctx, topic, apolloRequestIntervalMS, mqType) + if !ok { + slog.Infof(ctx, "%s no requestSleep config founds", fun) + } + requestIntervalMsVal, err := strconv.ParseUint(requestIntervalMs, 10, 32) + if err != nil { + requestIntervalMsVal = defaultRequestIntervalMS + } + slog.Infof(ctx, "%s got config requestSleepMs:%d", fun, requestIntervalMsVal) return &Config{ - MQType: mqType, - MQAddr: brokers, - Topic: topic, - TimeOut: defaultTimeout, - CommitInterval: 1 * time.Second, - BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond, - Offset: FirstOffset, - OffsetAt: offsetAtVal, - TTR: uint32(ttr), - TTL: uint32(ttl), - Tries: uint16(tries), - BatchSize: batchSize, + MQType: mqType, + MQAddr: brokers, + Topic: topic, + TimeOut: defaultTimeout, + CommitInterval: 1 * time.Second, + BatchTimeout: time.Duration(batchTimeoutMsVal) * time.Millisecond, + Offset: FirstOffset, + OffsetAt: offsetAtVal, + TTR: uint32(ttr), + TTL: uint32(ttl), + Tries: uint16(tries), + BatchSize: batchSize, + RequestInterval: time.Duration(requestIntervalMsVal) * time.Millisecond, }, nil } diff --git a/mq/delay.go b/mq/delay.go index ec1b42f..d348e2f 100644 --- a/mq/delay.go +++ b/mq/delay.go @@ -4,19 +4,19 @@ import ( "context" "encoding/json" "fmt" + "net/http" + "strings" + "time" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" "github.com/shawnfeng/sutil/snetutil" "gitlab.pri.ibanyu.com/middleware/delayqueue/model" "gitlab.pri.ibanyu.com/middleware/delayqueue/processor" - "net/http" - "strings" - "time" ) const ( - defaultToken = "01E0SSK0DJ9XX4PDFJCD3DN7WX" - defaultRequestSleep = 300 * time.Millisecond + defaultToken = "01E0SSK0DJ9XX4PDFJCD3DN7WX" ) type AckHandler interface { @@ -51,6 +51,8 @@ type DelayClient struct { ttlSeconds uint32 tries uint16 ttrSeconds uint32 + + requestInterval time.Duration } // 延迟队列任务 @@ -88,14 +90,15 @@ type ackRes struct { Data struct{} `json:"data,omitempty"` } -func NewDelayClient(endpoint, namespace, queue string, ttlSeconds, ttrSeconds uint32, tries uint16) *DelayClient { +func NewDelayClient(endpoint, namespace, queue string, ttlSeconds, ttrSeconds uint32, tries uint16, requestInterval time.Duration) *DelayClient { return &DelayClient{ - endpoint: endpoint, - namespace: namespace, - queue: queue, - ttlSeconds: ttlSeconds, - ttrSeconds: ttrSeconds, - tries: tries, + endpoint: endpoint, + namespace: namespace, + queue: queue, + ttlSeconds: ttlSeconds, + ttrSeconds: ttrSeconds, + tries: tries, + requestInterval: requestInterval, } } @@ -109,7 +112,7 @@ func NewDefaultDelayClient(ctx context.Context, topic string) (*DelayClient, err if err != nil { return nil, err } - client := NewDelayClient(Config.MQAddr[0], namespace, queue, Config.TTL, Config.TTR, Config.Tries) + client := NewDelayClient(Config.MQAddr[0], namespace, queue, Config.TTL, Config.TTR, Config.Tries, Config.RequestInterval) return client, nil } @@ -161,7 +164,7 @@ func (p *DelayClient) Read(ctx context.Context, ttrSeconds uint32) (job *Job, er } path := fmt.Sprintf("/base/delayqueue/%s/job/consume", p.namespace) for { - time.Sleep(defaultRequestSleep) + time.Sleep(p.requestInterval) err = p.httpInvoke(ctx, path, req, res) if err != nil { break @@ -242,7 +245,7 @@ func parseTopic(topic string) (namespace, queue string, err error) { return } -func init() { +func init() { setHttpDefaultClient() } diff --git a/mq/delay_test.go b/mq/delay_test.go index 93fd8d1..ef2a6d9 100644 --- a/mq/delay_test.go +++ b/mq/delay_test.go @@ -4,12 +4,15 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/stretchr/testify/assert" ) var delayCli *DelayClient func init() { - delayCli = NewDelayClient("http://0.0.0.0:7777", "base.test", "test", 100, 50, 1) + delayCli = NewDelayClient("http://0.0.0.0:7777", "base.test", "test", 100, 50, 1, 100*time.Second) } func Test_parseTopic(t *testing.T) { @@ -45,8 +48,8 @@ func Test_parseTopic(t *testing.T) { wantErr: true, }, { - name: "group topic", - args:args{topic: "base.changeboard.event_t1"}, + name: "group topic", + args: args{topic: "base.changeboard.event_t1"}, wantNamespace: "base.changeboard", wantQueue: "event_t1", wantErr: false, @@ -79,7 +82,6 @@ func TestDelayClient_Write(t *testing.T) { } -// 01E0WTH8TCWT6Q2J2HQ0500000 01E0WTPYNE33SHMHSYW0500000 01E0WTT12VCRFWXXWNS4500000 01E0WVCSVE7TZZHFGRWC000000 func TestDelayClient_Read(t *testing.T) { ctx := context.Background() job, err := delayCli.Read(ctx, 5) @@ -92,3 +94,13 @@ func TestDelayClient_Read(t *testing.T) { //return } + +func TestNewDefaultDelayClient(t *testing.T) { + client, err := NewDefaultDelayClient(context.Background(), "palfish.test.test") + assert.NoError(t, err) + assert.Equal(t, int64(100), client.requestInterval.Milliseconds()) + + client, err = NewDefaultDelayClient(context.Background(), "delay.test.test") + assert.NoError(t, err) + assert.Equal(t, int64(300), client.requestInterval.Milliseconds()) +} diff --git a/mq/examples/main.go b/mq/examples/main.go index 2fad928..d847bc4 100644 --- a/mq/examples/main.go +++ b/mq/examples/main.go @@ -6,11 +6,12 @@ package main import ( "context" + "gitlab.pri.ibanyu.com/middleware/seaweed/xlog" + //"fmt" "github.com/opentracing/opentracing-go" "github.com/shawnfeng/sutil/mq" "github.com/shawnfeng/sutil/scontext" - "github.com/shawnfeng/sutil/slog/slog" "github.com/shawnfeng/sutil/trace" "time" ) @@ -50,34 +51,34 @@ func main() { //_ = mq.SetConfiger(ctx, mq.ConfigerTypeApollo) //mq.WatchUpdate(ctx) - /* - go func() { - var msgs []mq.Message - for i := 0; i < 3; i++ { - value := &Msg{ - Id: 1, - Body: fmt.Sprintf("%d", i), - } - - msgs = append(msgs, mq.Message{ - Key: value.Body, - Value: value, - }) - err := mq.WriteMsg(ctx, topic, value.Body, value) - slog.Infof(ctx, "in msg: %v, err:%v", value, err) - } - err := mq.WriteMsgs(ctx, topic, msgs...) - slog.Infof(ctx, "in msgs: %v, err:%v", msgs, err) - }() - */ + + //go func() { + // var msgs []mq.Message + // for i := 0; i < 3; i++ { + // value := &Msg{ + // Id: 1, + // Body: fmt.Sprintf("%d", i), + // } + // + // msgs = append(msgs, mq.Message{ + // Key: value.Body, + // Value: value, + // }) + // err := mq.WriteMsg(ctx, topic, value.Body, value) + // slog.Infof(ctx, "in msg: %v, err:%v", value, err) + // } + // err := mq.WriteMsgs(ctx, topic, msgs...) + // slog.Infof(ctx, "in msgs: %v, err:%v", msgs, err) + //}() + go func() { msg := &Msg{ - Id: 1, - Body: "test", + Id: 2, + Body: "test2", } jobID, err := mq.WriteDelayMsg(ctx, topic, msg, 5) - slog.Infof(ctx, "write delay msg, jobID = %s, err = %v", jobID, err) + xlog.Infof(ctx, "write delay msg, jobID = %s, err = %v", jobID, err) }() //ctx1 := context.Background() @@ -91,7 +92,7 @@ func main() { //go func() { // for i := 0; i < 10000; i++ { // var msg Msg - // ctx, err := mq.ReadMsgByGroup(ctx1, topic, "group3", &msg) + // ctx, err := mq.ReadMsgByGroup(ctx, topic, "group3", &msg) // slog.Infof(ctx, "1111111111111111out msg: %v, ctx:%v, err:%v", msg, ctx, err) // } //}() @@ -100,9 +101,9 @@ func main() { for i := 0; i < 10; i ++ { var msg Msg ctx, ack, err := mq.FetchDelayMsg(ctx, topic, &msg) - slog.Infof(ctx, "1111111111111111out msg: %v, ctx:%v, err:%v", msg, ctx, err) + xlog.Infof(ctx, "1111111111111111out msg: %v, ctx:%v, err:%v", msg, ctx, err) err = ack.Ack(ctx) - slog.Infof(ctx, "2222222222222222out msg: %v, ctx:%v, err:%v", msg, ctx, err) + xlog.Infof(ctx, "2222222222222222out msg: %v, ctx:%v, err:%v", msg, ctx, err) time.Sleep(1 * time.Second) } }() diff --git a/mq/global.go b/mq/global.go index 692457d..45db156 100644 --- a/mq/global.go +++ b/mq/global.go @@ -53,13 +53,11 @@ func WriteMsg(ctx context.Context, topic string, key string, value interface{}) } writer := defaultInstanceManager.getWriter(ctx, conf) if writer == nil { - slog.Errorf(ctx, "%s getWriter err, topic: %s", fun, topic) return fmt.Errorf("%s, getWriter err, topic: %s", fun, topic) } payload, err := generatePayload(ctx, value) if err != nil { - slog.Errorf(ctx, "%s generatePayload err, topic: %s", fun, topic) return fmt.Errorf("%s, generatePayload err, topic: %s", fun, topic) } @@ -90,13 +88,11 @@ func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error { } writer := defaultInstanceManager.getWriter(ctx, conf) if writer == nil { - slog.Errorf(ctx, "%s getWriter err, topic: %s", fun, topic) return fmt.Errorf("%s, getWriter err, topic: %s", fun, topic) } nmsgs, err := generateMsgsPayload(ctx, msgs...) if err != nil { - slog.Errorf(ctx, "%s generateMsgsPayload err, topic: %s", fun, topic) return fmt.Errorf("%s, generateMsgsPayload err, topic: %s", fun, topic) } @@ -129,7 +125,6 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{ } reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { - slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } @@ -144,7 +139,6 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{ } if err != nil { - slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } @@ -180,7 +174,6 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value } reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { - slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } @@ -195,7 +188,6 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value } if err != nil { - slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } @@ -231,7 +223,6 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface } reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { - slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) return ctx, nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } @@ -246,7 +237,6 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface } if err != nil { - slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) return ctx, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } @@ -281,14 +271,12 @@ func WriteDelayMsg(ctx context.Context, topic string, value interface{}, delaySe } client := defaultInstanceManager.getDelayClient(ctx, conf) if client == nil { - slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic) err = fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic) return } payload, err := generatePayload(ctx, value) if err != nil { - slog.Errorf(ctx, "%s generatePayload err, topic: %s", fun, topic) err = fmt.Errorf("%s, generatePayload err, topic: %s", fun, topic) return } @@ -322,7 +310,6 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex } client := defaultInstanceManager.getDelayClient(ctx, conf) if client == nil { - slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic) err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic) return ctx, nil, err } @@ -332,12 +319,10 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex job, err := client.Read(ctx, client.ttrSeconds) if err != nil { - slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic) return ctx, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &payload) if err != nil { - slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) return ctx, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &value) @@ -383,7 +368,6 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context } client := defaultInstanceManager.getDelayClient(ctx, conf) if client == nil { - slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic) err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic) return ctx, err } @@ -393,12 +377,10 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context job, err := client.Read(ctx, client.ttrSeconds) if err != nil { - slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic) return ctx, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &payload) if err != nil { - slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) return ctx, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &value) diff --git a/mq/kafka.go b/mq/kafka.go index 2e917dd..5a33737 100644 --- a/mq/kafka.go +++ b/mq/kafka.go @@ -11,6 +11,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" kafka "github.com/segmentio/kafka-go" + "github.com/shawnfeng/sutil/slog" "strings" "time" ) @@ -50,6 +51,8 @@ func NewKafkaReader(brokers []string, topic, groupId string, partition, minBytes CommitInterval: commitInterval, StartOffset: kafka.LastOffset, //MaxWait: 30 * time.Second, + Logger: slog.GetInfoLogger(), + ErrorLogger: slog.GetLogger(), }) return &KafkaReader{ @@ -141,6 +144,8 @@ func NewKafkaWriter(brokers []string, topic string) *KafkaWriter { BatchSize: defaultBatchSize, //RequiredAcks: 1, //Async: true, + Logger: slog.GetInfoLogger(), + ErrorLogger: slog.GetLogger(), } // TODO should optimize this, too dumb, double get, reset batchsize config, _ := DefaultConfiger.GetConfig(context.TODO(), topic, MQTypeKafka) diff --git a/slog/slog.go b/slog/slog.go index 906b333..0b8a23b 100644 --- a/slog/slog.go +++ b/slog/slog.go @@ -123,3 +123,14 @@ func GetLogger() *Logger { func (m *Logger) Printf(format string, items ...interface{}) { Errorf(format, items...) } + +type InfoLogger struct { +} + +func GetInfoLogger() *InfoLogger { + return &InfoLogger{} +} + +func (m *InfoLogger) Printf(format string, items ...interface{}) { + Infof(format, items...) +}