From b97e91ab6f00119ecb705d229eec59fab75c2dd1 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 19 Nov 2024 16:55:54 +0000 Subject: [PATCH 1/6] feat: auto-tls for websockets Starts a tcp server on the listen port and hands connections off to internal http or https servers (depdending on connection type). Upgrade requests from both servers are handled by a websocket server. The https server is enabled when either a secure websocket address is listened to explicitly, or when a TLS certificate is provisioned by another libp2p component, likely `@libp2p/auto-tls`. This means we don't need to add another port mapping for the https server since we run http and https over the same port. --- packages/transport-websockets/package.json | 3 +- packages/transport-websockets/src/index.ts | 45 ++- packages/transport-websockets/src/listener.ts | 375 +++++++++++++----- packages/transport-websockets/test/browser.ts | 5 + packages/transport-websockets/test/node.ts | 105 ++++- 5 files changed, 413 insertions(+), 120 deletions(-) diff --git a/packages/transport-websockets/package.json b/packages/transport-websockets/package.json index bc08520026..fd7fe06abe 100644 --- a/packages/transport-websockets/package.json +++ b/packages/transport-websockets/package.json @@ -78,10 +78,11 @@ "@libp2p/utils": "^6.2.1", "@multiformats/multiaddr": "^12.2.3", "@multiformats/multiaddr-matcher": "^1.4.0", - "@multiformats/multiaddr-to-uri": "^10.0.1", + "@multiformats/multiaddr-to-uri": "^11.0.0", "@types/ws": "^8.5.10", "it-ws": "^6.1.1", "p-defer": "^4.0.1", + "p-event": "^6.0.1", "progress-events": "^1.0.0", "race-signal": "^1.0.2", "wherearewe": "^2.0.1", diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index f18ffb5961..c38cbe0a19 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -57,7 +57,7 @@ * ``` */ -import { transportSymbol, serviceCapabilities, ConnectionFailedError } from '@libp2p/interface' +import { transportSymbol, serviceCapabilities, ConnectionFailedError, serviceDependencies } from '@libp2p/interface' import { multiaddrToUri as toUri } from '@multiformats/multiaddr-to-uri' import { connect, type WebSocketOptions } from 'it-ws/client' import pDefer from 'p-defer' @@ -67,21 +67,39 @@ import { isBrowser, isWebWorker } from 'wherearewe' import * as filters from './filters.js' import { createListener } from './listener.js' import { socketToMaConn } from './socket-to-conn.js' -import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface' +import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup, TypedEventTarget, Libp2pEvents } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' +import type http from 'node:http' +import type https from 'node:https' import type { ProgressEvent } from 'progress-events' import type { ClientOptions } from 'ws' export interface WebSocketsInit extends AbortOptions, WebSocketOptions { filter?: MultiaddrFilter websocket?: ClientOptions - server?: Server + http?: http.ServerOptions + https?: https.ServerOptions + + /** + * If a service like `@libp2p/auto-tls` creates a TLS certificate this + * transport can use, upgrade any listeners from `/ws` to `/wss`. + * + * @default false + */ + autoTLS?: boolean + + /** + * Inbound connections must complete their upgrade within this many ms + * + * @default 5000 + */ + inboundConnectionUpgradeTimeout?: number } export interface WebSocketsComponents { logger: ComponentLogger + events: TypedEventTarget metrics?: Metrics } @@ -95,12 +113,12 @@ export type WebSocketsDialEvents = class WebSockets implements Transport { private readonly log: Logger - private readonly init?: WebSocketsInit + private readonly init: WebSocketsInit private readonly logger: ComponentLogger private readonly metrics?: WebSocketsMetrics private readonly components: WebSocketsComponents - constructor (components: WebSocketsComponents, init?: WebSocketsInit) { + constructor (components: WebSocketsComponents, init: WebSocketsInit = {}) { this.log = components.logger.forComponent('libp2p:websockets') this.logger = components.logger this.components = components @@ -124,6 +142,16 @@ class WebSockets implements Transport { '@libp2p/transport' ] + get [serviceDependencies] (): string[] { + if (this.init.autoTLS === true) { + return [ + '@libp2p/auto-tls' + ] + } + + return [] + } + async dial (ma: Multiaddr, options: DialTransportOptions): Promise { this.log('dialing %s', ma) options = options ?? {} @@ -180,13 +208,14 @@ class WebSockets implements Transport { } /** - * Creates a Websockets listener. The provided `handler` function will be called + * Creates a WebSockets listener. The provided `handler` function will be called * anytime a new incoming Connection has been successfully upgraded via * `upgrader.upgradeInbound` */ createListener (options: CreateListenerOptions): Listener { return createListener({ logger: this.logger, + events: this.components.events, metrics: this.components.metrics }, { ...this.init, @@ -195,7 +224,7 @@ class WebSockets implements Transport { } /** - * Takes a list of `Multiaddr`s and returns only valid Websockets addresses. + * Takes a list of `Multiaddr`s and returns only valid WebSockets addresses. * By default, in a browser environment only DNS+WSS multiaddr is accepted, * while in a Node.js environment DNS+{WS, WSS} multiaddrs are accepted. */ diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index b941aa071a..942c228e84 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -1,145 +1,314 @@ -import os from 'os' -import { TypedEventEmitter } from '@libp2p/interface' +import http from 'node:http' +import https from 'node:https' +import net from 'node:net' +import os from 'node:os' +import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' import { multiaddr, protocols } from '@multiformats/multiaddr' -import { createServer } from 'it-ws/server' +import { WebSockets, WebSocketsSecure } from '@multiformats/multiaddr-matcher' +import duplex from 'it-ws/duplex' +import { pEvent } from 'p-event' +import * as ws from 'ws' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics } from '@libp2p/interface' +import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, ConnectionHandler } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' -import type { WebSocketServer } from 'it-ws/server' +import type { EventEmitter } from 'node:events' +import type { Server } from 'node:http' +import type { Duplex } from 'node:stream' export interface WebSocketListenerComponents { logger: ComponentLogger + events: TypedEventTarget metrics?: Metrics } export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server + inboundConnectionUpgradeTimeout?: number + autoTLS?: boolean + cert?: string + key?: string + http?: http.ServerOptions + https?: http.ServerOptions } export interface WebSocketListenerMetrics { - status: MetricGroup - errors: CounterGroup - events: CounterGroup + status?: MetricGroup + errors?: CounterGroup + events?: CounterGroup } -class WebSocketListener extends TypedEventEmitter implements Listener { - private readonly connections: Set - private listeningMultiaddr?: Multiaddr - private readonly server: WebSocketServer +export class WebSocketListener extends TypedEventEmitter implements Listener { private readonly log: Logger - private metrics?: WebSocketListenerMetrics - private addr: string + private readonly logger: ComponentLogger + private readonly server: net.Server + private readonly wsServer: ws.WebSocketServer + private readonly metrics: WebSocketListenerMetrics + private readonly sockets: Set + private readonly upgrader: Upgrader + private readonly inboundConnectionUpgradeTimeout: number + private readonly httpOptions?: http.ServerOptions + private readonly httpsOptions?: https.ServerOptions + private http?: http.Server + private https?: https.Server + private addr?: string + private listeningMultiaddr?: Multiaddr constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) { super() this.log = components.logger.forComponent('libp2p:websockets:listener') - const metrics = components.metrics - // Keep track of open connections to destroy when the listener is closed - this.connections = new Set() + this.logger = components.logger + this.upgrader = init.upgrader + this.httpOptions = init.http + this.httpsOptions = init.https + this.inboundConnectionUpgradeTimeout = init.inboundConnectionUpgradeTimeout ?? 5000 + this.sockets = new Set() - const self = this // eslint-disable-line @typescript-eslint/no-this-alias + this.wsServer = new ws.WebSocketServer({ + noServer: true + }) + this.wsServer.addListener('connection', this.onWsServerConnection.bind(this)) - this.addr = 'unknown' + components.metrics?.registerMetricGroup('libp2p_websockets_inbound_connections_total', { + label: 'address', + help: 'Current active connections in WebSocket listener', + calculate: () => { + if (this.addr == null) { + return {} + } - this.server = createServer({ - ...init, - onConnection: (stream: DuplexWebSocket) => { - const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: components.logger, - metrics: this.metrics?.events, - metricPrefix: `${this.addr} ` - }) - this.log('new inbound connection %s', maConn.remoteAddr) + return { + [this.addr]: this.sockets.size + } + } + }) + + this.metrics = { + status: components.metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { + label: 'address', + help: 'Current status of the WebSocket listener socket' + }), + errors: components.metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { + label: 'address', + help: 'Total count of WebSocket listener errors by type' + }), + events: components.metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { + label: 'address', + help: 'Total count of WebSocket listener events by type' + }) + } - this.connections.add(stream) + this.server = net.createServer(socket => { + socket.once('data', buffer => { + console.info('---> incoming packet') +try { + // Pause the socket + socket.pause() - stream.socket.on('close', function () { - self.connections.delete(stream) + // Determine if this is an HTTP(s) request + const byte = buffer[0] + + let server: EventEmitter | undefined = this.http + + if (byte === 22) { + console.info('---> incoming https packet') + server = this.https + } else { + console.info('---> incoming http packet') + } + + if (server == null) { + this.log.error('no appropriate listener configured for byte %d', byte) + socket.destroy() + return + } + + // store the socket so we can close it when the listener closes + this.sockets.add(socket) + socket.on('close', () => { + this.sockets.delete(socket) }) - init.upgrader.upgradeInbound(maConn) - .catch(async err => { - this.log.error('inbound connection failed to upgrade', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) + // push the buffer back onto the front of the data stream + socket.unshift(buffer) - try { - maConn.abort(err) - } catch (err) { - this.log.error('inbound connection failed to close after upgrade failed - %e', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) - } - }) + // emit the socket to the relevant server + server.emit('connection', socket) + + // TODO: verify this + // As of NodeJS 10.x the socket must be + // resumed asynchronously or the socket + // connection hangs, potentially crashing + // the process. Prior to NodeJS 10.x + // the socket may be resumed synchronously. + process.nextTick(() => socket.resume()) + } catch (err) { + console.error('error handling socket data', err) } + }) }) - this.server.on('listening', () => { - if (metrics != null) { - const { host, port } = this.listeningMultiaddr?.toOptions() ?? {} - this.addr = `${host}:${port}` - - metrics.registerMetricGroup('libp2p_websockets_inbound_connections_total', { - label: 'address', - help: 'Current active connections in WebSocket listener', - calculate: () => { - return { - [this.addr]: this.connections.size - } - } - }) + if (init?.autoTLS === true) { + components.events.addEventListener('certificate:provision', this.onCertificateProvision.bind(this)) + components.events.addEventListener('certificate:renew', this.onCertificateRenew.bind(this)) + } + } - this.metrics = { - status: metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { - label: 'address', - help: 'Current status of the WebSocket listener socket' - }), - errors: metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { - label: 'address', - help: 'Total count of WebSocket listener errors by type' - }), - events: metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { - label: 'address', - help: 'Total count of WebSocket listener events by type' - }) - } + onWsServerConnection (socket: ws.WebSocket, req: http.IncomingMessage): void { + let addr: string | ws.AddressInfo | null + + try { + if (req.socket.remoteAddress == null || req.socket.remotePort == null) { + throw new Error('Remote connection did not have address and/or port') } - this.dispatchEvent(new CustomEvent('listening')) - }) - this.server.on('error', (err: Error) => { - this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true }) - this.dispatchEvent(new CustomEvent('error', { - detail: err - })) + + addr = this.server.address() + + if (typeof addr === 'string') { + throw new Error('Cannot listen on unix sockets') + } + + if (addr == null) { + throw new Error('Server was closing or not running') + } + } catch (err: any) { + this.log.error('error obtaining remote socket address - %e', err) + req.destroy(err) + socket.close() + return + } + + const stream: DuplexWebSocket = { + ...duplex(socket, { + remoteAddress: req.socket.remoteAddress, + remotePort: req.socket.remotePort + }), + localAddress: addr.address, + localPort: addr.port + } + + const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { + logger: this.logger, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` }) - this.server.on('close', () => { - this.dispatchEvent(new CustomEvent('close')) + this.log('new inbound connection %s', maConn.remoteAddr) + + const signal = AbortSignal.timeout(this.inboundConnectionUpgradeTimeout) + setMaxListeners(Infinity, signal) + + this.upgrader.upgradeInbound(maConn, { + signal }) + .catch(async err => { + this.log.error('inbound connection failed to upgrade - %e', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_upgrade`]: true }) + + await maConn.close() + .catch(err => { + this.log.error('inbound connection failed to close after upgrade failed', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_closing_failed`]: true }) + }) + }) } - async close (): Promise { - await Promise.all( - Array.from(this.connections).map(async maConn => { await maConn.close() }) - ) + onUpgrade (req: http.IncomingMessage, socket: Duplex, head: Buffer): void { + this.wsServer.handleUpgrade(req, socket, head, this.onWsServerConnection.bind(this)) + } + + async listen (ma: Multiaddr): Promise { + if (WebSockets.exactMatch(ma)) { + this.http = http.createServer(this.httpOptions ?? {}) + this.http.addListener('upgrade', this.onUpgrade.bind(this)) + } else if (WebSocketsSecure.exactMatch(ma)) { + this.https = https.createServer(this.httpsOptions ?? {}) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + } + + this.listeningMultiaddr = ma + const { host, port } = ma.toOptions() + this.addr = `${host}:${port}` + + this.server.listen(port) + + await new Promise((resolve, reject) => { + const onListening = (): void => { + removeListeners() + resolve() + } + const onError = (err: Error): void => { + removeListeners() + reject(err) + } + const removeListeners = (): void => { + this.server.removeListener('listening', onListening) + this.server.removeListener('error', onError) + } + + this.server.addListener('listening', onListening) + this.server.addListener('error', onError) + }) + + this.safeDispatchEvent('listening') + } - if (this.server.address() == null) { - // not listening, close will throw an error + onCertificateProvision (event: CustomEvent): void { + if (this.https != null) { + this.log('auto-tls certificate found but already listening on https') return } - await this.server.close() + this.log('auto-tls certificate found, starting https server') + this.https = https.createServer({ + ...this.httpsOptions, + ...event.detail + }) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.safeDispatchEvent('listening') } - async listen (ma: Multiaddr): Promise { - this.listeningMultiaddr = ma + onCertificateRenew (event: CustomEvent): void { + // stop accepting new connections + this.https?.close() + this.https?.removeListener('upgrade', this.onUpgrade.bind(this)) - await this.server.listen(ma.toOptions()) + this.log('auto-tls certificate renews, restarting https server') + this.https = https.createServer({ + ...this.httpsOptions, + ...event.detail + }) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + } + + async close (): Promise { + this.server.close() + this.http?.close() + this.https?.close() + this.wsServer.close() + + this.http?.closeAllConnections() + this.https?.closeAllConnections() + + ;[...this.sockets].forEach(socket => { + socket.destroy() + }) + + await Promise.all([ + pEvent(this.server, 'close'), + this.http == null ? null : pEvent(this.http, 'close'), + this.https == null ? null : pEvent(this.https, 'close'), + pEvent(this.wsServer, 'close') + ]) + + this.safeDispatchEvent('close') } getAddrs (): Multiaddr[] { - const multiaddrs = [] + console.info('getting ws addresses: http:', Boolean(this.http), 'https:', Boolean(this.https)) + + const multiaddrs: Multiaddr[] = [] const address = this.server.address() if (address == null) { @@ -154,7 +323,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis throw new Error('Listener is not ready yet') } - const ipfsId = this.listeningMultiaddr.getPeerId() const protos = this.listeningMultiaddr.protos() // Because TCP will only return the IPv6 version @@ -163,28 +331,37 @@ class WebSocketListener extends TypedEventEmitter implements Lis const wsProto = protos.some(proto => proto.code === protocols('ws').code) ? '/ws' : '/wss' let m = this.listeningMultiaddr.decapsulate('tcp') m = m.encapsulate(`/tcp/${address.port}${wsProto}`) - if (ipfsId != null) { - m = m.encapsulate(`/p2p/${ipfsId}`) - } + const options = m.toOptions() - if (m.toString().includes('0.0.0.0')) { - const netInterfaces = os.networkInterfaces() - Object.values(netInterfaces).forEach(niInfos => { + if (options.host === '0.0.0.0') { + Object.values(os.networkInterfaces()).forEach(niInfos => { if (niInfos == null) { return } niInfos.forEach(ni => { if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${options.port}/ws`)) + + if (this.https != null && WebSockets.exactMatch(m)) { + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${options.port}/tls/ws`)) + } } }) }) } else { multiaddrs.push(m) } + + if (this.https != null && WebSockets.exactMatch(m)) { + multiaddrs.push( + m.decapsulate('/ws').encapsulate('/tls/ws') + ) + } } + console.info('ws addresses:\n', multiaddrs.map(ma => ma.toString()).join('\n')) + return multiaddrs } } diff --git a/packages/transport-websockets/test/browser.ts b/packages/transport-websockets/test/browser.ts index 3c01ed0edd..e9d19daad9 100644 --- a/packages/transport-websockets/test/browser.ts +++ b/packages/transport-websockets/test/browser.ts @@ -1,5 +1,6 @@ /* eslint-env mocha */ +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' @@ -11,7 +12,10 @@ describe('libp2p-websockets', () => { let ws: Transport beforeEach(async () => { + const events = new TypedEventEmitter() + ws = webSockets()({ + events, logger: defaultLogger() }) }) @@ -34,6 +38,7 @@ describe('libp2p-websockets', () => { it('.createServer throws in browser', () => { expect(webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }).createListener).to.throw() }) diff --git a/packages/transport-websockets/test/node.ts b/packages/transport-websockets/test/node.ts index 7497be4565..1b2ca00b37 100644 --- a/packages/transport-websockets/test/node.ts +++ b/packages/transport-websockets/test/node.ts @@ -1,24 +1,27 @@ /* eslint-env mocha */ /* eslint max-nested-callbacks: ["error", 6] */ -import fs from 'fs' -import http from 'http' -import https from 'https' +import fs from 'node:fs' +import http from 'node:http' +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' +import { WebSockets, WebSocketsSecure } from '@multiformats/multiaddr-matcher' import { expect } from 'aegir/chai' import { isLoopbackAddr } from 'is-loopback-addr' +import { pEvent } from 'p-event' import pWaitFor from 'p-wait-for' import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' import * as filters from '../src/filters.js' import { webSockets } from '../src/index.js' -import type { Connection, Listener, Transport, Upgrader } from '@libp2p/interface' +import type { Connection, Libp2pEvents, Listener, Transport, Upgrader, TLSCertificate } from '@libp2p/interface' import type { StubbedInstance } from 'sinon-ts' describe('instantiate the transport', () => { it('create', () => { const ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) expect(ws).to.exist() @@ -46,6 +49,7 @@ describe('listen', () => { beforeEach(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -164,6 +168,7 @@ describe('listen', () => { beforeEach(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -225,6 +230,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -264,6 +270,7 @@ describe('dial', () => { it('should resolve port 0', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') const ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) @@ -295,6 +302,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -328,14 +336,18 @@ describe('dial', () => { let ws: Transport let listener: Listener const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/wss') - let server: https.Server beforeEach(async () => { - server = https.createServer({ - cert: fs.readFileSync('./test/fixtures/certificate.pem'), - key: fs.readFileSync('./test/fixtures/key.pem') - }) - ws = webSockets({ websocket: { rejectUnauthorized: false }, server })({ + ws = webSockets({ + websocket: { + rejectUnauthorized: false + }, + https: { + cert: fs.readFileSync('./test/fixtures/certificate.pem'), + key: fs.readFileSync('./test/fixtures/key.pem') + } + })({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -346,8 +358,6 @@ describe('dial', () => { afterEach(async () => { await listener.close() - server.close() - server.closeAllConnections() }) it('should listen on wss address', () => { @@ -370,6 +380,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -401,6 +412,7 @@ describe('filter addrs', () => { describe('default filter addrs with only dns', () => { before(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -471,6 +483,7 @@ describe('filter addrs', () => { describe('custom filter addrs', () => { before(() => { ws = webSockets({ filter: filters.all })({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -600,3 +613,71 @@ describe('filter addrs', () => { }) }) }) + +describe('auto-tls', () => { + let ws: Transport + let listener: Listener + let events: TypedEventEmitter + const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/ws') + + beforeEach(async () => { + events = new TypedEventEmitter() + + const upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async () => { + return stubInterface() + } + }) + + ws = webSockets({ + websocket: { + rejectUnauthorized: false + }, + autoTLS: true + })({ + events, + logger: defaultLogger() + }) + listener = ws.createListener({ + upgrader + }) + await listener.listen(ma) + }) + + afterEach(async () => { + await listener.close() + }) + + it('should listen on wss after a certificate is found', async () => { + const addrs = listener.getAddrs() + expect(addrs).to.have.lengthOf(1) + expect(WebSockets.exactMatch(addrs[0])).to.be.true() + + const listeningPromise = pEvent(listener, 'listening') + + events.safeDispatchEvent('certificate:provision', { + detail: { + key: fs.readFileSync('./test/fixtures/key.pem', { + encoding: 'utf-8' + }), + cert: fs.readFileSync('./test/fixtures/certificate.pem', { + encoding: 'utf-8' + }) + } + }) + + await listeningPromise + + const addrs2 = listener.getAddrs() + expect(addrs2).to.have.lengthOf(2) + expect(WebSockets.exactMatch(addrs2[0])).to.be.true() + expect(WebSocketsSecure.exactMatch(addrs2[1])).to.be.true() + + const wsOptions = addrs2[0].toOptions() + const wssOptions = addrs2[1].toOptions() + + expect(wsOptions.host).to.equal(wssOptions.host) + expect(wsOptions.port).to.equal(wssOptions.port) + }) +}) From 66372e7307a9be07b29b33dcaa0d5fbf67772cdf Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 20 Nov 2024 17:55:38 +0000 Subject: [PATCH 2/6] chore: remove unused import --- packages/transport-websockets/src/listener.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 942c228e84..40b46c7684 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -10,7 +10,7 @@ import duplex from 'it-ws/duplex' import { pEvent } from 'p-event' import * as ws from 'ws' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, ConnectionHandler } from '@libp2p/interface' +import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' import type { EventEmitter } from 'node:events' From 2046823aece110929261ef22408f2187ae0d0a49 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 25 Nov 2024 09:01:44 +0000 Subject: [PATCH 3/6] chore: revert merge --- packages/transport-websockets/test/browser.ts | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/packages/transport-websockets/test/browser.ts b/packages/transport-websockets/test/browser.ts index 9e818d0ab2..576bd131ee 100644 --- a/packages/transport-websockets/test/browser.ts +++ b/packages/transport-websockets/test/browser.ts @@ -6,33 +6,6 @@ import { expect } from 'aegir/chai' import { webSockets } from '../src/index.js' describe('libp2p-websockets', () => { - let ws: Transport - - beforeEach(async () => { - const events = new TypedEventEmitter() - - ws = webSockets()({ - events, - logger: defaultLogger() - }) - }) - - it('should filter out no wss websocket addresses', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') - const ma3 = multiaddr('/ip6/::1/tcp/80/ws') - const ma4 = multiaddr('/ip6/::1/tcp/443/wss') - - const valid = ws.dialFilter([ma1, ma2, ma3, ma4]) - - if (isBrowser || isWebWorker) { - expect(valid.length).to.equal(2) - expect(valid).to.deep.equal([ma2, ma4]) - } else { - expect(valid.length).to.equal(4) - } - }) - it('.createServer throws in browser', () => { expect(webSockets()({ events: new TypedEventEmitter(), From 6ae1f97ed216842dc3267ee9d30408989af05081 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 26 Nov 2024 09:16:58 +0000 Subject: [PATCH 4/6] chore: open http/https listeners on same port --- packages/transport-websockets/src/index.ts | 29 ++-- packages/transport-websockets/src/listener.ts | 153 +++++++++--------- packages/transport-websockets/test/node.ts | 3 +- 3 files changed, 93 insertions(+), 92 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index f060d3f7bf..4fca358400 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -23,7 +23,7 @@ * ``` */ -import { transportSymbol, serviceCapabilities, ConnectionFailedError, serviceDependencies } from '@libp2p/interface' +import { transportSymbol, serviceCapabilities, ConnectionFailedError } from '@libp2p/interface' import { multiaddrToUri as toUri } from '@multiformats/multiaddr-to-uri' import { connect, type WebSocketOptions } from 'it-ws/client' import pDefer from 'p-defer' @@ -45,17 +45,22 @@ export interface WebSocketsInit extends AbortOptions, WebSocketOptions { * @deprecated Use a ConnectionGater instead */ filter?: MultiaddrFilter + + /** + * Options used to create WebSockets + */ websocket?: ClientOptions + + /** + * Options used to create the HTTP server + */ http?: http.ServerOptions - https?: https.ServerOptions /** - * If a service like `@libp2p/auto-tls` creates a TLS certificate this - * transport can use, upgrade any listeners from `/ws` to `/wss`. - * - * @default false + * Options used to create the HTTPs server. `options.http` will be used if + * unspecified. */ - autoTLS?: boolean + https?: https.ServerOptions /** * Inbound connections must complete their upgrade within this many ms @@ -110,16 +115,6 @@ class WebSockets implements Transport { '@libp2p/transport' ] - get [serviceDependencies] (): string[] { - if (this.init.autoTLS === true) { - return [ - '@libp2p/auto-tls' - ] - } - - return [] - } - async dial (ma: Multiaddr, options: DialTransportOptions): Promise { this.log('dialing %s', ma) options = options ?? {} diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 40b46c7684..f19cc2456c 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -16,6 +16,7 @@ import type { DuplexWebSocket } from 'it-ws/duplex' import type { EventEmitter } from 'node:events' import type { Server } from 'node:http' import type { Duplex } from 'node:stream' +import type tls from 'node:tls' export interface WebSocketListenerComponents { logger: ComponentLogger @@ -26,7 +27,6 @@ export interface WebSocketListenerComponents { export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server inboundConnectionUpgradeTimeout?: number - autoTLS?: boolean cert?: string key?: string http?: http.ServerOptions @@ -62,7 +62,7 @@ export class WebSocketListener extends TypedEventEmitter impleme this.logger = components.logger this.upgrader = init.upgrader this.httpOptions = init.http - this.httpsOptions = init.https + this.httpsOptions = init.https ?? init.http this.inboundConnectionUpgradeTimeout = init.inboundConnectionUpgradeTimeout ?? 5000 this.sockets = new Set() @@ -100,70 +100,66 @@ export class WebSocketListener extends TypedEventEmitter impleme }) } - this.server = net.createServer(socket => { - socket.once('data', buffer => { - console.info('---> incoming packet') -try { - // Pause the socket - socket.pause() + this.server = net.createServer({ + pauseOnConnect: true + }, (socket) => { + this.onSocketConnection(socket) + .catch(err => { + this.log.error('error handling socket - %e', err) + socket.destroy() + }) + }) - // Determine if this is an HTTP(s) request - const byte = buffer[0] + components.events.addEventListener('certificate:provision', this.onCertificateProvision.bind(this)) + components.events.addEventListener('certificate:renew', this.onCertificateRenew.bind(this)) + } - let server: EventEmitter | undefined = this.http + async onSocketConnection (socket: net.Socket): Promise { + let buffer = socket.read(1) - if (byte === 22) { - console.info('---> incoming https packet') - server = this.https - } else { - console.info('---> incoming http packet') - } + if (buffer == null) { + await pEvent(socket, 'readable') + buffer = socket.read(1) + } - if (server == null) { - this.log.error('no appropriate listener configured for byte %d', byte) - socket.destroy() - return - } + // determine if this is an HTTP(s) request + const byte = buffer[0] + let server: EventEmitter | undefined = this.http - // store the socket so we can close it when the listener closes - this.sockets.add(socket) - socket.on('close', () => { - this.sockets.delete(socket) - }) + // https://github.com/mscdex/httpolyglot/blob/1c6c4af65f4cf95a32c918d0fdcc532e0c095740/lib/index.js#L92 + if (byte < 32 || byte >= 127) { + server = this.https + } - // push the buffer back onto the front of the data stream - socket.unshift(buffer) - - // emit the socket to the relevant server - server.emit('connection', socket) - - // TODO: verify this - // As of NodeJS 10.x the socket must be - // resumed asynchronously or the socket - // connection hangs, potentially crashing - // the process. Prior to NodeJS 10.x - // the socket may be resumed synchronously. - process.nextTick(() => socket.resume()) - } catch (err) { - console.error('error handling socket data', err) - } - }) + if (server == null) { + this.log.error('no appropriate listener configured for byte %d', byte) + socket.destroy() + return + } + + // store the socket so we can close it when the listener closes + // TODO: is this necessary if we can `this.https.closeAllConnections`? + this.sockets.add(socket) + socket.on('close', () => { + this.sockets.delete(socket) }) - if (init?.autoTLS === true) { - components.events.addEventListener('certificate:provision', this.onCertificateProvision.bind(this)) - components.events.addEventListener('certificate:renew', this.onCertificateRenew.bind(this)) - } + socket.on('error', (err) => { + this.log.error('socket error - %e', err) + socket.destroy() + }) + + // re-queue first data chunk + socket.unshift(buffer) + + // hand the socket off to the appropriate server + server.emit('connection', socket) } onWsServerConnection (socket: ws.WebSocket, req: http.IncomingMessage): void { let addr: string | ws.AddressInfo | null try { - if (req.socket.remoteAddress == null || req.socket.remotePort == null) { - throw new Error('Remote connection did not have address and/or port') - } - addr = this.server.address() if (typeof addr === 'string') { @@ -182,8 +178,8 @@ try { const stream: DuplexWebSocket = { ...duplex(socket, { - remoteAddress: req.socket.remoteAddress, - remotePort: req.socket.remotePort + remoteAddress: req.socket.remoteAddress ?? '0.0.0.0', + remotePort: req.socket.remotePort ?? 0 }), localAddress: addr.address, localPort: addr.port @@ -218,13 +214,19 @@ try { this.wsServer.handleUpgrade(req, socket, head, this.onWsServerConnection.bind(this)) } + onTLSClientError (err: Error, socket: tls.TLSSocket): void { + this.log.error('TLS client error - %e', err) + socket.destroy() + } + async listen (ma: Multiaddr): Promise { if (WebSockets.exactMatch(ma)) { - this.http = http.createServer(this.httpOptions ?? {}) + this.http = http.createServer(this.httpOptions ?? {}, this.httpRequestHandler.bind(this)) this.http.addListener('upgrade', this.onUpgrade.bind(this)) } else if (WebSocketsSecure.exactMatch(ma)) { - this.https = https.createServer(this.httpsOptions ?? {}) + this.https = https.createServer(this.httpsOptions ?? {}, this.httpRequestHandler.bind(this)) this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) } this.listeningMultiaddr = ma @@ -264,22 +266,24 @@ try { this.https = https.createServer({ ...this.httpsOptions, ...event.detail - }) + }, this.httpRequestHandler.bind(this)) this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) + this.safeDispatchEvent('listening') } onCertificateRenew (event: CustomEvent): void { // stop accepting new connections this.https?.close() - this.https?.removeListener('upgrade', this.onUpgrade.bind(this)) - this.log('auto-tls certificate renews, restarting https server') + this.log('auto-tls certificate renewed, restarting https server') this.https = https.createServer({ ...this.httpsOptions, ...event.detail - }) + }, this.httpRequestHandler.bind(this)) this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) } async close (): Promise { @@ -288,6 +292,9 @@ try { this.https?.close() this.wsServer.close() + // close all connections, must be done after closing the server to prevent + // race conditions where a new connection is accepted while we are closing + // the existing ones this.http?.closeAllConnections() this.https?.closeAllConnections() @@ -306,8 +313,6 @@ try { } getAddrs (): Multiaddr[] { - console.info('getting ws addresses: http:', Boolean(this.http), 'https:', Boolean(this.https)) - const multiaddrs: Multiaddr[] = [] const address = this.server.address() @@ -325,8 +330,8 @@ try { const protos = this.listeningMultiaddr.protos() - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr + // because TCP will only return the IPv6 version, we need to capture from + // the passed multiaddr if (protos.some(proto => proto.code === protocols('ip4').code)) { const wsProto = protos.some(proto => proto.code === protocols('ws').code) ? '/ws' : '/wss' let m = this.listeningMultiaddr.decapsulate('tcp') @@ -350,20 +355,22 @@ try { }) }) } else { - multiaddrs.push(m) - } - - if (this.https != null && WebSockets.exactMatch(m)) { - multiaddrs.push( - m.decapsulate('/ws').encapsulate('/tls/ws') - ) + if (this.https != null && WebSockets.exactMatch(m)) { + multiaddrs.push(m.decapsulate('/ws').encapsulate('/tls/ws')) + } else { + multiaddrs.push(m) + } } } - console.info('ws addresses:\n', multiaddrs.map(ma => ma.toString()).join('\n')) - return multiaddrs } + + private httpRequestHandler (req: http.IncomingMessage, res: http.ServerResponse): void { + res.writeHead(400) + res.write('Only WebSocket connections are supported') + res.end() + } } export function createListener (components: WebSocketListenerComponents, init: WebSocketListenerInit): Listener { diff --git a/packages/transport-websockets/test/node.ts b/packages/transport-websockets/test/node.ts index 1b2ca00b37..6b3f274cff 100644 --- a/packages/transport-websockets/test/node.ts +++ b/packages/transport-websockets/test/node.ts @@ -633,8 +633,7 @@ describe('auto-tls', () => { ws = webSockets({ websocket: { rejectUnauthorized: false - }, - autoTLS: true + } })({ events, logger: defaultLogger() From 17bd4fda09b5850c14487bcb8b91deb10e7b8090 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 26 Nov 2024 15:58:07 +0000 Subject: [PATCH 5/6] chore: update tests --- packages/transport-websockets/src/listener.ts | 65 ++++++++++------ packages/transport-websockets/test/node.ts | 74 ++++++++++++++++++- 2 files changed, 112 insertions(+), 27 deletions(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index f19cc2456c..aba19f51ad 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -4,7 +4,7 @@ import net from 'node:net' import os from 'node:os' import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' -import { multiaddr, protocols } from '@multiformats/multiaddr' +import { multiaddr } from '@multiformats/multiaddr' import { WebSockets, WebSocketsSecure } from '@multiformats/multiaddr-matcher' import duplex from 'it-ws/duplex' import { pEvent } from 'p-event' @@ -138,7 +138,6 @@ export class WebSocketListener extends TypedEventEmitter impleme } // store the socket so we can close it when the listener closes - // TODO: is this necessary if we can `this.https.closeAllConnections`? this.sockets.add(socket) socket.on('close', () => { this.sockets.delete(socket) @@ -233,7 +232,7 @@ export class WebSocketListener extends TypedEventEmitter impleme const { host, port } = ma.toOptions() this.addr = `${host}:${port}` - this.server.listen(port) + this.server.listen(port, host) await new Promise((resolve, reject) => { const onListening = (): void => { @@ -313,7 +312,6 @@ export class WebSocketListener extends TypedEventEmitter impleme } getAddrs (): Multiaddr[] { - const multiaddrs: Multiaddr[] = [] const address = this.server.address() if (address == null) { @@ -328,16 +326,10 @@ export class WebSocketListener extends TypedEventEmitter impleme throw new Error('Listener is not ready yet') } - const protos = this.listeningMultiaddr.protos() - - // because TCP will only return the IPv6 version, we need to capture from - // the passed multiaddr - if (protos.some(proto => proto.code === protocols('ip4').code)) { - const wsProto = protos.some(proto => proto.code === protocols('ws').code) ? '/ws' : '/wss' - let m = this.listeningMultiaddr.decapsulate('tcp') - m = m.encapsulate(`/tcp/${address.port}${wsProto}`) - const options = m.toOptions() + const options = this.listeningMultiaddr.toOptions() + const multiaddrs: Multiaddr[] = [] + if (options.family === 4) { if (options.host === '0.0.0.0') { Object.values(os.networkInterfaces()).forEach(niInfos => { if (niInfos == null) { @@ -346,24 +338,51 @@ export class WebSocketListener extends TypedEventEmitter impleme niInfos.forEach(ni => { if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${options.port}/ws`)) + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${address.port}`)) + } + }) + }) + } else { + multiaddrs.push(multiaddr(`/ip${options.family}/${options.host}/${options.transport}/${address.port}`)) + } + } else if (options.family === 6) { + if (options.host === '::') { + Object.values(os.networkInterfaces()).forEach(niInfos => { + if (niInfos == null) { + return + } - if (this.https != null && WebSockets.exactMatch(m)) { - multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${options.port}/tls/ws`)) - } + niInfos.forEach(ni => { + if (ni.family === 'IPv6') { + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${address.port}`)) } }) }) } else { - if (this.https != null && WebSockets.exactMatch(m)) { - multiaddrs.push(m.decapsulate('/ws').encapsulate('/tls/ws')) - } else { - multiaddrs.push(m) - } + multiaddrs.push(multiaddr(`/ip${options.family}/${options.host}/${options.transport}/${address.port}`)) } } - return multiaddrs + const insecureMultiaddrs: Multiaddr[] = [] + + if (this.http != null) { + multiaddrs.forEach(ma => { + insecureMultiaddrs.push(ma.encapsulate('/ws')) + }) + } + + const secureMultiaddrs: Multiaddr[] = [] + + if (this.https != null) { + multiaddrs.forEach(ma => { + secureMultiaddrs.push(ma.encapsulate('/tls/ws')) + }) + } + + return [ + ...insecureMultiaddrs, + ...secureMultiaddrs + ] } private httpRequestHandler (req: http.IncomingMessage, res: http.ServerResponse): void { diff --git a/packages/transport-websockets/test/node.ts b/packages/transport-websockets/test/node.ts index 6b3f274cff..6baae5a67d 100644 --- a/packages/transport-websockets/test/node.ts +++ b/packages/transport-websockets/test/node.ts @@ -76,7 +76,8 @@ describe('listen', () => { it('should error on starting two listeners on same address', async () => { listener = ws.createListener({ upgrader }) const dumbServer = http.createServer() - await new Promise(resolve => dumbServer.listen(ma.toOptions().port, resolve)) + const options = ma.toOptions() + await new Promise(resolve => dumbServer.listen(options.port, options.host, resolve)) await expect(listener.listen(ma)).to.eventually.rejectedWith('listen EADDRINUSE') await new Promise(resolve => dumbServer.close(() => { resolve() })) }) @@ -152,7 +153,7 @@ describe('listen', () => { }) it('getAddrs preserves p2p Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/47382/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma = multiaddr('/ip4/127.0.0.1/tcp/47382/ws') listener = ws.createListener({ upgrader }) await listener.listen(ma) @@ -335,7 +336,7 @@ describe('dial', () => { describe('ip4 with wss', () => { let ws: Transport let listener: Listener - const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/wss') + const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/tls/ws') beforeEach(async () => { ws = webSockets({ @@ -614,7 +615,7 @@ describe('filter addrs', () => { }) }) -describe('auto-tls', () => { +describe('auto-tls (IPv4)', () => { let ws: Transport let listener: Listener let events: TypedEventEmitter @@ -652,7 +653,72 @@ describe('auto-tls', () => { const addrs = listener.getAddrs() expect(addrs).to.have.lengthOf(1) expect(WebSockets.exactMatch(addrs[0])).to.be.true() + const listeningPromise = pEvent(listener, 'listening') + + events.safeDispatchEvent('certificate:provision', { + detail: { + key: fs.readFileSync('./test/fixtures/key.pem', { + encoding: 'utf-8' + }), + cert: fs.readFileSync('./test/fixtures/certificate.pem', { + encoding: 'utf-8' + }) + } + }) + + await listeningPromise + + const addrs2 = listener.getAddrs() + expect(addrs2).to.have.lengthOf(2) + expect(WebSockets.exactMatch(addrs2[0])).to.be.true() + expect(WebSocketsSecure.exactMatch(addrs2[1])).to.be.true() + + const wsOptions = addrs2[0].toOptions() + const wssOptions = addrs2[1].toOptions() + + expect(wsOptions.host).to.equal(wssOptions.host) + expect(wsOptions.port).to.equal(wssOptions.port) + }) +}) + +describe('auto-tls (IPv6)', () => { + let ws: Transport + let listener: Listener + let events: TypedEventEmitter + const ma = multiaddr('/ip6/::1/tcp/37284/ws') + beforeEach(async () => { + events = new TypedEventEmitter() + + const upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async () => { + return stubInterface() + } + }) + + ws = webSockets({ + websocket: { + rejectUnauthorized: false + } + })({ + events, + logger: defaultLogger() + }) + listener = ws.createListener({ + upgrader + }) + await listener.listen(ma) + }) + + afterEach(async () => { + await listener.close() + }) + + it('should listen on wss after a certificate is found', async () => { + const addrs = listener.getAddrs() + expect(addrs).to.have.lengthOf(1) + expect(WebSockets.exactMatch(addrs[0])).to.be.true() const listeningPromise = pEvent(listener, 'listening') events.safeDispatchEvent('certificate:provision', { From 3ede04acd57e08abf3ef232756a4ee186f77ee74 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 26 Nov 2024 16:29:54 +0000 Subject: [PATCH 6/6] chore: add metrics --- packages/transport-websockets/src/listener.ts | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index aba19f51ad..20baead9f6 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -10,7 +10,7 @@ import duplex from 'it-ws/duplex' import { pEvent } from 'p-event' import * as ws from 'ws' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader } from '@libp2p/interface' +import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' import type { EventEmitter } from 'node:events' @@ -115,6 +115,8 @@ export class WebSocketListener extends TypedEventEmitter impleme } async onSocketConnection (socket: net.Socket): Promise { + this.metrics.events?.increment({ [`${this.addr} connection`]: true }) + let buffer = socket.read(1) if (buffer == null) { @@ -139,15 +141,26 @@ export class WebSocketListener extends TypedEventEmitter impleme // store the socket so we can close it when the listener closes this.sockets.add(socket) + socket.on('close', () => { + this.metrics.events?.increment({ [`${this.addr} close`]: true }) this.sockets.delete(socket) }) socket.on('error', (err) => { this.log.error('socket error - %e', err) + this.metrics.events?.increment({ [`${this.addr} error`]: true }) socket.destroy() }) + socket.once('timeout', () => { + this.metrics.events?.increment({ [`${this.addr} timeout`]: true }) + }) + + socket.once('end', () => { + this.metrics.events?.increment({ [`${this.addr} end`]: true }) + }) + // re-queue first data chunk socket.unshift(buffer) @@ -184,13 +197,22 @@ export class WebSocketListener extends TypedEventEmitter impleme localPort: addr.port } - const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: this.logger, - metrics: this.metrics?.events, - metricPrefix: `${this.addr} ` - }) - this.log('new inbound connection %s', maConn.remoteAddr) + let maConn: MultiaddrConnection + try { + maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { + logger: this.logger, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` + }) + } catch (err: any) { + this.log.error('inbound connection failed', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_to_connection`]: true }) + socket.close() + return + } + + this.log('new inbound connection %s', maConn.remoteAddr) const signal = AbortSignal.timeout(this.inboundConnectionUpgradeTimeout) setMaxListeners(Infinity, signal) @@ -240,16 +262,22 @@ export class WebSocketListener extends TypedEventEmitter impleme resolve() } const onError = (err: Error): void => { + this.metrics.errors?.increment({ [`${this.addr} listen_error`]: true }) removeListeners() reject(err) } + const onDrop = (): void => { + this.metrics.events?.increment({ [`${this.addr} drop`]: true }) + } const removeListeners = (): void => { this.server.removeListener('listening', onListening) this.server.removeListener('error', onError) + this.server.removeListener('drop', onDrop) } this.server.addListener('listening', onListening) this.server.addListener('error', onError) + this.server.addListener('drop', onDrop) }) this.safeDispatchEvent('listening')