diff --git a/packages/transport-websockets/package.json b/packages/transport-websockets/package.json index f2270fba8b..74c478ea2d 100644 --- a/packages/transport-websockets/package.json +++ b/packages/transport-websockets/package.json @@ -78,10 +78,11 @@ "@libp2p/utils": "^6.2.1", "@multiformats/multiaddr": "^12.2.3", "@multiformats/multiaddr-matcher": "^1.4.0", - "@multiformats/multiaddr-to-uri": "^10.0.1", + "@multiformats/multiaddr-to-uri": "^11.0.0", "@types/ws": "^8.5.10", "it-ws": "^6.1.1", "p-defer": "^4.0.1", + "p-event": "^6.0.1", "progress-events": "^1.0.0", "race-signal": "^1.0.2", "ws": "^8.17.0" diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 7eea0727e8..4fca358400 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -32,10 +32,11 @@ import { raceSignal } from 'race-signal' import * as filters from './filters.js' import { createListener } from './listener.js' import { socketToMaConn } from './socket-to-conn.js' -import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface' +import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup, TypedEventTarget, Libp2pEvents } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' +import type http from 'node:http' +import type https from 'node:https' import type { ProgressEvent } from 'progress-events' import type { ClientOptions } from 'ws' @@ -44,12 +45,34 @@ export interface WebSocketsInit extends AbortOptions, WebSocketOptions { * @deprecated Use a ConnectionGater instead */ filter?: MultiaddrFilter + + /** + * Options used to create WebSockets + */ websocket?: ClientOptions - server?: Server + + /** + * Options used to create the HTTP server + */ + http?: http.ServerOptions + + /** + * Options used to create the HTTPs server. `options.http` will be used if + * unspecified. + */ + https?: https.ServerOptions + + /** + * Inbound connections must complete their upgrade within this many ms + * + * @default 5000 + */ + inboundConnectionUpgradeTimeout?: number } export interface WebSocketsComponents { logger: ComponentLogger + events: TypedEventTarget metrics?: Metrics } @@ -63,12 +86,12 @@ export type WebSocketsDialEvents = class WebSockets implements Transport { private readonly log: Logger - private readonly init?: WebSocketsInit + private readonly init: WebSocketsInit private readonly logger: ComponentLogger private readonly metrics?: WebSocketsMetrics private readonly components: WebSocketsComponents - constructor (components: WebSocketsComponents, init?: WebSocketsInit) { + constructor (components: WebSocketsComponents, init: WebSocketsInit = {}) { this.log = components.logger.forComponent('libp2p:websockets') this.logger = components.logger this.components = components @@ -148,13 +171,14 @@ class WebSockets implements Transport { } /** - * Creates a Websockets listener. The provided `handler` function will be called + * Creates a WebSockets listener. The provided `handler` function will be called * anytime a new incoming Connection has been successfully upgraded via * `upgrader.upgradeInbound` */ createListener (options: CreateListenerOptions): Listener { return createListener({ logger: this.logger, + events: this.components.events, metrics: this.components.metrics }, { ...this.init, @@ -163,7 +187,7 @@ class WebSockets implements Transport { } /** - * Takes a list of `Multiaddr`s and returns only valid Websockets addresses. + * Takes a list of `Multiaddr`s and returns only valid WebSockets addresses. * By default, in a browser environment only DNS+WSS multiaddr is accepted, * while in a Node.js environment DNS+{WS, WSS} multiaddrs are accepted. */ diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index b941aa071a..20baead9f6 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -1,145 +1,345 @@ -import os from 'os' -import { TypedEventEmitter } from '@libp2p/interface' +import http from 'node:http' +import https from 'node:https' +import net from 'node:net' +import os from 'node:os' +import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' -import { multiaddr, protocols } from '@multiformats/multiaddr' -import { createServer } from 'it-ws/server' +import { multiaddr } from '@multiformats/multiaddr' +import { WebSockets, WebSocketsSecure } from '@multiformats/multiaddr-matcher' +import duplex from 'it-ws/duplex' +import { pEvent } from 'p-event' +import * as ws from 'ws' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics } from '@libp2p/interface' +import type { ComponentLogger, Logger, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics, TLSCertificate, TypedEventTarget, Libp2pEvents, Upgrader, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' -import type { WebSocketServer } from 'it-ws/server' +import type { EventEmitter } from 'node:events' +import type { Server } from 'node:http' +import type { Duplex } from 'node:stream' +import type tls from 'node:tls' export interface WebSocketListenerComponents { logger: ComponentLogger + events: TypedEventTarget metrics?: Metrics } export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server + inboundConnectionUpgradeTimeout?: number + cert?: string + key?: string + http?: http.ServerOptions + https?: http.ServerOptions } export interface WebSocketListenerMetrics { - status: MetricGroup - errors: CounterGroup - events: CounterGroup + status?: MetricGroup + errors?: CounterGroup + events?: CounterGroup } -class WebSocketListener extends TypedEventEmitter implements Listener { - private readonly connections: Set - private listeningMultiaddr?: Multiaddr - private readonly server: WebSocketServer +export class WebSocketListener extends TypedEventEmitter implements Listener { private readonly log: Logger - private metrics?: WebSocketListenerMetrics - private addr: string + private readonly logger: ComponentLogger + private readonly server: net.Server + private readonly wsServer: ws.WebSocketServer + private readonly metrics: WebSocketListenerMetrics + private readonly sockets: Set + private readonly upgrader: Upgrader + private readonly inboundConnectionUpgradeTimeout: number + private readonly httpOptions?: http.ServerOptions + private readonly httpsOptions?: https.ServerOptions + private http?: http.Server + private https?: https.Server + private addr?: string + private listeningMultiaddr?: Multiaddr constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) { super() this.log = components.logger.forComponent('libp2p:websockets:listener') - const metrics = components.metrics - // Keep track of open connections to destroy when the listener is closed - this.connections = new Set() + this.logger = components.logger + this.upgrader = init.upgrader + this.httpOptions = init.http + this.httpsOptions = init.https ?? init.http + this.inboundConnectionUpgradeTimeout = init.inboundConnectionUpgradeTimeout ?? 5000 + this.sockets = new Set() - const self = this // eslint-disable-line @typescript-eslint/no-this-alias + this.wsServer = new ws.WebSocketServer({ + noServer: true + }) + this.wsServer.addListener('connection', this.onWsServerConnection.bind(this)) - this.addr = 'unknown' + components.metrics?.registerMetricGroup('libp2p_websockets_inbound_connections_total', { + label: 'address', + help: 'Current active connections in WebSocket listener', + calculate: () => { + if (this.addr == null) { + return {} + } - this.server = createServer({ - ...init, - onConnection: (stream: DuplexWebSocket) => { - const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: components.logger, - metrics: this.metrics?.events, - metricPrefix: `${this.addr} ` - }) - this.log('new inbound connection %s', maConn.remoteAddr) + return { + [this.addr]: this.sockets.size + } + } + }) - this.connections.add(stream) + this.metrics = { + status: components.metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { + label: 'address', + help: 'Current status of the WebSocket listener socket' + }), + errors: components.metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { + label: 'address', + help: 'Total count of WebSocket listener errors by type' + }), + events: components.metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { + label: 'address', + help: 'Total count of WebSocket listener events by type' + }) + } - stream.socket.on('close', function () { - self.connections.delete(stream) + this.server = net.createServer({ + pauseOnConnect: true + }, (socket) => { + this.onSocketConnection(socket) + .catch(err => { + this.log.error('error handling socket - %e', err) + socket.destroy() }) + }) - init.upgrader.upgradeInbound(maConn) - .catch(async err => { - this.log.error('inbound connection failed to upgrade', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) + components.events.addEventListener('certificate:provision', this.onCertificateProvision.bind(this)) + components.events.addEventListener('certificate:renew', this.onCertificateRenew.bind(this)) + } - try { - maConn.abort(err) - } catch (err) { - this.log.error('inbound connection failed to close after upgrade failed - %e', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) - } - }) - } - }) + async onSocketConnection (socket: net.Socket): Promise { + this.metrics.events?.increment({ [`${this.addr} connection`]: true }) - this.server.on('listening', () => { - if (metrics != null) { - const { host, port } = this.listeningMultiaddr?.toOptions() ?? {} - this.addr = `${host}:${port}` - - metrics.registerMetricGroup('libp2p_websockets_inbound_connections_total', { - label: 'address', - help: 'Current active connections in WebSocket listener', - calculate: () => { - return { - [this.addr]: this.connections.size - } - } - }) + let buffer = socket.read(1) - this.metrics = { - status: metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { - label: 'address', - help: 'Current status of the WebSocket listener socket' - }), - errors: metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { - label: 'address', - help: 'Total count of WebSocket listener errors by type' - }), - events: metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { - label: 'address', - help: 'Total count of WebSocket listener events by type' - }) - } - } - this.dispatchEvent(new CustomEvent('listening')) + if (buffer == null) { + await pEvent(socket, 'readable') + buffer = socket.read(1) + } + + // determine if this is an HTTP(s) request + const byte = buffer[0] + let server: EventEmitter | undefined = this.http + + // https://github.com/mscdex/httpolyglot/blob/1c6c4af65f4cf95a32c918d0fdcc532e0c095740/lib/index.js#L92 + if (byte < 32 || byte >= 127) { + server = this.https + } + + if (server == null) { + this.log.error('no appropriate listener configured for byte %d', byte) + socket.destroy() + return + } + + // store the socket so we can close it when the listener closes + this.sockets.add(socket) + + socket.on('close', () => { + this.metrics.events?.increment({ [`${this.addr} close`]: true }) + this.sockets.delete(socket) }) - this.server.on('error', (err: Error) => { - this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true }) - this.dispatchEvent(new CustomEvent('error', { - detail: err - })) + + socket.on('error', (err) => { + this.log.error('socket error - %e', err) + this.metrics.events?.increment({ [`${this.addr} error`]: true }) + socket.destroy() }) - this.server.on('close', () => { - this.dispatchEvent(new CustomEvent('close')) + + socket.once('timeout', () => { + this.metrics.events?.increment({ [`${this.addr} timeout`]: true }) }) + + socket.once('end', () => { + this.metrics.events?.increment({ [`${this.addr} end`]: true }) + }) + + // re-queue first data chunk + socket.unshift(buffer) + + // hand the socket off to the appropriate server + server.emit('connection', socket) } - async close (): Promise { - await Promise.all( - Array.from(this.connections).map(async maConn => { await maConn.close() }) - ) + onWsServerConnection (socket: ws.WebSocket, req: http.IncomingMessage): void { + let addr: string | ws.AddressInfo | null + + try { + addr = this.server.address() - if (this.server.address() == null) { - // not listening, close will throw an error + if (typeof addr === 'string') { + throw new Error('Cannot listen on unix sockets') + } + + if (addr == null) { + throw new Error('Server was closing or not running') + } + } catch (err: any) { + this.log.error('error obtaining remote socket address - %e', err) + req.destroy(err) + socket.close() + return + } + + const stream: DuplexWebSocket = { + ...duplex(socket, { + remoteAddress: req.socket.remoteAddress ?? '0.0.0.0', + remotePort: req.socket.remotePort ?? 0 + }), + localAddress: addr.address, + localPort: addr.port + } + + let maConn: MultiaddrConnection + + try { + maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { + logger: this.logger, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` + }) + } catch (err: any) { + this.log.error('inbound connection failed', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_to_connection`]: true }) + socket.close() return } - await this.server.close() + this.log('new inbound connection %s', maConn.remoteAddr) + const signal = AbortSignal.timeout(this.inboundConnectionUpgradeTimeout) + setMaxListeners(Infinity, signal) + + this.upgrader.upgradeInbound(maConn, { + signal + }) + .catch(async err => { + this.log.error('inbound connection failed to upgrade - %e', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_upgrade`]: true }) + + await maConn.close() + .catch(err => { + this.log.error('inbound connection failed to close after upgrade failed', err) + this.metrics.errors?.increment({ [`${this.addr} inbound_closing_failed`]: true }) + }) + }) + } + + onUpgrade (req: http.IncomingMessage, socket: Duplex, head: Buffer): void { + this.wsServer.handleUpgrade(req, socket, head, this.onWsServerConnection.bind(this)) + } + + onTLSClientError (err: Error, socket: tls.TLSSocket): void { + this.log.error('TLS client error - %e', err) + socket.destroy() } async listen (ma: Multiaddr): Promise { + if (WebSockets.exactMatch(ma)) { + this.http = http.createServer(this.httpOptions ?? {}, this.httpRequestHandler.bind(this)) + this.http.addListener('upgrade', this.onUpgrade.bind(this)) + } else if (WebSocketsSecure.exactMatch(ma)) { + this.https = https.createServer(this.httpsOptions ?? {}, this.httpRequestHandler.bind(this)) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) + } + this.listeningMultiaddr = ma + const { host, port } = ma.toOptions() + this.addr = `${host}:${port}` + + this.server.listen(port, host) + + await new Promise((resolve, reject) => { + const onListening = (): void => { + removeListeners() + resolve() + } + const onError = (err: Error): void => { + this.metrics.errors?.increment({ [`${this.addr} listen_error`]: true }) + removeListeners() + reject(err) + } + const onDrop = (): void => { + this.metrics.events?.increment({ [`${this.addr} drop`]: true }) + } + const removeListeners = (): void => { + this.server.removeListener('listening', onListening) + this.server.removeListener('error', onError) + this.server.removeListener('drop', onDrop) + } + + this.server.addListener('listening', onListening) + this.server.addListener('error', onError) + this.server.addListener('drop', onDrop) + }) + + this.safeDispatchEvent('listening') + } + + onCertificateProvision (event: CustomEvent): void { + if (this.https != null) { + this.log('auto-tls certificate found but already listening on https') + return + } + + this.log('auto-tls certificate found, starting https server') + this.https = https.createServer({ + ...this.httpsOptions, + ...event.detail + }, this.httpRequestHandler.bind(this)) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) + + this.safeDispatchEvent('listening') + } + + onCertificateRenew (event: CustomEvent): void { + // stop accepting new connections + this.https?.close() + + this.log('auto-tls certificate renewed, restarting https server') + this.https = https.createServer({ + ...this.httpsOptions, + ...event.detail + }, this.httpRequestHandler.bind(this)) + this.https.addListener('upgrade', this.onUpgrade.bind(this)) + this.https.addListener('tlsClientError', this.onTLSClientError.bind(this)) + } - await this.server.listen(ma.toOptions()) + async close (): Promise { + this.server.close() + this.http?.close() + this.https?.close() + this.wsServer.close() + + // close all connections, must be done after closing the server to prevent + // race conditions where a new connection is accepted while we are closing + // the existing ones + this.http?.closeAllConnections() + this.https?.closeAllConnections() + + ;[...this.sockets].forEach(socket => { + socket.destroy() + }) + + await Promise.all([ + pEvent(this.server, 'close'), + this.http == null ? null : pEvent(this.http, 'close'), + this.https == null ? null : pEvent(this.https, 'close'), + pEvent(this.wsServer, 'close') + ]) + + this.safeDispatchEvent('close') } getAddrs (): Multiaddr[] { - const multiaddrs = [] const address = this.server.address() if (address == null) { @@ -154,38 +354,69 @@ class WebSocketListener extends TypedEventEmitter implements Lis throw new Error('Listener is not ready yet') } - const ipfsId = this.listeningMultiaddr.getPeerId() - const protos = this.listeningMultiaddr.protos() - - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (protos.some(proto => proto.code === protocols('ip4').code)) { - const wsProto = protos.some(proto => proto.code === protocols('ws').code) ? '/ws' : '/wss' - let m = this.listeningMultiaddr.decapsulate('tcp') - m = m.encapsulate(`/tcp/${address.port}${wsProto}`) - if (ipfsId != null) { - m = m.encapsulate(`/p2p/${ipfsId}`) - } + const options = this.listeningMultiaddr.toOptions() + const multiaddrs: Multiaddr[] = [] - if (m.toString().includes('0.0.0.0')) { - const netInterfaces = os.networkInterfaces() - Object.values(netInterfaces).forEach(niInfos => { + if (options.family === 4) { + if (options.host === '0.0.0.0') { + Object.values(os.networkInterfaces()).forEach(niInfos => { if (niInfos == null) { return } niInfos.forEach(ni => { if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${address.port}`)) } }) }) } else { - multiaddrs.push(m) + multiaddrs.push(multiaddr(`/ip${options.family}/${options.host}/${options.transport}/${address.port}`)) } + } else if (options.family === 6) { + if (options.host === '::') { + Object.values(os.networkInterfaces()).forEach(niInfos => { + if (niInfos == null) { + return + } + + niInfos.forEach(ni => { + if (ni.family === 'IPv6') { + multiaddrs.push(multiaddr(`/ip${options.family}/${ni.address}/${options.transport}/${address.port}`)) + } + }) + }) + } else { + multiaddrs.push(multiaddr(`/ip${options.family}/${options.host}/${options.transport}/${address.port}`)) + } + } + + const insecureMultiaddrs: Multiaddr[] = [] + + if (this.http != null) { + multiaddrs.forEach(ma => { + insecureMultiaddrs.push(ma.encapsulate('/ws')) + }) + } + + const secureMultiaddrs: Multiaddr[] = [] + + if (this.https != null) { + multiaddrs.forEach(ma => { + secureMultiaddrs.push(ma.encapsulate('/tls/ws')) + }) } - return multiaddrs + return [ + ...insecureMultiaddrs, + ...secureMultiaddrs + ] + } + + private httpRequestHandler (req: http.IncomingMessage, res: http.ServerResponse): void { + res.writeHead(400) + res.write('Only WebSocket connections are supported') + res.end() } } diff --git a/packages/transport-websockets/test/browser.ts b/packages/transport-websockets/test/browser.ts index d188bd4fd8..576bd131ee 100644 --- a/packages/transport-websockets/test/browser.ts +++ b/packages/transport-websockets/test/browser.ts @@ -1,5 +1,6 @@ /* eslint-env mocha */ +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import { webSockets } from '../src/index.js' @@ -7,6 +8,7 @@ import { webSockets } from '../src/index.js' describe('libp2p-websockets', () => { it('.createServer throws in browser', () => { expect(webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }).createListener).to.throw() }) diff --git a/packages/transport-websockets/test/node.ts b/packages/transport-websockets/test/node.ts index 7497be4565..6baae5a67d 100644 --- a/packages/transport-websockets/test/node.ts +++ b/packages/transport-websockets/test/node.ts @@ -1,24 +1,27 @@ /* eslint-env mocha */ /* eslint max-nested-callbacks: ["error", 6] */ -import fs from 'fs' -import http from 'http' -import https from 'https' +import fs from 'node:fs' +import http from 'node:http' +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' +import { WebSockets, WebSocketsSecure } from '@multiformats/multiaddr-matcher' import { expect } from 'aegir/chai' import { isLoopbackAddr } from 'is-loopback-addr' +import { pEvent } from 'p-event' import pWaitFor from 'p-wait-for' import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' import * as filters from '../src/filters.js' import { webSockets } from '../src/index.js' -import type { Connection, Listener, Transport, Upgrader } from '@libp2p/interface' +import type { Connection, Libp2pEvents, Listener, Transport, Upgrader, TLSCertificate } from '@libp2p/interface' import type { StubbedInstance } from 'sinon-ts' describe('instantiate the transport', () => { it('create', () => { const ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) expect(ws).to.exist() @@ -46,6 +49,7 @@ describe('listen', () => { beforeEach(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -72,7 +76,8 @@ describe('listen', () => { it('should error on starting two listeners on same address', async () => { listener = ws.createListener({ upgrader }) const dumbServer = http.createServer() - await new Promise(resolve => dumbServer.listen(ma.toOptions().port, resolve)) + const options = ma.toOptions() + await new Promise(resolve => dumbServer.listen(options.port, options.host, resolve)) await expect(listener.listen(ma)).to.eventually.rejectedWith('listen EADDRINUSE') await new Promise(resolve => dumbServer.close(() => { resolve() })) }) @@ -148,7 +153,7 @@ describe('listen', () => { }) it('getAddrs preserves p2p Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/47382/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma = multiaddr('/ip4/127.0.0.1/tcp/47382/ws') listener = ws.createListener({ upgrader }) await listener.listen(ma) @@ -164,6 +169,7 @@ describe('listen', () => { beforeEach(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -225,6 +231,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -264,6 +271,7 @@ describe('dial', () => { it('should resolve port 0', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') const ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) @@ -295,6 +303,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -327,15 +336,19 @@ describe('dial', () => { describe('ip4 with wss', () => { let ws: Transport let listener: Listener - const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/wss') - let server: https.Server + const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/tls/ws') beforeEach(async () => { - server = https.createServer({ - cert: fs.readFileSync('./test/fixtures/certificate.pem'), - key: fs.readFileSync('./test/fixtures/key.pem') - }) - ws = webSockets({ websocket: { rejectUnauthorized: false }, server })({ + ws = webSockets({ + websocket: { + rejectUnauthorized: false + }, + https: { + cert: fs.readFileSync('./test/fixtures/certificate.pem'), + key: fs.readFileSync('./test/fixtures/key.pem') + } + })({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -346,8 +359,6 @@ describe('dial', () => { afterEach(async () => { await listener.close() - server.close() - server.closeAllConnections() }) it('should listen on wss address', () => { @@ -370,6 +381,7 @@ describe('dial', () => { beforeEach(async () => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) listener = ws.createListener({ @@ -401,6 +413,7 @@ describe('filter addrs', () => { describe('default filter addrs with only dns', () => { before(() => { ws = webSockets()({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -471,6 +484,7 @@ describe('filter addrs', () => { describe('custom filter addrs', () => { before(() => { ws = webSockets({ filter: filters.all })({ + events: new TypedEventEmitter(), logger: defaultLogger() }) }) @@ -600,3 +614,135 @@ describe('filter addrs', () => { }) }) }) + +describe('auto-tls (IPv4)', () => { + let ws: Transport + let listener: Listener + let events: TypedEventEmitter + const ma = multiaddr('/ip4/127.0.0.1/tcp/37284/ws') + + beforeEach(async () => { + events = new TypedEventEmitter() + + const upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async () => { + return stubInterface() + } + }) + + ws = webSockets({ + websocket: { + rejectUnauthorized: false + } + })({ + events, + logger: defaultLogger() + }) + listener = ws.createListener({ + upgrader + }) + await listener.listen(ma) + }) + + afterEach(async () => { + await listener.close() + }) + + it('should listen on wss after a certificate is found', async () => { + const addrs = listener.getAddrs() + expect(addrs).to.have.lengthOf(1) + expect(WebSockets.exactMatch(addrs[0])).to.be.true() + const listeningPromise = pEvent(listener, 'listening') + + events.safeDispatchEvent('certificate:provision', { + detail: { + key: fs.readFileSync('./test/fixtures/key.pem', { + encoding: 'utf-8' + }), + cert: fs.readFileSync('./test/fixtures/certificate.pem', { + encoding: 'utf-8' + }) + } + }) + + await listeningPromise + + const addrs2 = listener.getAddrs() + expect(addrs2).to.have.lengthOf(2) + expect(WebSockets.exactMatch(addrs2[0])).to.be.true() + expect(WebSocketsSecure.exactMatch(addrs2[1])).to.be.true() + + const wsOptions = addrs2[0].toOptions() + const wssOptions = addrs2[1].toOptions() + + expect(wsOptions.host).to.equal(wssOptions.host) + expect(wsOptions.port).to.equal(wssOptions.port) + }) +}) + +describe('auto-tls (IPv6)', () => { + let ws: Transport + let listener: Listener + let events: TypedEventEmitter + const ma = multiaddr('/ip6/::1/tcp/37284/ws') + + beforeEach(async () => { + events = new TypedEventEmitter() + + const upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async () => { + return stubInterface() + } + }) + + ws = webSockets({ + websocket: { + rejectUnauthorized: false + } + })({ + events, + logger: defaultLogger() + }) + listener = ws.createListener({ + upgrader + }) + await listener.listen(ma) + }) + + afterEach(async () => { + await listener.close() + }) + + it('should listen on wss after a certificate is found', async () => { + const addrs = listener.getAddrs() + expect(addrs).to.have.lengthOf(1) + expect(WebSockets.exactMatch(addrs[0])).to.be.true() + const listeningPromise = pEvent(listener, 'listening') + + events.safeDispatchEvent('certificate:provision', { + detail: { + key: fs.readFileSync('./test/fixtures/key.pem', { + encoding: 'utf-8' + }), + cert: fs.readFileSync('./test/fixtures/certificate.pem', { + encoding: 'utf-8' + }) + } + }) + + await listeningPromise + + const addrs2 = listener.getAddrs() + expect(addrs2).to.have.lengthOf(2) + expect(WebSockets.exactMatch(addrs2[0])).to.be.true() + expect(WebSocketsSecure.exactMatch(addrs2[1])).to.be.true() + + const wsOptions = addrs2[0].toOptions() + const wssOptions = addrs2[1].toOptions() + + expect(wsOptions.host).to.equal(wssOptions.host) + expect(wsOptions.port).to.equal(wssOptions.port) + }) +})