From 3ede04acd57e08abf3ef232756a4ee186f77ee74 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 26 Nov 2024 16:29:54 +0000 Subject: [PATCH] chore: add metrics --- packages/transport-websockets/src/listener.ts | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index aba19f51ad..20baead9f6 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -10,7 +10,7 @@ import duplex from 'it-ws/duplex' import { pEvent } from 'p-event' import * as ws from 'ws' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader } from '@libp2p/interface' +import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' import type { EventEmitter } from 'node:events' @@ -115,6 +115,8 @@ export class WebSocketListener extends TypedEventEmitter impleme } async onSocketConnection (socket: net.Socket): Promise { + this.metrics.events?.increment({ [`${this.addr} connection`]: true }) + let buffer = socket.read(1) if (buffer == null) { @@ -139,15 +141,26 @@ export class WebSocketListener extends TypedEventEmitter impleme // store the socket so we can close it when the listener closes this.sockets.add(socket) + socket.on('close', () => { + this.metrics.events?.increment({ [`${this.addr} close`]: true }) this.sockets.delete(socket) }) socket.on('error', (err) => { this.log.error('socket error - %e', err) + this.metrics.events?.increment({ [`${this.addr} error`]: true }) socket.destroy() }) + socket.once('timeout', () => { + this.metrics.events?.increment({ [`${this.addr} timeout`]: true }) + }) + + socket.once('end', () => { + this.metrics.events?.increment({ [`${this.addr} end`]: true }) + }) + // re-queue first data chunk socket.unshift(buffer) @@ -184,13 +197,22 @@ export class WebSocketListener extends TypedEventEmitter impleme localPort: addr.port } - const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: this.logger, - metrics: this.metrics?.events, - metricPrefix: `${this.addr} ` - }) - this.log('new inbound connection %s', maConn.remoteAddr) + let maConn: MultiaddrConnection + try { + maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { + logger: this.logger, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` + }) + } catch (err: any) { + this.log.error('inbound connection failed', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_to_connection`]: true }) + socket.close() + return + } + + this.log('new inbound connection %s', maConn.remoteAddr) const signal = AbortSignal.timeout(this.inboundConnectionUpgradeTimeout) setMaxListeners(Infinity, signal) @@ -240,16 +262,22 @@ export class WebSocketListener extends TypedEventEmitter impleme resolve() } const onError = (err: Error): void => { + this.metrics.errors?.increment({ [`${this.addr} listen_error`]: true }) removeListeners() reject(err) } + const onDrop = (): void => { + this.metrics.events?.increment({ [`${this.addr} drop`]: true }) + } const removeListeners = (): void => { this.server.removeListener('listening', onListening) this.server.removeListener('error', onError) + this.server.removeListener('drop', onDrop) } this.server.addListener('listening', onListening) this.server.addListener('error', onError) + this.server.addListener('drop', onDrop) }) this.safeDispatchEvent('listening')