Skip to content

Commit

Permalink
inflight_connections
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Dec 10, 2024
1 parent 2324d83 commit f23315d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 6 deletions.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ type MetricsConfig struct {
// ChannelNamespaceCacheTTL sets the time after which resolved channel namespace for a channel
// will expire in the cache. If zero – default TTL 10 seconds is used.
ChannelNamespaceCacheTTL time.Duration

// AdditionalTransportNames is an optional list of additional transport names for a metrics system
// to be aware of. It helps to initialize transport related metrics with zero values.
AdditionalTransportNames []string
}

// PingPongConfig allows configuring application level ping-pong behavior.
Expand Down
4 changes: 4 additions & 0 deletions handler_http_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const streamingResponseWriteTimeout = time.Second

func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.node.metrics.incTransportConnect(transportHTTPStream)
h.node.metrics.incTransportConnectionsInflight(transportHTTPStream)
defer func() {
h.node.metrics.decTransportConnectionsInflight(transportHTTPStream)
}()

if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
Expand Down
4 changes: 4 additions & 0 deletions handler_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const defaultMaxSSEBodySize = 64 * 1024

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.node.metrics.incTransportConnect(transportSSE)
h.node.metrics.incTransportConnectionsInflight(transportSSE)
defer func() {
h.node.metrics.decTransportConnectionsInflight(transportSSE)
}()

var requestData []byte
if r.Method == http.MethodGet {
Expand Down
5 changes: 5 additions & 0 deletions handler_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler {

func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
s.node.metrics.incTransportConnect(transportWebsocket)
s.node.metrics.incTransportConnectionsInflight(transportWebsocket)

var protoType = ProtocolTypeJSON
var useFramePingPong bool
Expand All @@ -148,6 +149,7 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
conn, subProtocol, err := s.upgrade.Upgrade(rw, r, nil)
if err != nil {
s.node.logger.log(newLogEntry(LogLevelDebug, "websocket upgrade error", map[string]any{"error": err.Error()}))
s.node.metrics.decTransportConnectionsInflight(transportWebsocket)
return
}

Expand Down Expand Up @@ -185,6 +187,9 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {

// Separate goroutine for better GC of caller's data.
go func() {
defer func() {
s.node.metrics.decTransportConnectionsInflight(transportWebsocket)
}()
opts := websocketTransportOptions{
pingPong: s.config.PingPongConfig,
writeTimeout: writeTimeout,
Expand Down
58 changes: 52 additions & 6 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type metrics struct {
transportMessagesSentSize *prometheus.CounterVec
transportMessagesReceived *prometheus.CounterVec
transportMessagesReceivedSize *prometheus.CounterVec
transportConnectionsInflight *prometheus.GaugeVec

messagesReceivedCountPublication prometheus.Counter
messagesReceivedCountJoin prometheus.Counter
Expand All @@ -48,9 +49,12 @@ type metrics struct {
messagesSentCountLeave prometheus.Counter
messagesSentCountControl prometheus.Counter

transportConnectCountWebsocket prometheus.Counter
transportConnectCountSSE prometheus.Counter
transportConnectCountHTTPStream prometheus.Counter
transportConnectCountWebsocket prometheus.Counter
transportConnectCountSSE prometheus.Counter
transportConnectCountHTTPStream prometheus.Counter
transportConnectionsInflightWebsocket prometheus.Gauge
transportConnectionsInflightSSE prometheus.Gauge
transportConnectionsInflightHTTPStream prometheus.Gauge

commandDurationConnect prometheus.Observer
commandDurationSubscribe prometheus.Observer
Expand Down Expand Up @@ -255,7 +259,14 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "connect_count",
Help: "Number of connections to specific transport.",
Help: "Number of connect attempts to specific transport.",
}, []string{"transport"})

m.transportConnectionsInflight = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "inflight_connections",
Help: "Number of inflight connections over specific transport.",
}, []string{"transport"})

m.transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{
Expand All @@ -269,7 +280,7 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_sent_size",
Help: "Size in bytes of messages sent to client connections over specific transport.",
Help: "Size in bytes of messages sent to client connections over specific transport (uncompressed and does not include framing overhead).",
}, []string{"transport", "frame_type", "channel_namespace"})

m.transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Expand All @@ -283,7 +294,7 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_received_size",
Help: "Size in bytes of messages received from client connections over specific transport.",
Help: "Size in bytes of messages received from client connections over specific transport (uncompressed and does not include framing overhead).",
}, []string{"transport", "frame_type", "channel_namespace"})

