Skip to content

Commit

Permalink
Merge branch 'tmp-dump' into 'master'
Browse files Browse the repository at this point in the history
feat: support nsq to nsq ordered

See merge request paas/nsqserver!58
  • Loading branch information
absolute8511 committed Apr 24, 2022
2 parents 10b63d6 + 96608a3 commit 9d60245
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 21 deletions.
222 changes: 222 additions & 0 deletions apps/nsq_to_nsq_ordered/nsq_to_nsq_ordered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// This is an NSQ client that reads the specified topic/channel
// and re-publishes the messages to destination nsqd via TCP

package main

import (
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/bitly/go-simplejson"
"github.com/spaolacci/murmur3"
"github.com/youzan/go-nsq"
"github.com/youzan/nsq/internal/app"
"github.com/youzan/nsq/internal/protocol"
"github.com/youzan/nsq/internal/version"
)

const (
ModeRoundRobin = iota
ModeHostPool
)

var (
showVersion = flag.Bool("version", false, "print version string")

topic = flag.String("topic", "", "nsq topic")
channel = flag.String("channel", "nsq_to_nsq", "nsq channel")
destTopic = flag.String("destination-topic", "", "destination nsq topic")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
parallNum = flag.Int("parall-num", 100, "number of parallel run")
orderJsonKey = flag.String("order-json-key", "testkey", "json key to ordered sharding")

statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables")

lookupdHTTPAddrs = app.StringArray{}
destLookupdHTTPAddrs = app.StringArray{}

// TODO: remove, deprecated
maxBackoffDuration = flag.Duration("max-backoff-duration", 120*time.Second, "(deprecated) use --consumer-opt=max_backoff_duration,X")
)

func init() {
flag.Var(&destLookupdHTTPAddrs, "destination-lookupd-address", "destination address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}

type PublishHandler struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
counter uint64

producers *nsq.TopicProducerMgr
mode int

requireJSONValueParsed bool
requireJSONValueIsNumber bool
requireJSONNumber float64
}

func getMsgKeyFromBody(body []byte, parentKey string) ([]byte, error) {
jsonMsg, err := simplejson.NewJson(body)
if err != nil {
log.Printf("ERROR: Unable to decode json: %s", body)
return nil, err
}

jsonV, ok := jsonMsg.CheckGet(parentKey)
if !ok {
log.Printf("ERROR: Unable to get json: %s", body)
return nil, err
}
jsonV, ok = jsonV.CheckGet("key2")
if !ok {
log.Printf("ERROR: Unable to get json: %s", body)
return nil, errors.New("unable to get json key")
}
msgKey, err := jsonV.Bytes()
if err != nil {
log.Printf("WARN: json key value is not string: %s, %v", body, jsonV)
intV, err := jsonV.Int()
if err != nil {
return nil, err
}
return []byte(strconv.Itoa(intV)), nil
}
return msgKey, nil
}

func (ph *PublishHandler) HandleMessage(m *nsq.Message) error {
var err error
msgBody := m.Body
msgKey, err := getMsgKeyFromBody(msgBody, *orderJsonKey)
if err != nil {
return err
}

var ext *nsq.MsgExt
if m.ExtVer > 0 {
var err2 error
ext, err2 = m.GetJsonExt()
if err2 != nil {
log.Printf("failed to get ext, ignore it: %v, header: %s", err2, m.ExtBytes)
}
}
if ext != nil {
_, _, _, err = ph.producers.PublishOrderedWithJsonExt(*destTopic, msgKey, msgBody, ext)
if err != nil {
return err
}
} else {
_, _, _, err = ph.producers.PublishOrdered(*destTopic, msgKey, msgBody)
if err != nil {
return err
}
}
return nil
}

func hasArg(s string) bool {
argExist := false
flag.Visit(func(f *flag.Flag) {
if f.Name == s {
argExist = true
}
})
return argExist
}

func main() {
cCfg := nsq.NewConfig()
pCfg := nsq.NewConfig()

flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/youzan/go-nsq#Config)")
flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/youzan/go-nsq#Config)")

flag.Parse()

if *showVersion {
fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
return
}

if *topic == "" || *channel == "" {
log.Fatal("--topic and --channel are required")
}

if *destTopic == "" {
*destTopic = *topic
}

if !protocol.IsValidTopicName(*topic) {
log.Fatal("--topic is invalid")
}

if !protocol.IsValidTopicName(*destTopic) {
log.Fatal("--destination-topic is invalid")
}

if !protocol.IsValidChannelName(*channel) {
log.Fatal("--channel is invalid")
}

if len(lookupdHTTPAddrs) == 0 {
log.Fatal("--lookupd-http-address required")
}

if len(destLookupdHTTPAddrs) == 0 {
destLookupdHTTPAddrs = lookupdHTTPAddrs
}

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

cCfg.UserAgent = defaultUA
cCfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
if hasArg("max-backoff-duration") {
log.Printf("WARNING: --max-backoff-duration is deprecated in favor of --consumer-opt=max_backoff_duration,X")
cCfg.MaxBackoffDuration = *maxBackoffDuration
}

cCfg.EnableOrdered = true
consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
if err != nil {
log.Fatal(err)
}

pCfg.UserAgent = defaultUA
pCfg.LookupdSeeds = destLookupdHTTPAddrs
pCfg.ProducerPoolSize = *parallNum
pCfg.EnableOrdered = true
pCfg.Hasher = murmur3.New32()
producerMgr, err := nsq.NewTopicProducerMgr([]string{*destTopic}, pCfg)

handler := &PublishHandler{
producers: producerMgr,
}
consumer.AddConcurrentHandlers(handler, *parallNum)

err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
log.Fatal(err)
}

for {
select {
case <-consumer.StopChan:
return
case <-termChan:
consumer.Stop()
}
}
}
40 changes: 40 additions & 0 deletions apps/nsq_to_nsq_ordered/nsq_to_nsq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// This is an NSQ client that reads the specified topic/channel
// and re-publishes the messages to destination nsqd via TCP

