diff --git a/client.go b/client.go index 5dd6bfad..45dfba1a 100644 --- a/client.go +++ b/client.go @@ -246,6 +246,8 @@ type Client struct { info []byte storage map[string]any storageMu sync.Mutex + metricName string // Make a unique.Handle. + metricVersion string // Make a unique.Handle. authenticated bool clientSideRefresh bool status status @@ -1010,10 +1012,7 @@ func (c *Client) close(disconnect Disconnect) error { c.mu.RUnlock() if authenticated { - err := c.node.removeClient(c) - if err != nil { - c.node.logger.log(newLogEntry(LogLevelError, "error removing client", map[string]any{"user": c.user, "client": c.uid, "error": err.Error()})) - } + c.node.removeClient(c) } if disconnect.Code != DisconnectConnectionClosed.Code && !hasFlag(c.transport.DisabledPushFlags(), PushFlagDisconnect) { @@ -2227,6 +2226,20 @@ func (c *Client) unlockServerSideSubscriptions(subCtxMap map[string]subscribeCon // connectCmd handles connect command from client - client must send connect // command immediately after establishing connection with server. func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, started time.Time, rw *replyWriter) (*protocol.ConnectResult, error) { + var metricClientName string + if req.Name != "" { + metricClientName = "unregistered" + if slices.Contains(c.node.config.Metrics.RegisteredClientNames, req.Name) { + metricClientName = req.Name + } + } + var metricClientVersion string + if req.Version != "" { + metricClientVersion = "unregistered" + if c.node.config.Metrics.CheckRegisteredClientVersion != nil && c.node.config.Metrics.CheckRegisteredClientVersion(req.Name, req.Version) { + metricClientVersion = req.Version + } + } c.mu.RLock() authenticated := c.authenticated closed := c.status == statusClosed @@ -2393,15 +2406,16 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, // Client successfully connected. c.mu.Lock() + if c.status == statusClosed { + c.mu.Unlock() + return nil, DisconnectConnectionClosed + } c.authenticated = true + c.metricName = metricClientName + c.metricVersion = metricClientVersion + c.node.addClient(c) c.mu.Unlock() - err := c.node.addClient(c) - if err != nil { - c.node.logger.log(newLogEntry(LogLevelError, "error adding client", map[string]any{"client": c.uid, "error": err.Error()})) - return nil, DisconnectServerError - } - if !clientSideRefresh { // Server will do refresh itself. res.Expires = false diff --git a/config.go b/config.go index 510878b3..ac3e18c7 100644 --- a/config.go +++ b/config.go @@ -169,9 +169,16 @@ type MetricsConfig struct { // will expire in the cache. If zero – default TTL 10 seconds is used. ChannelNamespaceCacheTTL time.Duration - // AdditionalTransportNames is an optional list of additional transport names for a metrics system - // to be aware of. It helps to initialize transport related metrics with zero values. - AdditionalTransportNames []string + // RegisteredClientNames is an optional list of known client names which will be allowed to be + // attached as labels to metrics. If client passed a name which is not in the list – then Centrifuge + // will use string "unregistered" as a client_name label. We need to be strict here to avoid + // Prometheus cardinality issues. + RegisteredClientNames []string + // CheckRegisteredClientVersion is a function to check whether the version passed by a client with a + // particular name is valid and can be used in metric values. When function is not set or returns + // false Centrifuge will use "unregistered" value for a client version. Note, the name argument here + // is an original name of client passed to Centrifuge. + CheckRegisteredClientVersion func(clientName string, clientVersion string) bool } // PingPongConfig allows configuring application level ping-pong behavior. diff --git a/handler_http_stream.go b/handler_http_stream.go index 86d56b2a..51ca0bd0 100644 --- a/handler_http_stream.go +++ b/handler_http_stream.go @@ -40,10 +40,6 @@ const streamingResponseWriteTimeout = time.Second func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.node.metrics.incTransportConnect(transportHTTPStream) - h.node.metrics.incTransportConnectionsInflight(transportHTTPStream) - defer func() { - h.node.metrics.decTransportConnectionsInflight(transportHTTPStream) - }() if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) diff --git a/handler_sse.go b/handler_sse.go index 97091971..b782dabd 100644 --- a/handler_sse.go +++ b/handler_sse.go @@ -41,10 +41,6 @@ const defaultMaxSSEBodySize = 64 * 1024 func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.node.metrics.incTransportConnect(transportSSE) - h.node.metrics.incTransportConnectionsInflight(transportSSE) - defer func() { - h.node.metrics.decTransportConnectionsInflight(transportSSE) - }() var requestData []byte if r.Method == http.MethodGet { diff --git a/handler_websocket.go b/handler_websocket.go index a2c76731..daf82800 100644 --- a/handler_websocket.go +++ b/handler_websocket.go @@ -123,7 +123,6 @@ func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler { func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { s.node.metrics.incTransportConnect(transportWebsocket) - s.node.metrics.incTransportConnectionsInflight(transportWebsocket) var protoType = ProtocolTypeJSON var useFramePingPong bool @@ -149,7 +148,6 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { conn, subProtocol, err := s.upgrade.Upgrade(rw, r, nil) if err != nil { s.node.logger.log(newLogEntry(LogLevelDebug, "websocket upgrade error", map[string]any{"error": err.Error()})) - s.node.metrics.decTransportConnectionsInflight(transportWebsocket) return } @@ -187,9 +185,6 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // Separate goroutine for better GC of caller's data. go func() { - defer func() { - s.node.metrics.decTransportConnectionsInflight(transportWebsocket) - }() opts := websocketTransportOptions{ pingPong: s.config.PingPongConfig, writeTimeout: writeTimeout, diff --git a/hub.go b/hub.go index 7d93ca57..63cdbf8c 100644 --- a/hub.go +++ b/hub.go @@ -70,17 +70,17 @@ func (h *Hub) shutdown(ctx context.Context) error { } // Add connection into clientHub connections registry. -func (h *Hub) add(c *Client) error { +func (h *Hub) add(c *Client) { h.sessionsMu.Lock() if c.sessionID() != "" { h.sessions[c.sessionID()] = c } h.sessionsMu.Unlock() - return h.connShards[index(c.UserID(), numHubShards)].add(c) + h.connShards[index(c.UserID(), numHubShards)].add(c) } // Remove connection from clientHub connections registry. -func (h *Hub) remove(c *Client) error { +func (h *Hub) remove(c *Client) bool { h.sessionsMu.Lock() if c.sessionID() != "" { delete(h.sessions, c.sessionID()) @@ -128,7 +128,7 @@ func (h *Hub) addSub(ch string, sub subInfo) (bool, error) { } // removeSub removes connection from clientHub subscriptions registry. -func (h *Hub) removeSub(ch string, c *Client) (bool, error) { +func (h *Hub) removeSub(ch string, c *Client) (bool, bool) { return h.subShards[index(ch, numHubShards)].removeSub(ch, c) } @@ -416,7 +416,7 @@ func (h *connShard) userConnections(userID string) map[string]*Client { } // Add connection into clientHub connections registry. -func (h *connShard) add(c *Client) error { +func (h *connShard) add(c *Client) { h.mu.Lock() defer h.mu.Unlock() @@ -429,11 +429,10 @@ func (h *connShard) add(c *Client) error { h.users[user] = make(map[string]struct{}) } h.users[user][uid] = struct{}{} - return nil } // Remove connection from clientHub connections registry. -func (h *connShard) remove(c *Client) error { +func (h *connShard) remove(c *Client) bool { h.mu.Lock() defer h.mu.Unlock() @@ -444,10 +443,10 @@ func (h *connShard) remove(c *Client) error { // try to find connection to delete, return early if not found. if _, ok := h.users[user]; !ok { - return nil + return false } if _, ok := h.users[user][uid]; !ok { - return nil + return false } // actually remove connection from hub. @@ -458,7 +457,7 @@ func (h *connShard) remove(c *Client) error { delete(h.users, user) } - return nil + return true } // NumClients returns total number of client connections. @@ -533,7 +532,7 @@ func (h *subShard) addSub(ch string, sub subInfo) (bool, error) { } // removeSub removes connection from clientHub subscriptions registry. -func (h *subShard) removeSub(ch string, c *Client) (bool, error) { +func (h *subShard) removeSub(ch string, c *Client) (bool, bool) { h.mu.Lock() defer h.mu.Unlock() @@ -541,10 +540,10 @@ func (h *subShard) removeSub(ch string, c *Client) (bool, error) { // try to find subscription to delete, return early if not found. if _, ok := h.subs[ch]; !ok { - return true, nil + return true, false } if _, ok := h.subs[ch][uid]; !ok { - return true, nil + return true, false } // actually remove subscription from hub. @@ -553,10 +552,10 @@ func (h *subShard) removeSub(ch string, c *Client) (bool, error) { // clean up subs map if it's needed. if len(h.subs[ch]) == 0 { delete(h.subs, ch) - return true, nil + return true, true } - return false, nil + return false, true } type encodeError struct { diff --git a/metrics.go b/metrics.go index 22eb7a53..3ecad141 100644 --- a/metrics.go +++ b/metrics.go @@ -27,6 +27,8 @@ type metrics struct { numChannelsGauge prometheus.Gauge numNodesGauge prometheus.Gauge replyErrorCount *prometheus.CounterVec + connectionsInflight *prometheus.GaugeVec + subscriptionsInflight *prometheus.GaugeVec serverUnsubscribeCount *prometheus.CounterVec serverDisconnectCount *prometheus.CounterVec commandDurationSummary *prometheus.SummaryVec @@ -37,7 +39,6 @@ type metrics struct { transportMessagesSentSize *prometheus.CounterVec transportMessagesReceived *prometheus.CounterVec transportMessagesReceivedSize *prometheus.CounterVec - transportConnectionsInflight *prometheus.GaugeVec messagesReceivedCountPublication prometheus.Counter messagesReceivedCountJoin prometheus.Counter @@ -49,12 +50,9 @@ type metrics struct { messagesSentCountLeave prometheus.Counter messagesSentCountControl prometheus.Counter - transportConnectCountWebsocket prometheus.Counter - transportConnectCountSSE prometheus.Counter - transportConnectCountHTTPStream prometheus.Counter - transportConnectionsInflightWebsocket prometheus.Gauge - transportConnectionsInflightSSE prometheus.Gauge - transportConnectionsInflightHTTPStream prometheus.Gauge + transportConnectCountWebsocket prometheus.Counter + transportConnectCountSSE prometheus.Counter + transportConnectCountHTTPStream prometheus.Counter commandDurationConnect prometheus.Observer commandDurationSubscribe prometheus.Observer @@ -255,6 +253,20 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) { 1.0, 2.5, 5.0, 10.0, // Second resolution. }}, []string{"transport"}) + m.connectionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "client", + Name: "connections_inflight", + Help: "Number of inflight client connections.", + }, []string{"transport", "client_name", "client_version"}) + + m.subscriptionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "client", + Name: "subscriptions_inflight", + Help: "Number of inflight client subscriptions.", + }, []string{"client_name", "channel_namespace"}) + m.transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", @@ -262,13 +274,6 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) { Help: "Number of connect attempts to specific transport.", }, []string{"transport"}) - m.transportConnectionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "transport", - Name: "inflight_connections", - Help: "Number of inflight connections over specific transport.", - }, []string{"transport"}) - m.transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", @@ -329,14 +334,6 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) { m.transportConnectCountHTTPStream = m.transportConnectCount.WithLabelValues(transportHTTPStream) m.transportConnectCountSSE = m.transportConnectCount.WithLabelValues(transportSSE) - m.transportConnectionsInflightWebsocket = m.transportConnectionsInflight.WithLabelValues(transportWebsocket) - m.transportConnectionsInflightHTTPStream = m.transportConnectionsInflight.WithLabelValues(transportHTTPStream) - m.transportConnectionsInflightSSE = m.transportConnectionsInflight.WithLabelValues(transportSSE) - for _, transportName := range config.AdditionalTransportNames { - m.transportConnectCount.WithLabelValues(transportName).Add(0) - m.transportConnectionsInflight.WithLabelValues(transportName).Set(0) - } - labelForMethod := func(frameType protocol.FrameType) string { return frameType.String() } @@ -367,12 +364,13 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) { m.numNodesGauge, m.commandDurationSummary, m.replyErrorCount, + m.connectionsInflight, + m.subscriptionsInflight, m.serverUnsubscribeCount, m.serverDisconnectCount, m.recoverCount, m.pingPongDurationHistogram, m.transportConnectCount, - m.transportConnectionsInflight, m.transportMessagesSent, m.transportMessagesSentSize, m.transportMessagesReceived, @@ -461,32 +459,6 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du observer.Observe(d.Seconds()) } -func (m *metrics) incTransportConnectionsInflight(transport string) { - switch transport { - case transportWebsocket: - m.transportConnectionsInflightWebsocket.Inc() - case transportSSE: - m.transportConnectionsInflightSSE.Inc() - case transportHTTPStream: - m.transportConnectionsInflightHTTPStream.Inc() - default: - m.transportConnectionsInflight.WithLabelValues(transport).Inc() - } -} - -func (m *metrics) decTransportConnectionsInflight(transport string) { - switch transport { - case transportWebsocket: - m.transportConnectionsInflightWebsocket.Dec() - case transportSSE: - m.transportConnectionsInflightSSE.Dec() - case transportHTTPStream: - m.transportConnectionsInflightHTTPStream.Dec() - default: - m.transportConnectionsInflight.WithLabelValues(transport).Dec() - } -} - func (m *metrics) observePubSubDeliveryLag(lagTimeMilli int64) { if lagTimeMilli < 0 { lagTimeMilli = -lagTimeMilli diff --git a/node.go b/node.go index 4f883c8b..a8e9d0b7 100644 --- a/node.go +++ b/node.go @@ -982,21 +982,26 @@ func (n *Node) pubDisconnect(user string, disconnect Disconnect, clientID string // addClient registers authenticated connection in clientConnectionHub // this allows to make operations with user connection on demand. -func (n *Node) addClient(c *Client) error { +func (n *Node) addClient(c *Client) { n.metrics.incActionCount("add_client", "") - return n.hub.add(c) + n.metrics.connectionsInflight.WithLabelValues(c.transport.Name(), c.metricName, c.metricVersion).Inc() + n.hub.add(c) } // removeClient removes client connection from connection registry. -func (n *Node) removeClient(c *Client) error { +func (n *Node) removeClient(c *Client) { n.metrics.incActionCount("remove_client", "") - return n.hub.remove(c) + removed := n.hub.remove(c) + if removed { + n.metrics.connectionsInflight.WithLabelValues(c.transport.Name(), c.metricName, c.metricVersion).Dec() + } } // addSubscription registers subscription of connection on channel in both // Hub and Broker. func (n *Node) addSubscription(ch string, sub subInfo) error { n.metrics.incActionCount("add_subscription", ch) + n.metrics.subscriptionsInflight.WithLabelValues(sub.client.metricName, n.metrics.getChannelNamespaceLabel(ch)).Inc() mu := n.subLock(ch) mu.Lock() defer mu.Unlock() @@ -1047,9 +1052,9 @@ func (n *Node) removeSubscription(ch string, c *Client) error { mu := n.subLock(ch) mu.Lock() defer mu.Unlock() - empty, err := n.hub.removeSub(ch, c) - if err != nil { - return err + empty, wasRemoved := n.hub.removeSub(ch, c) + if wasRemoved { + n.metrics.subscriptionsInflight.WithLabelValues(c.metricName, n.metrics.getChannelNamespaceLabel(ch)).Dec() } if empty { submittedAt := time.Now() @@ -1683,21 +1688,3 @@ func (n *Node) HandleLeave(ch string, info *ClientInfo) error { func (n *Node) HandleControl(data []byte) error { return n.handleControl(data) } - -// IncTransportConnectionsInflight increments number of transport connections inflight built-in gauge. -// Only useful if you are using a custom transport. For built-in transports this is done automatically. -func (n *Node) IncTransportConnectionsInflight(transportName string) { - n.metrics.incTransportConnectionsInflight(transportName) -} - -// DecTransportConnectionsInflight decrements number of transport connections inflight built-in gauge. -// Only useful if you are using a custom transport. For built-in transports this is done automatically. -func (n *Node) DecTransportConnectionsInflight(transportName string) { - n.metrics.decTransportConnectionsInflight(transportName) -} - -// IncTransportConnectCounter increments number of transport connect attempts built-in counter. -// Only useful if you are using a custom transport. For built-in transports this is done automatically. -func (n *Node) IncTransportConnectCounter(transportName string) { - n.metrics.incTransportConnect(transportName) -}