Skip to content

Commit

Permalink
chore: add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 26, 2024
1 parent 17bd4fd commit 3ede04a
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions packages/transport-websockets/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import duplex from 'it-ws/duplex'
import { pEvent } from 'p-event'
import * as ws from 'ws'
import { socketToMaConn } from './socket-to-conn.js'
import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader } from '@libp2p/interface'
import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, MultiaddrConnection } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { EventEmitter } from 'node:events'
Expand Down Expand Up @@ -115,6 +115,8 @@ export class WebSocketListener extends TypedEventEmitter<ListenerEvents> impleme
}

async onSocketConnection (socket: net.Socket): Promise<void> {
this.metrics.events?.increment({ [`${this.addr} connection`]: true })

let buffer = socket.read(1)

if (buffer == null) {
Expand All @@ -139,15 +141,26 @@ export class WebSocketListener extends TypedEventEmitter<ListenerEvents> impleme

// store the socket so we can close it when the listener closes
this.sockets.add(socket)

socket.on('close', () => {
this.metrics.events?.increment({ [`${this.addr} close`]: true })
this.sockets.delete(socket)
})

socket.on('error', (err) => {
this.log.error('socket error - %e', err)
this.metrics.events?.increment({ [`${this.addr} error`]: true })
socket.destroy()

Check warning on line 153 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L151-L153

Added lines #L151 - L153 were not covered by tests
})

socket.once('timeout', () => {
this.metrics.events?.increment({ [`${this.addr} timeout`]: true })

Check warning on line 157 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L157

Added line #L157 was not covered by tests
})

socket.once('end', () => {
this.metrics.events?.increment({ [`${this.addr} end`]: true })

Check warning on line 161 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L161

Added line #L161 was not covered by tests
})

// re-queue first data chunk
socket.unshift(buffer)

Expand Down Expand Up @@ -184,13 +197,22 @@ export class WebSocketListener extends TypedEventEmitter<ListenerEvents> impleme
localPort: addr.port
}

const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), {
logger: this.logger,
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
this.log('new inbound connection %s', maConn.remoteAddr)
let maConn: MultiaddrConnection

try {
maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), {
logger: this.logger,
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
} catch (err: any) {
this.log.error('inbound connection failed', err)
this.metrics.errors?.increment({ [`${this.addr} inbound_to_connection`]: true })
socket.close()

Check warning on line 211 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L209-L211

Added lines #L209 - L211 were not covered by tests
return
}

this.log('new inbound connection %s', maConn.remoteAddr)
const signal = AbortSignal.timeout(this.inboundConnectionUpgradeTimeout)
setMaxListeners(Infinity, signal)

Expand Down Expand Up @@ -240,16 +262,22 @@ export class WebSocketListener extends TypedEventEmitter<ListenerEvents> impleme
resolve()
}
const onError = (err: Error): void => {
this.metrics.errors?.increment({ [`${this.addr} listen_error`]: true })
removeListeners()
reject(err)
}
const onDrop = (): void => {
this.metrics.events?.increment({ [`${this.addr} drop`]: true })
}

Check warning on line 271 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L270-L271

Added lines #L270 - L271 were not covered by tests
const removeListeners = (): void => {
this.server.removeListener('listening', onListening)
this.server.removeListener('error', onError)
this.server.removeListener('drop', onDrop)
}

this.server.addListener('listening', onListening)
this.server.addListener('error', onError)
this.server.addListener('drop', onDrop)
})

this.safeDispatchEvent('listening')
Expand Down

0 comments on commit 3ede04a

Please sign in to comment.