Skip to content

Commit

Permalink
support custom logger
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed Oct 4, 2024
1 parent 705b972 commit fa14a01
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func producer() {

## Options

### Consume Function

```go
func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue
```
Expand All @@ -113,25 +115,38 @@ queue.WithCallback(func(payload string) bool {
})
```

### Logger

```go
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue
func (q *DelayQueue)WithLogger(logger Logger) *DelayQueue
```

WithLogger customizes logger for queue
WithLogger customizes logger for queue. Logger should implemented the following interface:

```go
type Logger interface {
Printf(format string, v ...interface{})
}
```

### Concurrent

```go
func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue
```

WithConcurrent sets the number of concurrent consumers

### Polling Interval

```go
func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue
```

WithFetchInterval customizes the interval at which consumer fetch message from redis

### Timeout

```go
func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue
```
Expand All @@ -141,12 +156,16 @@ WithMaxConsumeDuration customizes max consume duration
If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this
message again

### Max Processing Limit

```go
func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue
```

WithFetchLimit limits the max number of unack (processing) messages

### Hash Tag

```go
UseHashTagKey()
```
Expand All @@ -159,6 +178,8 @@ WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to re

> see more: https://redis.io/docs/reference/cluster-spec/#hash-tags

### Default Retry Count

```go
WithDefaultRetryCount(count uint) *DelayQueue
```
Expand All @@ -167,6 +188,12 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages

use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message

```go
queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3))
```

### Script Preload

```go
(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue
```
Expand Down
31 changes: 29 additions & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func producer() {

## 选项

### 回调函数

```go
func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue
```
Expand All @@ -107,36 +109,53 @@ queue.WithCallback(func(payload string) bool {
})
```

### 日志

```go
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue
```

为 DelayQueue 设置 logger
为 DelayQueue 设置 logger, logger 需要实现下面的接口:

```go
type Logger interface {
Printf(format string, v ...interface{})
}
```

### 并发数

```go
func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue
```

设置消费者并发数

### 轮询间隔

```go
func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue
```

设置消费者从 Redis 拉取消息的时间间隔

### 消费超时

```go
func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue
```

设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。

### 最大处理中消息数

```go
func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue
```

FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit
单个消费者正在处理中的消息数不会超过 FetchLimit

### 启用 HashTag

```go
UseHashTagKey()
Expand All @@ -150,6 +169,8 @@ UseHashTagKey() 会在 Redis Key 上添加 hash tag 确保同一个队列的所

see more: https://redis.io/docs/reference/cluster-spec/#hash-tags

### 设置默认重试次数

```go
WithDefaultRetryCount(count uint)
```
Expand All @@ -158,6 +179,12 @@ WithDefaultRetryCount(count uint)

在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。

```go
queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3))
```

### 预加载脚本

```go
(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue
```
Expand Down
9 changes: 7 additions & 2 deletions delayqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type DelayQueue struct {
garbageKey string // set: message id
useHashTag bool
ticker *time.Ticker
logger *log.Logger
logger Logger
close chan struct{}
running int32
maxConsumeDuration time.Duration // default 5 seconds
Expand Down Expand Up @@ -82,6 +82,11 @@ type RedisCli interface {
EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
}

// Logger is an abstraction of logging system
type Logger interface {
Printf(format string, v ...interface{})
}

type hashTagKeyOpt int

// CallbackFunc receives and consumes messages
Expand Down Expand Up @@ -153,7 +158,7 @@ func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
}

// WithLogger customizes logger for queue
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue {
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
q.logger = logger
return q
}
Expand Down

0 comments on commit fa14a01

Please sign in to comment.