Skip to content

Commit

Permalink
client conns and subs inflight
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Dec 12, 2024
1 parent f23315d commit abee564
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 115 deletions.
34 changes: 24 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ type Client struct {
info []byte
storage map[string]any
storageMu sync.Mutex
metricName string // Make a unique.Handle.
metricVersion string // Make a unique.Handle.
authenticated bool
clientSideRefresh bool
status status
Expand Down Expand Up @@ -1010,10 +1012,7 @@ func (c *Client) close(disconnect Disconnect) error {
c.mu.RUnlock()

if authenticated {
err := c.node.removeClient(c)
if err != nil {
c.node.logger.log(newLogEntry(LogLevelError, "error removing client", map[string]any{"user": c.user, "client": c.uid, "error": err.Error()}))
}
c.node.removeClient(c)
}

if disconnect.Code != DisconnectConnectionClosed.Code && !hasFlag(c.transport.DisabledPushFlags(), PushFlagDisconnect) {
Expand Down Expand Up @@ -2227,6 +2226,20 @@ func (c *Client) unlockServerSideSubscriptions(subCtxMap map[string]subscribeCon
// connectCmd handles connect command from client - client must send connect
// command immediately after establishing connection with server.
func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, started time.Time, rw *replyWriter) (*protocol.ConnectResult, error) {
var metricClientName string
if req.Name != "" {
metricClientName = "unregistered"
if slices.Contains(c.node.config.Metrics.RegisteredClientNames, req.Name) {
metricClientName = req.Name
}
}
var metricClientVersion string
if req.Version != "" {
metricClientVersion = "unregistered"
if c.node.config.Metrics.CheckRegisteredClientVersion != nil && c.node.config.Metrics.CheckRegisteredClientVersion(req.Name, req.Version) {
metricClientVersion = req.Version
}
}
c.mu.RLock()
authenticated := c.authenticated
closed := c.status == statusClosed
Expand Down Expand Up @@ -2393,15 +2406,16 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command,

// Client successfully connected.
c.mu.Lock()
if c.status == statusClosed {
c.mu.Unlock()
return nil, DisconnectConnectionClosed
}
c.authenticated = true
c.metricName = metricClientName
c.metricVersion = metricClientVersion
c.node.addClient(c)
c.mu.Unlock()

err := c.node.addClient(c)
if err != nil {
c.node.logger.log(newLogEntry(LogLevelError, "error adding client", map[string]any{"client": c.uid, "error": err.Error()}))
return nil, DisconnectServerError
}

if !clientSideRefresh {
// Server will do refresh itself.
res.Expires = false
Expand Down
13 changes: 10 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,16 @@ type MetricsConfig struct {
// will expire in the cache. If zero – default TTL 10 seconds is used.
ChannelNamespaceCacheTTL time.Duration

// AdditionalTransportNames is an optional list of additional transport names for a metrics system
// to be aware of. It helps to initialize transport related metrics with zero values.
AdditionalTransportNames []string
// RegisteredClientNames is an optional list of known client names that are allowed to be
// attached to metrics as label values. If a client passes a name that is not in this list,
// Centrifuge uses the string "unregistered" as the client_name label instead. We need to be
// strict here to avoid Prometheus label cardinality issues.
RegisteredClientNames []string
// CheckRegisteredClientVersion is a function that checks whether the version passed by a client
// with a particular name is valid and can be used in metric label values. When the function is
// not set or returns false, Centrifuge uses the "unregistered" value for the client version.
// Note that the name argument here is the original name the client passed to Centrifuge.
CheckRegisteredClientVersion func(clientName string, clientVersion string) bool
}

// PingPongConfig allows configuring application level ping-pong behavior.
Expand Down
4 changes: 0 additions & 4 deletions handler_http_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ const streamingResponseWriteTimeout = time.Second

func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.node.metrics.incTransportConnect(transportHTTPStream)
h.node.metrics.incTransportConnectionsInflight(transportHTTPStream)
defer func() {
h.node.metrics.decTransportConnectionsInflight(transportHTTPStream)
}()

if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
Expand Down
4 changes: 0 additions & 4 deletions handler_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const defaultMaxSSEBodySize = 64 * 1024

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.node.metrics.incTransportConnect(transportSSE)
h.node.metrics.incTransportConnectionsInflight(transportSSE)
defer func() {
h.node.metrics.decTransportConnectionsInflight(transportSSE)
}()

var requestData []byte
if r.Method == http.MethodGet {
Expand Down
5 changes: 0 additions & 5 deletions handler_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler {

func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
s.node.metrics.incTransportConnect(transportWebsocket)
s.node.metrics.incTransportConnectionsInflight(transportWebsocket)

var protoType = ProtocolTypeJSON
var useFramePingPong bool
Expand All @@ -149,7 +148,6 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
conn, subProtocol, err := s.upgrade.Upgrade(rw, r, nil)
if err != nil {
s.node.logger.log(newLogEntry(LogLevelDebug, "websocket upgrade error", map[string]any{"error": err.Error()}))
s.node.metrics.decTransportConnectionsInflight(transportWebsocket)
return
}

Expand Down Expand Up @@ -187,9 +185,6 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {

// Separate goroutine for better GC of caller's data.
go func() {
defer func() {
s.node.metrics.decTransportConnectionsInflight(transportWebsocket)
}()
opts := websocketTransportOptions{
pingPong: s.config.PingPongConfig,
writeTimeout: writeTimeout,
Expand Down
29 changes: 14 additions & 15 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func (h *Hub) shutdown(ctx context.Context) error {
}

// Add connection into clientHub connections registry.
func (h *Hub) add(c *Client) error {
func (h *Hub) add(c *Client) {
h.sessionsMu.Lock()
if c.sessionID() != "" {
h.sessions[c.sessionID()] = c
}
h.sessionsMu.Unlock()
return h.connShards[index(c.UserID(), numHubShards)].add(c)
h.connShards[index(c.UserID(), numHubShards)].add(c)
}

// Remove connection from clientHub connections registry.
func (h *Hub) remove(c *Client) error {
func (h *Hub) remove(c *Client) bool {
h.sessionsMu.Lock()
if c.sessionID() != "" {
delete(h.sessions, c.sessionID())
Expand Down Expand Up @@ -128,7 +128,7 @@ func (h *Hub) addSub(ch string, sub subInfo) (bool, error) {
}

// removeSub removes connection from clientHub subscriptions registry.
func (h *Hub) removeSub(ch string, c *Client) (bool, error) {
func (h *Hub) removeSub(ch string, c *Client) (bool, bool) {
return h.subShards[index(ch, numHubShards)].removeSub(ch, c)
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func (h *connShard) userConnections(userID string) map[string]*Client {
}

// Add connection into clientHub connections registry.
func (h *connShard) add(c *Client) error {
func (h *connShard) add(c *Client) {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -429,11 +429,10 @@ func (h *connShard) add(c *Client) error {
h.users[user] = make(map[string]struct{})
}
h.users[user][uid] = struct{}{}
return nil
}

// Remove connection from clientHub connections registry.
func (h *connShard) remove(c *Client) error {
func (h *connShard) remove(c *Client) bool {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -444,10 +443,10 @@ func (h *connShard) remove(c *Client) error {

// try to find connection to delete, return early if not found.
if _, ok := h.users[user]; !ok {
return nil
return false
}
if _, ok := h.users[user][uid]; !ok {
return nil
return false
}

// actually remove connection from hub.
Expand All @@ -458,7 +457,7 @@ func (h *connShard) remove(c *Client) error {
delete(h.users, user)
}

return nil
return true
}

// NumClients returns total number of client connections.
Expand Down Expand Up @@ -533,18 +532,18 @@ func (h *subShard) addSub(ch string, sub subInfo) (bool, error) {
}

// removeSub removes connection from clientHub subscriptions registry.
func (h *subShard) removeSub(ch string, c *Client) (bool, error) {
func (h *subShard) removeSub(ch string, c *Client) (bool, bool) {
h.mu.Lock()
defer h.mu.Unlock()

uid := c.ID()

// try to find subscription to delete, return early if not found.
if _, ok := h.subs[ch]; !ok {
return true, nil
return true, false
}
if _, ok := h.subs[ch][uid]; !ok {
return true, nil
return true, false
}

// actually remove subscription from hub.
Expand All @@ -553,10 +552,10 @@ func (h *subShard) removeSub(ch string, c *Client) (bool, error) {
// clean up subs map if it's needed.
if len(h.subs[ch]) == 0 {
delete(h.subs, ch)
return true, nil
return true, true
}

return false, nil
return false, true
}

type encodeError struct {
Expand Down
70 changes: 21 additions & 49 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type metrics struct {
numChannelsGauge prometheus.Gauge
numNodesGauge prometheus.Gauge
replyErrorCount *prometheus.CounterVec
connectionsInflight *prometheus.GaugeVec
subscriptionsInflight *prometheus.GaugeVec
serverUnsubscribeCount *prometheus.CounterVec
serverDisconnectCount *prometheus.CounterVec
commandDurationSummary *prometheus.SummaryVec
Expand All @@ -37,7 +39,6 @@ type metrics struct {
transportMessagesSentSize *prometheus.CounterVec
transportMessagesReceived *prometheus.CounterVec
transportMessagesReceivedSize *prometheus.CounterVec
transportConnectionsInflight *prometheus.GaugeVec

messagesReceivedCountPublication prometheus.Counter
messagesReceivedCountJoin prometheus.Counter
Expand All @@ -49,12 +50,9 @@ type metrics struct {
messagesSentCountLeave prometheus.Counter
messagesSentCountControl prometheus.Counter

transportConnectCountWebsocket prometheus.Counter
transportConnectCountSSE prometheus.Counter
transportConnectCountHTTPStream prometheus.Counter
transportConnectionsInflightWebsocket prometheus.Gauge
transportConnectionsInflightSSE prometheus.Gauge
transportConnectionsInflightHTTPStream prometheus.Gauge
transportConnectCountWebsocket prometheus.Counter
transportConnectCountSSE prometheus.Counter
transportConnectCountHTTPStream prometheus.Counter

commandDurationConnect prometheus.Observer
commandDurationSubscribe prometheus.Observer
Expand Down Expand Up @@ -255,20 +253,27 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
1.0, 2.5, 5.0, 10.0, // Second resolution.
}}, []string{"transport"})

m.connectionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "client",
Name: "connections_inflight",
Help: "Number of inflight client connections.",
}, []string{"transport", "client_name", "client_version"})

m.subscriptionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "client",
Name: "subscriptions_inflight",
Help: "Number of inflight client subscriptions.",
}, []string{"client_name", "channel_namespace"})

m.transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "connect_count",
Help: "Number of connect attempts to specific transport.",
}, []string{"transport"})

m.transportConnectionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "inflight_connections",
Help: "Number of inflight connections over specific transport.",
}, []string{"transport"})

m.transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Expand Down Expand Up @@ -329,14 +334,6 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
m.transportConnectCountHTTPStream = m.transportConnectCount.WithLabelValues(transportHTTPStream)
m.transportConnectCountSSE = m.transportConnectCount.WithLabelValues(transportSSE)

m.transportConnectionsInflightWebsocket = m.transportConnectionsInflight.WithLabelValues(transportWebsocket)
m.transportConnectionsInflightHTTPStream = m.transportConnectionsInflight.WithLabelValues(transportHTTPStream)
m.transportConnectionsInflightSSE = m.transportConnectionsInflight.WithLabelValues(transportSSE)
for _, transportName := range config.AdditionalTransportNames {
m.transportConnectCount.WithLabelValues(transportName).Add(0)
m.transportConnectionsInflight.WithLabelValues(transportName).Set(0)
}

labelForMethod := func(frameType protocol.FrameType) string {
return frameType.String()
}
Expand Down Expand Up @@ -367,12 +364,13 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
m.numNodesGauge,
m.commandDurationSummary,
m.replyErrorCount,
m.connectionsInflight,
m.subscriptionsInflight,
m.serverUnsubscribeCount,
m.serverDisconnectCount,
m.recoverCount,
m.pingPongDurationHistogram,
m.transportConnectCount,
m.transportConnectionsInflight,
m.transportMessagesSent,
m.transportMessagesSentSize,
m.transportMessagesReceived,
Expand Down Expand Up @@ -461,32 +459,6 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du
observer.Observe(d.Seconds())
}

// incTransportConnectionsInflight increments the inflight connections gauge
// for the given transport name.
//
// The built-in transports (websocket, SSE, HTTP stream) use gauges stored
// directly on the metrics struct; any other transport name is resolved
// through the labeled gauge vector on every call.
func (m *metrics) incTransportConnectionsInflight(transport string) {
	if transport == transportWebsocket {
		m.transportConnectionsInflightWebsocket.Inc()
		return
	}
	if transport == transportSSE {
		m.transportConnectionsInflightSSE.Inc()
		return
	}
	if transport == transportHTTPStream {
		m.transportConnectionsInflightHTTPStream.Inc()
		return
	}
	// Not a built-in transport: look the gauge up by label value.
	m.transportConnectionsInflight.WithLabelValues(transport).Inc()
}

// decTransportConnectionsInflight decrements the inflight connections gauge
// for the given transport name.
//
// The built-in transports (websocket, SSE, HTTP stream) use gauges stored
// directly on the metrics struct; any other transport name is resolved
// through the labeled gauge vector on every call.
func (m *metrics) decTransportConnectionsInflight(transport string) {
	if transport == transportWebsocket {
		m.transportConnectionsInflightWebsocket.Dec()
		return
	}
	if transport == transportSSE {
		m.transportConnectionsInflightSSE.Dec()
		return
	}
	if transport == transportHTTPStream {
		m.transportConnectionsInflightHTTPStream.Dec()
		return
	}
	// Not a built-in transport: look the gauge up by label value.
	m.transportConnectionsInflight.WithLabelValues(transport).Dec()
}

func (m *metrics) observePubSubDeliveryLag(lagTimeMilli int64) {
if lagTimeMilli < 0 {
lagTimeMilli = -lagTimeMilli
Expand Down
Loading

0 comments on commit abee564

Please sign in to comment.