package main

import (
"reflect"
"testing"
)

func Test_getMsgKeyFromBody(t *testing.T) {
type args struct {
body []byte
jsonKey string
}
tests := []struct {
name string
args args
want []byte
wantErr bool
}{
// TODO: Add test cases.
{"test1", args{[]byte("{\"key1\":\"test1\"}"), "key1"}, nil, true},
{"test2", args{[]byte("{\"key1\":{\"key2\":\"test12\"}}"), "key1"}, []byte("test12"), false},
{"test3", args{[]byte("{\"key11\":{\"key3\":\"test12\"}}"), "key11"}, nil, true},
{"test4", args{[]byte("{\"key11\":{\"key2\":12}}"), "key11"}, []byte("12"), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getMsgKeyFromBody(tt.args.body, tt.args.jsonKey)
if (err != nil) != tt.wantErr {
t.Errorf("getMsgKeyFromBody() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getMsgKeyFromBody() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion consistence/coordinator_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats {
// all topic status
topicStats = self.nsqdCoord.localNsqd.GetStats(false, true)
} else {
topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, true)
topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, "", true)
}
stat := NewNodeTopicStats(self.nsqdCoord.myNode.GetID(), len(topicStats)*2, runtime.NumCPU())
for _, ts := range topicStats {
Expand Down
5 changes: 1 addition & 4 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,8 @@ func (c *ClusterInfo) getNSQDStats(producers Producers, selectedTopic string, so

addr := p.HTTPAddress()
endpoint := fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t&needClients=%t", addr, leaderOnly, needClient)
if !needClient {
endpoint = fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t", addr, leaderOnly)
}
if selectedTopic != "" {
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t", addr, selectedTopic, leaderOnly)
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t&needClients=%t", addr, selectedTopic, leaderOnly, needClient)
}
c.logf("CI: querying nsqd %s", endpoint)

Expand Down
3 changes: 3 additions & 0 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ type TopicStats struct {
MessageCount int64 `json:"message_count"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
ChannelNum int64 `json:"channel_num"`
TotalChannelDepth int64 `json:"total_channel_depth"`
Paused bool `json:"paused"`
HourlyPubSize int64 `json:"hourly_pubsize"`
PartitionHourlyPubSize []int64 `json:"partition_hourly_pubsize"`
Clients []ClientPubStats `json:"client_pub_stats"`
ClientNum int64 `json:"client_num"`
MessageSizeStats [16]int64 `json:"msg_size_stats"`
MessageLatencyStats [16]int64 `json:"msg_write_latency_stats"`

Expand Down Expand Up @@ -224,6 +226,7 @@ type ChannelStats struct {
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
ClientNum int64 `json:"client_num"`
Paused bool `json:"paused"`
Skipped bool `json:"skipped"`
ZanTestSkipped bool `json:"zan_test_skipped"`
Expand Down
2 changes: 1 addition & 1 deletion internal/http_api/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Client struct {
}

func NewClient(tlsConfig *tls.Config) *Client {
transport := NewDeadlineTransport(5 * time.Second)
transport := NewDeadlineTransport(15 * time.Second)
transport.TLSClientConfig = tlsConfig
return &Client{
c: &http.Client{
Expand Down
10 changes: 7 additions & 3 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "partition", true)
topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
_, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName, "partition", true)
_, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -662,7 +662,11 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht
var totalMessages int64
for _, ts := range topicStats {
for _, cs := range ts.Channels {
totalClients += int64(len(cs.Clients))
if len(cs.Clients) != 0 {
totalClients += int64(len(cs.Clients))
} else {
totalClients += int64(cs.ClientNum)
}
}
totalMessages += ts.MessageCount
}
Expand Down
Loading

0 comments on commit 9d60245

Please sign in to comment.