-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
72 lines (64 loc) · 2.19 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package rmq
import (
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/go-pay/limiter"
"github.com/go-pay/xlog"
)
// Predefined LogLevel values.
const (
// LogDebug is the "debug" log level.
LogDebug LogLevel = "debug"
// LogWarn is the "warn" log level.
LogWarn LogLevel = "warn"
// LogError is the "error" log level.
LogError LogLevel = "error"
// LogInfo is the "info" log level.
LogInfo LogLevel = "info"
)
// LogLevel is a string-typed log level name; see the LogXxx constants for
// the predefined values.
type LogLevel string
// RocketMQConfig holds the connection, credential, logging and option
// settings used to build RocketMQ consumers and producers in this package.
type RocketMQConfig struct {
// Namespace is the Alibaba Cloud instance ID.
Namespace string
// GroupName is the Group ID, created in Alibaba Cloud.
GroupName string
// EndPoint is the TCP protocol endpoint, obtained from the instance
// details page of the Alibaba Cloud RocketMQ console.
EndPoint string
// AccessKey is the AccessKeyId created in the Alibaba Cloud account
// management console; used for authentication.
AccessKey string
// SecretKey is the AccessKeySecret created in the Alibaba Cloud account
// management console; used for authentication.
SecretKey string
// LogLevel is the log level. The original note states the default is
// info; the defaulting logic is not visible here — confirm at the use site.
LogLevel LogLevel
// ConsumerOptions are custom consumer options.
ConsumerOptions []consumer.Option
// ProducerOptions are custom producer options.
ProducerOptions []producer.Option
// Limit is the consume limiter configuration.
// NOTE(review): the original comment reads "currently consume limiter";
// presumably this bounds concurrent consumption — confirm against the
// consumer implementation.
Limit *limiter.Config
}
// defaultConsumerOps returns the baseline consumer options derived from conf:
// namespace, group name, name server (conf.EndPoint), credentials
// (conf.AccessKey / conf.SecretKey) and a retry setting of 3.
//
// conf must be non-nil; its fields are read without a nil check.
func defaultConsumerOps(conf *RocketMQConfig) (ops []consumer.Option) {
	nameServers := primitive.NamesrvAddr{conf.EndPoint}
	credentials := primitive.Credentials{
		AccessKey: conf.AccessKey,
		SecretKey: conf.SecretKey,
	}

	return []consumer.Option{
		consumer.WithNamespace(conf.Namespace),
		consumer.WithGroupName(conf.GroupName),
		consumer.WithNameServer(nameServers),
		consumer.WithCredentials(credentials),
		consumer.WithRetry(3),
	}
}
// defaultProducerOps returns the baseline producer options derived from conf:
// namespace, name server (conf.EndPoint), credentials (conf.AccessKey /
// conf.SecretKey) and a retry setting of 3. A group-name option is appended
// only when conf.GroupName is non-empty.
//
// conf must be non-nil; its fields are read without a nil check.
func defaultProducerOps(conf *RocketMQConfig) (ops []producer.Option) {
	nameServers := primitive.NamesrvAddr{conf.EndPoint}
	credentials := primitive.Credentials{
		AccessKey: conf.AccessKey,
		SecretKey: conf.SecretKey,
	}

	ops = []producer.Option{
		producer.WithNamespace(conf.Namespace),
		producer.WithNameServer(nameServers),
		producer.WithCredentials(credentials),
		producer.WithRetry(3),
	}

	// GroupName is not necessary for producer, so skip it when unset.
	if conf.GroupName == "" {
		return ops
	}
	return append(ops, producer.WithGroupName(conf.GroupName))
}
// defaultPanicHandler logs e through xlog.Errorf together with the
// consumer's namespace and group name. It does not re-panic or return
// anything. Presumably e is a recovered panic value — confirm at the call site.
func (c *Consumer) defaultPanicHandler(e interface{}) {
	const format = "[%s] rocketmq consumer group [%s] panic: %v"
	xlog.Errorf(format, c.namespace, c.groupName, e)
}
// defaultPanicHandler logs e through xlog.Errorf together with the
// producer's namespace and group name. It does not re-panic or return
// anything. Presumably e is a recovered panic value — confirm at the call site.
func (p *Producer) defaultPanicHandler(e interface{}) {
	const format = "[%s] rocketmq producer group [%s] panic: %v"
	xlog.Errorf(format, p.namespace, p.groupName, e)
}