Skip to content

Commit

Permalink
feat: optimize the stats api
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Apr 5, 2022
1 parent 8907b0f commit 96608a3
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 21 deletions.
2 changes: 1 addition & 1 deletion consistence/coordinator_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats {
// all topic status
topicStats = self.nsqdCoord.localNsqd.GetStats(false, true)
} else {
topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, true)
topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, "", true)
}
stat := NewNodeTopicStats(self.nsqdCoord.myNode.GetID(), len(topicStats)*2, runtime.NumCPU())
for _, ts := range topicStats {
Expand Down
5 changes: 1 addition & 4 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,8 @@ func (c *ClusterInfo) getNSQDStats(producers Producers, selectedTopic string, so

addr := p.HTTPAddress()
endpoint := fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t&needClients=%t", addr, leaderOnly, needClient)
if !needClient {
endpoint = fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t", addr, leaderOnly)
}
if selectedTopic != "" {
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t", addr, selectedTopic, leaderOnly)
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t&needClients=%t", addr, selectedTopic, leaderOnly, needClient)
}
c.logf("CI: querying nsqd %s", endpoint)

Expand Down
3 changes: 3 additions & 0 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ type TopicStats struct {
MessageCount int64 `json:"message_count"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
ChannelNum int64 `json:"channel_num"`
TotalChannelDepth int64 `json:"total_channel_depth"`
Paused bool `json:"paused"`
HourlyPubSize int64 `json:"hourly_pubsize"`
PartitionHourlyPubSize []int64 `json:"partition_hourly_pubsize"`
Clients []ClientPubStats `json:"client_pub_stats"`
ClientNum int64 `json:"client_num"`
MessageSizeStats [16]int64 `json:"msg_size_stats"`
MessageLatencyStats [16]int64 `json:"msg_write_latency_stats"`

Expand Down Expand Up @@ -224,6 +226,7 @@ type ChannelStats struct {
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
ClientNum int64 `json:"client_num"`
Paused bool `json:"paused"`
Skipped bool `json:"skipped"`
ZanTestSkipped bool `json:"zan_test_skipped"`
Expand Down
2 changes: 1 addition & 1 deletion internal/http_api/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Client struct {
}

func NewClient(tlsConfig *tls.Config) *Client {
transport := NewDeadlineTransport(5 * time.Second)
transport := NewDeadlineTransport(15 * time.Second)
transport.TLSClientConfig = tlsConfig
return &Client{
c: &http.Client{
Expand Down
10 changes: 7 additions & 3 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "partition", true)
topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
_, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName, "partition", true)
_, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -662,7 +662,11 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht
var totalMessages int64
for _, ts := range topicStats {
for _, cs := range ts.Channels {
totalClients += int64(len(cs.Clients))
if len(cs.Clients) != 0 {
totalClients += int64(len(cs.Clients))
} else {
totalClients += int64(cs.ClientNum)
}
}
totalMessages += ts.MessageCount
}
Expand Down
20 changes: 12 additions & 8 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TopicStats struct {
TopicFullName string `json:"topic_full_name"`
TopicPartition string `json:"topic_partition"`
Channels []ChannelStats `json:"channels"`
ChannelNum int64 `json:"channel_num"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
BackendStart int64 `json:"backend_start"`
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicS
TopicFullName: t.GetFullName(),
TopicPartition: strconv.Itoa(t.GetTopicPart()),
Channels: channels,
ChannelNum: int64(len(channels)),
Depth: t.TotalDataSize(),
BackendDepth: t.TotalDataSize(),
BackendStart: t.GetQueueReadStart(),
Expand Down Expand Up @@ -260,10 +262,10 @@ func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats {
}
n.RUnlock()

return n.getTopicStats(realTopics, filterClients)
return n.getTopicStats(realTopics, "", filterClients)
}

func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicStats {
func (n *NSQD) getTopicStats(realTopics []*Topic, ch string, filterClients bool) []TopicStats {
sort.Sort(TopicsByName{realTopics})
topics := make([]TopicStats, 0, len(realTopics))
for _, t := range realTopics {
Expand All @@ -282,9 +284,11 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta
if filterClients {
clients = nil
} else {
clients = make([]ClientStats, 0, len(c.clients))
for _, client := range c.clients {
clients = append(clients, client.Stats())
if len(ch) == 0 || c.name == ch {
clients = make([]ClientStats, 0, len(c.clients))
for _, client := range c.clients {
clients = append(clients, client.Stats())
}
}
}
c.RUnlock()
Expand All @@ -295,7 +299,7 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta
return topics
}

func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClients bool) []TopicStats {
func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, ch string, filterClients bool) []TopicStats {
n.RLock()
realTopics := make([]*Topic, 0, len(n.topicMap))
for name, topicParts := range n.topicMap {
Expand All @@ -310,11 +314,11 @@ func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClie
}
}
n.RUnlock()
return n.getTopicStats(realTopics, filterClients)
return n.getTopicStats(realTopics, ch, filterClients)
}

func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats {
return n.GetTopicStatsWithFilter(leaderOnly, topic, false)
return n.GetTopicStatsWithFilter(leaderOnly, topic, "", false)
}

type DetailStatsInfo struct {
Expand Down
4 changes: 2 additions & 2 deletions nsqdserver/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (c *context) setHealth(err error) {
c.nsqd.SetHealth(err)
}

func (c *context) getStats(leaderOnly bool, selectedTopic string, filterClients bool) []nsqd.TopicStats {
func (c *context) getStats(leaderOnly bool, selectedTopic string, ch string, filterClients bool) []nsqd.TopicStats {
if selectedTopic != "" {
return c.nsqd.GetTopicStats(leaderOnly, selectedTopic)
return c.nsqd.GetTopicStatsWithFilter(leaderOnly, selectedTopic, ch, filterClients)
}
return c.nsqd.GetStats(leaderOnly, filterClients)
}
Expand Down
8 changes: 6 additions & 2 deletions nsqdserver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ func (s *httpServer) doMessageHistoryStats(w http.ResponseWriter, req *http.Requ
}

if topicName == "" && topicPartStr == "" {
topicStats := s.ctx.getStats(true, "", true)
topicStats := s.ctx.getStats(true, "", "", true)
var topicHourlyPubStatsList []*clusterinfo.NodeHourlyPubsize
for _, topicStat := range topicStats {
partitionNum, err := strconv.Atoi(topicStat.TopicPartition)
Expand Down Expand Up @@ -1361,8 +1361,12 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro

jsonFormat := formatString == "json"
filterClients := needClients != "true"
if topicName != "" && needClients == "" {
// compatible with old, we always return clients if topic is specified and needClients is not specified
filterClients = false
}

stats := s.ctx.getStats(leaderOnly, topicName, filterClients)
stats := s.ctx.getStats(leaderOnly, topicName, channelName, filterClients)
health := s.ctx.getHealth()
startTime := s.ctx.getStartTime()
uptime := time.Since(startTime)
Expand Down

0 comments on commit 96608a3

Please sign in to comment.