m.pubSubLagHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Expand Down Expand Up @@ -318,6 +329,14 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
m.transportConnectCountHTTPStream = m.transportConnectCount.WithLabelValues(transportHTTPStream)
m.transportConnectCountSSE = m.transportConnectCount.WithLabelValues(transportSSE)

m.transportConnectionsInflightWebsocket = m.transportConnectionsInflight.WithLabelValues(transportWebsocket)
m.transportConnectionsInflightHTTPStream = m.transportConnectionsInflight.WithLabelValues(transportHTTPStream)
m.transportConnectionsInflightSSE = m.transportConnectionsInflight.WithLabelValues(transportSSE)
for _, transportName := range config.AdditionalTransportNames {
m.transportConnectCount.WithLabelValues(transportName).Add(0)
m.transportConnectionsInflight.WithLabelValues(transportName).Set(0)
}

labelForMethod := func(frameType protocol.FrameType) string {
return frameType.String()
}
Expand Down Expand Up @@ -353,6 +372,7 @@ func newMetricsRegistry(config MetricsConfig) (*metrics, error) {
m.recoverCount,
m.pingPongDurationHistogram,
m.transportConnectCount,
m.transportConnectionsInflight,
m.transportMessagesSent,
m.transportMessagesSentSize,
m.transportMessagesReceived,
Expand Down Expand Up @@ -441,6 +461,32 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du
observer.Observe(d.Seconds())
}

func (m *metrics) incTransportConnectionsInflight(transport string) {
switch transport {
case transportWebsocket:
m.transportConnectionsInflightWebsocket.Inc()
case transportSSE:
m.transportConnectionsInflightSSE.Inc()
case transportHTTPStream:
m.transportConnectionsInflightHTTPStream.Inc()
default:
m.transportConnectionsInflight.WithLabelValues(transport).Inc()
}
}

func (m *metrics) decTransportConnectionsInflight(transport string) {
switch transport {
case transportWebsocket:
m.transportConnectionsInflightWebsocket.Dec()
case transportSSE:
m.transportConnectionsInflightSSE.Dec()
case transportHTTPStream:
m.transportConnectionsInflightHTTPStream.Dec()
default:
m.transportConnectionsInflight.WithLabelValues(transport).Dec()
}
}

func (m *metrics) observePubSubDeliveryLag(lagTimeMilli int64) {
if lagTimeMilli < 0 {
lagTimeMilli = -lagTimeMilli
Expand Down
18 changes: 18 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1683,3 +1683,21 @@ func (n *Node) HandleLeave(ch string, info *ClientInfo) error {
func (n *Node) HandleControl(data []byte) error {
return n.handleControl(data)
}

// IncTransportConnectionsInflight increments number of transport connections inflight built-in gauge.
// Only useful if you are using a custom transport. For built-in transports this is done automatically.
func (n *Node) IncTransportConnectionsInflight(transportName string) {
n.metrics.incTransportConnectionsInflight(transportName)
}

// DecTransportConnectionsInflight decrements number of transport connections inflight built-in gauge.
// Only useful if you are using a custom transport. For built-in transports this is done automatically.
func (n *Node) DecTransportConnectionsInflight(transportName string) {
n.metrics.decTransportConnectionsInflight(transportName)
}

// IncTransportConnectCounter increments number of transport connect attempts built-in counter.
// Only useful if you are using a custom transport. For built-in transports this is done automatically.
func (n *Node) IncTransportConnectCounter(transportName string) {
n.metrics.incTransportConnect(transportName)
}

0 comments on commit f23315d

Please sign in to comment.