diff --git a/apps/nsq_to_nsq_ordered/nsq_to_nsq_ordered.go b/apps/nsq_to_nsq_ordered/nsq_to_nsq_ordered.go
new file mode 100644
index 00000000..d1b27c73
--- /dev/null
+++ b/apps/nsq_to_nsq_ordered/nsq_to_nsq_ordered.go
@@ -0,0 +1,222 @@
+// This is an NSQ client that reads the specified topic/channel
+// and re-publishes the messages to the destination nsqd via TCP
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+
+	"github.com/bitly/go-simplejson"
+	"github.com/spaolacci/murmur3"
+	"github.com/youzan/go-nsq"
+	"github.com/youzan/nsq/internal/app"
+	"github.com/youzan/nsq/internal/protocol"
+	"github.com/youzan/nsq/internal/version"
+)
+
+const (
+	ModeRoundRobin = iota
+	ModeHostPool
+)
+
+var (
+	showVersion = flag.Bool("version", false, "print version string")
+
+	topic        = flag.String("topic", "", "nsq topic")
+	channel      = flag.String("channel", "nsq_to_nsq", "nsq channel")
+	destTopic    = flag.String("destination-topic", "", "destination nsq topic")
+	maxInFlight  = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
+	parallNum    = flag.Int("parall-num", 100, "number of parallel run")
+	orderJsonKey = flag.String("order-json-key", "testkey", "json key to ordered sharding")
+
+	statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables")
+
+	lookupdHTTPAddrs     = app.StringArray{}
+	destLookupdHTTPAddrs = app.StringArray{}
+
+	// TODO: remove, deprecated
+	maxBackoffDuration = flag.Duration("max-backoff-duration", 120*time.Second, "(deprecated) use --consumer-opt=max_backoff_duration,X")
+)
+
+func init() {
+	flag.Var(&destLookupdHTTPAddrs, "destination-lookupd-address", "destination address (may be given multiple times)")
+	flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
+}
+
+type PublishHandler struct {
+	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
+	counter uint64
+
+	producers *nsq.TopicProducerMgr
+	mode      int
+
+	requireJSONValueParsed   bool
+	requireJSONValueIsNumber bool
+	requireJSONNumber        float64
+}
+
+func getMsgKeyFromBody(body []byte, parentKey string) ([]byte, error) {
+	jsonMsg, err := simplejson.NewJson(body)
+	if err != nil {
+		log.Printf("ERROR: Unable to decode json: %s", body)
+		return nil, err
+	}
+
+	jsonV, ok := jsonMsg.CheckGet(parentKey)
+	if !ok {
+		log.Printf("ERROR: Unable to get json: %s", body)
+		return nil, errors.New("unable to get parent json key")
+	}
+	jsonV, ok = jsonV.CheckGet("key2")
+	if !ok {
+		log.Printf("ERROR: Unable to get json: %s", body)
+		return nil, errors.New("unable to get json key")
+	}
+	msgKey, err := jsonV.Bytes()
+	if err != nil {
+		log.Printf("WARN: json key value is not string: %s, %v", body, jsonV)
+		intV, err := jsonV.Int()
+		if err != nil {
+			return nil, err
+		}
+		return []byte(strconv.Itoa(intV)), nil
+	}
+	return msgKey, nil
+}
+
+func (ph *PublishHandler) HandleMessage(m *nsq.Message) error {
+	var err error
+	msgBody := m.Body
+	msgKey, err := getMsgKeyFromBody(msgBody, *orderJsonKey)
+	if err != nil {
+		return err
+	}
+
+	var ext *nsq.MsgExt
+	if m.ExtVer > 0 {
+		var err2 error
+		ext, err2 = m.GetJsonExt()
+		if err2 != nil {
+			log.Printf("failed to get ext, ignore it: %v, header: %s", err2, m.ExtBytes)
+		}
+	}
+	if ext != nil {
+		_, _, _, err = ph.producers.PublishOrderedWithJsonExt(*destTopic, msgKey, msgBody, ext)
+		if err != nil {
+			return err
+		}
+	} else {
+		_, _, _, err = ph.producers.PublishOrdered(*destTopic, msgKey, msgBody)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func hasArg(s string) bool {
+	argExist := false
+	flag.Visit(func(f *flag.Flag) {
+		if f.Name == s {
+			argExist = true
+		}
+	})
+	return argExist
+}
+
+func main() {
+	cCfg := nsq.NewConfig()
+	pCfg := nsq.NewConfig()
+
+	flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/youzan/go-nsq#Config)")
+	flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/youzan/go-nsq#Config)")
+
+	flag.Parse()
+
+	if *showVersion {
+		fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
+		return
+	}
+
+	if *topic == "" || *channel == "" {
+		log.Fatal("--topic and --channel are required")
+	}
+
+	if *destTopic == "" {
+		*destTopic = *topic
+	}
+
+	if !protocol.IsValidTopicName(*topic) {
+		log.Fatal("--topic is invalid")
+	}
+
+	if !protocol.IsValidTopicName(*destTopic) {
+		log.Fatal("--destination-topic is invalid")
+	}
+
+	if !protocol.IsValidChannelName(*channel) {
+		log.Fatal("--channel is invalid")
+	}
+
+	if len(lookupdHTTPAddrs) == 0 {
+		log.Fatal("--lookupd-http-address required")
+	}
+
+	if len(destLookupdHTTPAddrs) == 0 {
+		destLookupdHTTPAddrs = lookupdHTTPAddrs
+	}
+
+	termChan := make(chan os.Signal, 1)
+	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
+
+	defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)
+
+	cCfg.UserAgent = defaultUA
+	cCfg.MaxInFlight = *maxInFlight
+
+	// TODO: remove, deprecated
+	if hasArg("max-backoff-duration") {
+		log.Printf("WARNING: --max-backoff-duration is deprecated in favor of --consumer-opt=max_backoff_duration,X")
+		cCfg.MaxBackoffDuration = *maxBackoffDuration
+	}
+
+	cCfg.EnableOrdered = true
+	consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	pCfg.UserAgent = defaultUA
+	pCfg.LookupdSeeds = destLookupdHTTPAddrs
+	pCfg.ProducerPoolSize = *parallNum
+	pCfg.EnableOrdered = true
+	pCfg.Hasher = murmur3.New32()
+	producerMgr, err := nsq.NewTopicProducerMgr([]string{*destTopic}, pCfg)
+
+	handler := &PublishHandler{
+		producers: producerMgr,
+	}
+	consumer.AddConcurrentHandlers(handler, *parallNum)
+
+	err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	for {
+		select {
+		case <-consumer.StopChan:
+			return
+		case <-termChan:
+			consumer.Stop()
+		}
+	}
+}
diff --git a/apps/nsq_to_nsq_ordered/nsq_to_nsq_test.go b/apps/nsq_to_nsq_ordered/nsq_to_nsq_test.go
new file mode 100644
index 00000000..7b8226ee
--- /dev/null
+++ b/apps/nsq_to_nsq_ordered/nsq_to_nsq_test.go
@@ -0,0 +1,40 @@
+// This is an NSQ client that reads the specified topic/channel
+// and re-publishes the messages to the destination nsqd via TCP
+
+package main
+
+import (
+	"reflect"
+	"testing"
+)
+
+func Test_getMsgKeyFromBody(t *testing.T) {
+	type args struct {
+		body    []byte
+		jsonKey string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    []byte
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+ {"test1", args{[]byte("{\"key1\":\"test1\"}"), "key1"}, nil, true}, + {"test2", args{[]byte("{\"key1\":{\"key2\":\"test12\"}}"), "key1"}, []byte("test12"), false}, + {"test3", args{[]byte("{\"key11\":{\"key3\":\"test12\"}}"), "key11"}, nil, true}, + {"test4", args{[]byte("{\"key11\":{\"key2\":12}}"), "key11"}, []byte("12"), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getMsgKeyFromBody(tt.args.body, tt.args.jsonKey) + if (err != nil) != tt.wantErr { + t.Errorf("getMsgKeyFromBody() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getMsgKeyFromBody() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/consistence/coordinator_rpc.go b/consistence/coordinator_rpc.go index 025e65c1..5ac7e88b 100644 --- a/consistence/coordinator_rpc.go +++ b/consistence/coordinator_rpc.go @@ -577,7 +577,7 @@ func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats { // all topic status topicStats = self.nsqdCoord.localNsqd.GetStats(false, true) } else { - topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, true) + topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, "", true) } stat := NewNodeTopicStats(self.nsqdCoord.myNode.GetID(), len(topicStats)*2, runtime.NumCPU()) for _, ts := range topicStats { diff --git a/internal/clusterinfo/data.go b/internal/clusterinfo/data.go index 2d3b6352..8f8e85cf 100644 --- a/internal/clusterinfo/data.go +++ b/internal/clusterinfo/data.go @@ -977,11 +977,8 @@ func (c *ClusterInfo) getNSQDStats(producers Producers, selectedTopic string, so addr := p.HTTPAddress() endpoint := fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t&needClients=%t", addr, leaderOnly, needClient) - if !needClient { - endpoint = fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t", addr, leaderOnly) - } if selectedTopic != "" { - endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t", addr, selectedTopic, leaderOnly) + endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t&needClients=%t", addr, selectedTopic, leaderOnly, needClient) } c.logf("CI: querying nsqd %s", endpoint) diff --git a/internal/clusterinfo/types.go b/internal/clusterinfo/types.go index 5b8ca412..94b98ec2 100644 --- a/internal/clusterinfo/types.go +++ b/internal/clusterinfo/types.go @@ -140,11 +140,13 @@ type TopicStats struct { MessageCount int64 `json:"message_count"` NodeStats []*TopicStats `json:"nodes"` Channels []*ChannelStats `json:"channels"` + ChannelNum int64 `json:"channel_num"` TotalChannelDepth int64 `json:"total_channel_depth"` Paused bool `json:"paused"` HourlyPubSize int64 `json:"hourly_pubsize"` PartitionHourlyPubSize []int64 `json:"partition_hourly_pubsize"` Clients []ClientPubStats `json:"client_pub_stats"` + ClientNum int64 `json:"client_num"` MessageSizeStats [16]int64 `json:"msg_size_stats"` MessageLatencyStats [16]int64 `json:"msg_write_latency_stats"` @@ -224,6 +226,7 @@ type ChannelStats struct { Selected bool `json:"-"` NodeStats []*ChannelStats `json:"nodes"` Clients []*ClientStats `json:"clients"` + ClientNum int64 `json:"client_num"` Paused bool `json:"paused"` Skipped bool `json:"skipped"` ZanTestSkipped bool `json:"zan_test_skipped"` diff --git a/internal/http_api/api_request.go b/internal/http_api/api_request.go index 12f90a6c..c12d9638 100644 --- a/internal/http_api/api_request.go +++ b/internal/http_api/api_request.go @@ -47,7 +47,7 @@ type Client struct { } func 
NewClient(tlsConfig *tls.Config) *Client { - transport := NewDeadlineTransport(5 * time.Second) + transport := NewDeadlineTransport(15 * time.Second) transport.TLSClientConfig = tlsConfig return &Client{ c: &http.Client{ diff --git a/nsqadmin/http.go b/nsqadmin/http.go index 74fdbb00..a8bf3fc1 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -495,7 +495,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } - topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "partition", true) + topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -587,7 +587,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } - _, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName, "partition", true) + _, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -662,7 +662,11 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht var totalMessages int64 for _, ts := range topicStats { for _, cs := range ts.Channels { - totalClients += int64(len(cs.Clients)) + if len(cs.Clients) != 0 { + totalClients += int64(len(cs.Clients)) + } else { + totalClients += int64(cs.ClientNum) + } } totalMessages += ts.MessageCount } diff --git a/nsqd/stats.go b/nsqd/stats.go index a6af0922..8e9b76d6 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -27,6 +27,7 @@ type TopicStats struct { TopicFullName string `json:"topic_full_name"` TopicPartition string `json:"topic_partition"` Channels []ChannelStats `json:"channels"` + ChannelNum int64 `json:"channel_num"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` BackendStart int64 `json:"backend_start"` @@ -62,6 +63,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicS TopicFullName: t.GetFullName(), TopicPartition: strconv.Itoa(t.GetTopicPart()), Channels: channels, + ChannelNum: int64(len(channels)), Depth: t.TotalDataSize(), BackendDepth: t.TotalDataSize(), BackendStart: t.GetQueueReadStart(), @@ -260,10 +262,10 @@ func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats { } n.RUnlock() - return n.getTopicStats(realTopics, filterClients) + return n.getTopicStats(realTopics, "", filterClients) } -func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicStats { +func (n *NSQD) getTopicStats(realTopics []*Topic, ch string, filterClients bool) []TopicStats { sort.Sort(TopicsByName{realTopics}) topics := make([]TopicStats, 0, len(realTopics)) for _, t := range realTopics { @@ -282,9 +284,11 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta if filterClients { clients = nil } else { - clients = make([]ClientStats, 0, len(c.clients)) - for _, client := range c.clients { - clients = append(clients, client.Stats()) + if len(ch) == 0 || c.name == ch { + clients = make([]ClientStats, 0, len(c.clients)) + for _, client := range c.clients { + clients = append(clients, client.Stats()) + } } } c.RUnlock() @@ -295,7 +299,7 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta return topics } -func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, 
filterClients bool) []TopicStats { +func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, ch string, filterClients bool) []TopicStats { n.RLock() realTopics := make([]*Topic, 0, len(n.topicMap)) for name, topicParts := range n.topicMap { @@ -310,11 +314,11 @@ func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClie } } n.RUnlock() - return n.getTopicStats(realTopics, filterClients) + return n.getTopicStats(realTopics, ch, filterClients) } func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats { - return n.GetTopicStatsWithFilter(leaderOnly, topic, false) + return n.GetTopicStatsWithFilter(leaderOnly, topic, "", false) } type DetailStatsInfo struct { diff --git a/nsqdserver/context.go b/nsqdserver/context.go index 9aae75b5..d5935e77 100644 --- a/nsqdserver/context.go +++ b/nsqdserver/context.go @@ -93,9 +93,9 @@ func (c *context) setHealth(err error) { c.nsqd.SetHealth(err) } -func (c *context) getStats(leaderOnly bool, selectedTopic string, filterClients bool) []nsqd.TopicStats { +func (c *context) getStats(leaderOnly bool, selectedTopic string, ch string, filterClients bool) []nsqd.TopicStats { if selectedTopic != "" { - return c.nsqd.GetTopicStats(leaderOnly, selectedTopic) + return c.nsqd.GetTopicStatsWithFilter(leaderOnly, selectedTopic, ch, filterClients) } return c.nsqd.GetStats(leaderOnly, filterClients) } diff --git a/nsqdserver/http.go b/nsqdserver/http.go index 72b60f69..f2907594 100644 --- a/nsqdserver/http.go +++ b/nsqdserver/http.go @@ -1062,7 +1062,7 @@ func (s *httpServer) doMessageHistoryStats(w http.ResponseWriter, req *http.Requ } if topicName == "" && topicPartStr == "" { - topicStats := s.ctx.getStats(true, "", true) + topicStats := s.ctx.getStats(true, "", "", true) var topicHourlyPubStatsList []*clusterinfo.NodeHourlyPubsize for _, topicStat := range topicStats { partitionNum, err := strconv.Atoi(topicStat.TopicPartition) @@ -1361,8 +1361,12 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro jsonFormat := formatString == "json" filterClients := needClients != "true" + if topicName != "" && needClients == "" { + // compatible with old, we always return clients if topic is specified and needClients is not specified + filterClients = false + } - stats := s.ctx.getStats(leaderOnly, topicName, filterClients) + stats := s.ctx.getStats(leaderOnly, topicName, channelName, filterClients) health := s.ctx.getHealth() startTime := s.ctx.getStartTime() uptime := time.Since(startTime)