Skip to content

Commit

Permalink
Merge pull request #168 from brzyangg/feature-delay-group
Browse files Browse the repository at this point in the history
delay topic by group
  • Loading branch information
niubell authored May 9, 2020
2 parents 61c3965 + 3cc8e59 commit 6e44662
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 6 deletions.
2 changes: 1 addition & 1 deletion mq/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewDefaultDelayClient(ctx context.Context, topic string) (*DelayClient, err
if err != nil {
return nil, err
}
namespace, queue, err := parseTopic(topic)
namespace, queue, err := parseTopic(wrapTopicFromContext(ctx, topic))
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions mq/delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ func Test_parseTopic(t *testing.T) {
wantQueue: "",
wantErr: true,
},
{
name: "group topic",
args:args{topic: "base.changeboard.event_t1"},
wantNamespace: "base.changeboard",
wantQueue: "event_t1",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
20 changes: 17 additions & 3 deletions mq/examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,28 @@ type Msg struct {
Body string
}

type simpleContextControlRouter struct {
group string
}

func (s simpleContextControlRouter) GetControlRouteGroup() (string, bool) {
return s.group, true
}

func (s simpleContextControlRouter) SetControlRouteGroup(group string) error {
s.group = group
return nil
}

func main() {

_ = trace.InitDefaultTracer("mq.test")
topic := "palfish.test.test"

ctx := context.Background()
sc := &simpleContextControlRouter{group: "t1"}
ctx = context.WithValue(ctx, scontext.ContextKeyHead, "hahahaha")
ctx = context.WithValue(ctx, scontext.ContextKeyControl, "{\"group\": \"g1\"}")
ctx = context.WithValue(ctx, scontext.ContextKeyControl, sc)
span, ctx := opentracing.StartSpanFromContext(ctx, "main")
if span != nil {
defer span.Finish()
Expand Down Expand Up @@ -66,7 +80,7 @@ func main() {
slog.Infof(ctx, "write delay msg, jobID = %s, err = %v", jobID, err)
}()

ctx1 := context.Background()
//ctx1 := context.Background()
/*
//err := mq.SetOffsetAt(ctx1, topic, 1, time.Date(2019, time.December, 4, 0, 0, 0, 0, time.UTC))
//err := mq.SetOffset(ctx1, topic, 1, -2)
Expand All @@ -85,7 +99,7 @@ func main() {
go func() {
for i := 0; i < 10; i ++ {
var msg Msg
ctx, ack, err := mq.FetchDelayMsg(ctx1, topic, &msg)
ctx, ack, err := mq.FetchDelayMsg(ctx, topic, &msg)
slog.Infof(ctx, "1111111111111111out msg: %v, ctx:%v, err:%v", msg, ctx, err)
err = ack.Ack(ctx)
slog.Infof(ctx, "2222222222222222out msg: %v, ctx:%v, err:%v", msg, ctx, err)
Expand Down
2 changes: 1 addition & 1 deletion mq/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (m *InstanceManager) applyChange(ctx context.Context, k string, change *cen
}

func (m *InstanceManager) applyChangeEvent(ctx context.Context, ce *center.ChangeEvent) {
slog.Infoln(ctx, "got new change event:%v", ce)
slog.Infof(ctx, "got new change event:%v", ce)

for key, change := range ce.Changes {
// NOTE: 只需关心 MODIFY 与 DELETE 类型改变
Expand Down
23 changes: 23 additions & 0 deletions mq/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/shawnfeng/sutil/sconf/center"
"github.com/shawnfeng/sutil/scontext"
"github.com/stretchr/testify/assert"
"sync"
"testing"
Expand Down Expand Up @@ -184,3 +185,25 @@ func TestInstanceManager_getDelayClient(t *testing.T) {
})

}

func TestInstanceManager_getDelayClientGroup(t *testing.T) {
ctx := context.TODO()
sc := &simpleContextControlRouter{group: "t1"}
ctx = context.WithValue(context.Background(), scontext.ContextKeyControl, sc)
_ = SetConfiger(ctx, ConfigerTypeApollo)
t.Run("test get delay client", func(t *testing.T) {
m := NewInstanceManager()
conf := &instanceConf{
group: "unknown",
role: RoleTypeDelayClient,
topic: defaultTestTopic,
groupId: "g1",
partition: 0,
}
client := m.getDelayClient(ctx, conf)

assert.Equal(t, "palfish.test", client.namespace)
assert.Equal(t, "test_t1", client.queue)
})

}
2 changes: 1 addition & 1 deletion mq/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context
}
err = client.Ack(ctx, job.ID)
if err != nil {
slog.Errorf(ctx, "%s, delay Ack err: %v, jobID", fun, err, job.ID)
slog.Errorf(ctx, "%s, delay Ack err: %v, jobID: %s", fun, err, job.ID)
return ctx, err
}
if len(payload.Value) == 0 {
Expand Down

0 comments on commit 6e44662

Please sign in to comment.