diff --git a/src/websockets/WebSocketServer.ts b/src/websockets/WebSocketServer.ts index a9df35d92..43bd31ffe 100644 --- a/src/websockets/WebSocketServer.ts +++ b/src/websockets/WebSocketServer.ts @@ -2,39 +2,21 @@ import type { ReadableStreamController, WritableStreamDefaultController, } from 'stream/web'; -import type { - HttpRequest, - HttpResponse, - us_socket_context_t, - WebSocket, -} from 'uWebSockets.js'; -import type { FileSystem, JSONValue, PromiseDeconstructed } from '../types'; +import type { JSONValue } from '../types'; import type { TLSConfig } from '../network/types'; +import type { IncomingMessage, ServerResponse } from 'http'; import { WritableStream, ReadableStream } from 'stream/web'; -import path from 'path'; -import os from 'os'; -import { startStop } from '@matrixai/async-init'; +import https from 'https'; +import { startStop, status } from '@matrixai/async-init'; import Logger from '@matrixai/logger'; -import uWebsocket from 'uWebSockets.js'; +import * as ws from 'ws'; import WebSocketStream from './WebSocketStream'; import * as webSocketErrors from './errors'; import * as webSocketEvents from './events'; -import { promise } from '../utils'; +import { never, promise } from '../utils'; type ConnectionCallback = (streamPair: WebSocketStream) => void; -type Context = { - message: ( - ws: WebSocket, - message: ArrayBuffer, - isBinary: boolean, - ) => void; - drain: (ws: WebSocket) => void; - close: (ws: WebSocket, code: number, message: ArrayBuffer) => void; - pong: (ws: WebSocket, message: ArrayBuffer) => void; - logger: Logger; -}; - /** * Events: * - start @@ -57,7 +39,6 @@ class WebSocketServer extends EventTarget { * Default is 1,000 milliseconds. * @param obj.pingTimeoutTimeTime - Time before connection is cleaned up after no ping responses. * Default is 10,000 milliseconds. - * @param obj.fs - FileSystem interface used for creating files. * @param obj.maxReadableStreamBytes - The number of bytes the readable stream will buffer until pausing. * @param obj.logger */ @@ -70,8 +51,6 @@ class WebSocketServer extends EventTarget { maxIdleTimeout = 120, pingIntervalTime = 1_000, pingTimeoutTimeTime = 10_000, - fs = require('fs'), - maxReadableStreamBytes = 1_000_000_000, // About 1 GB logger = new Logger(this.name), }: { connectionCallback: ConnectionCallback; @@ -82,15 +61,11 @@ class WebSocketServer extends EventTarget { maxIdleTimeout?: number; pingIntervalTime?: number; pingTimeoutTimeTime?: number; - fs?: FileSystem; - maxReadableStreamBytes?: number; logger?: Logger; }) { logger.info(`Creating ${this.name}`); const wsServer = new this( logger, - fs, - maxReadableStreamBytes, maxIdleTimeout, pingIntervalTime, pingTimeoutTimeTime, @@ -106,20 +81,18 @@ class WebSocketServer extends EventTarget { return wsServer; } - protected server: uWebsocket.TemplatedApp; - protected listenSocket: uWebsocket.us_listen_socket; + protected server: https.Server; + protected webSocketServer: ws.WebSocketServer; protected _port: number; protected _host: string; protected connectionEventHandler: ( event: webSocketEvents.ConnectionEvent, ) => void; protected activeSockets: Set = new Set(); - protected connectionIndex: number = 0; /** * * @param logger - * @param fs * @param maxReadableStreamBytes Max number of bytes stored in read buffer before error * @param maxIdleTimeout * @param pingIntervalTime @@ -127,8 +100,6 @@ class WebSocketServer extends EventTarget { */ constructor( protected logger: Logger, - protected fs: FileSystem, - protected maxReadableStreamBytes, protected maxIdleTimeout: number | undefined, protected pingIntervalTime: number, protected pingTimeoutTimeTime: number, @@ -138,7 +109,6 @@ class WebSocketServer extends EventTarget { public async start({ tlsConfig, - basePath = os.tmpdir(), host, port = 0, connectionCallback, @@ -158,47 +128,42 @@ class WebSocketServer extends EventTarget { }; this.addEventListener('connection', this.connectionEventHandler); } - await this.setupServer(basePath, tlsConfig); - this.server.ws('/*', { - sendPingsAutomatically: true, - idleTimeout: this.maxIdleTimeout, - upgrade: this.upgrade, - open: this.open, - message: this.message, - close: this.close, - drain: this.drain, - pong: this.pong, - // Ping uses default behaviour. - // We don't use subscriptions. + this.server = https.createServer({ + key: tlsConfig.keyPrivatePem, + cert: tlsConfig.certChainPem, }); - this.server.any('/*', (res, _) => { - // Reject normal requests with an upgrade code - res - .writeStatus('426') - .writeHeader('connection', 'Upgrade') - .writeHeader('upgrade', 'websocket') - .end('426 Upgrade Required', true); + this.webSocketServer = new ws.WebSocketServer({ + host: this._host, + port: this._port, + server: this.server, }); + + this.webSocketServer.on('connection', this.connectionHandler); + this.webSocketServer.on('close', this.closeHandler); + this.server.on('close', this.closeHandler); + this.webSocketServer.on('error', this.errorHandler); + this.server.on('error', this.errorHandler); + this.server.on('request', this.requestHandler); + + // This.server.any('/*', (res, _) => { + // // Reject normal requests with an upgrade code + // res + // .writeStatus('426') + // .writeHeader('connection', 'Upgrade') + // .writeHeader('upgrade', 'websocket') + // .end('426 Upgrade Required', true); + // }); + + // TODO: tell normal requests to upgrade. const listenProm = promise(); - const listenCallback = (listenSocket) => { - if (listenSocket) { - this.listenSocket = listenSocket; - listenProm.resolveP(); - } else { - listenProm.rejectP(new webSocketErrors.ErrorServerPortUnavailable()); - } - }; - if (host != null) { - // With custom host - this.server.listen(host, port ?? 0, listenCallback); - } else { - // With default host - this.server.listen(port, listenCallback); - } + this.server.listen(port ?? 0, host, listenProm.resolveP); await listenProm.p; - this._port = uWebsocket.us_socket_local_port(this.listenSocket); + const address = this.server.address(); + // TODO: handle string + if (address == null || typeof address === 'string') never(); + this._port = address.port; this.logger.debug(`Listening on port ${this._port}`); - this._host = host ?? '127.0.0.1'; + this._host = address.address ?? '127.0.0.1'; this.dispatchEvent( new webSocketEvents.StartEvent({ detail: { @@ -212,8 +177,6 @@ class WebSocketServer extends EventTarget { public async stop(force: boolean = false): Promise { this.logger.info(`Stopping ${this.constructor.name}`); - // Close the server by closing the underlying socket - uWebsocket.us_listen_socket_close(this.listenSocket); // Shutting down active websockets if (force) { for (const webSocketStream of this.activeSockets) { @@ -225,9 +188,37 @@ class WebSocketServer extends EventTarget { // Ignore errors, we only care that it finished webSocketStream.endedProm.catch(() => {}); } + // Close the server by closing the underlying socket + const wssCloseProm = promise(); + this.webSocketServer.close((e) => { + if (e == null || e.message === 'The server is not running') { + wssCloseProm.resolveP(); + } else { + wssCloseProm.rejectP(e); + } + }); + await wssCloseProm.p; + const serverCloseProm = promise(); + this.server.close((e) => { + if (e == null || e.message === 'Server is not running.') { + serverCloseProm.resolveP(); + } else { + serverCloseProm.rejectP(e); + } + }); + await serverCloseProm.p; + // Removing handlers if (this.connectionEventHandler != null) { this.removeEventListener('connection', this.connectionEventHandler); } + + this.webSocketServer.off('connection', this.connectionHandler); + this.webSocketServer.off('close', this.closeHandler); + this.server.off('close', this.closeHandler); + this.webSocketServer.off('error', this.errorHandler); + this.server.off('error', this.errorHandler); + this.server.on('request', this.requestHandler); + this.dispatchEvent(new webSocketEvents.StopEvent()); this.logger.info(`Stopped ${this.constructor.name}`); } @@ -242,68 +233,30 @@ class WebSocketServer extends EventTarget { return this._host; } - /** - * This creates the pem files and starts the server with them. It ensures that - * files are cleaned up to the best of its ability. - */ - protected async setupServer(basePath: string, tlsConfig: TLSConfig) { - const tmpDir = await this.fs.promises.mkdtemp( - path.join(basePath, 'polykey-'), - ); - // TODO: The key file needs to be in the encrypted format - const keyFile = path.join(tmpDir, 'keyFile.pem'); - const certFile = path.join(tmpDir, 'certFile.pem'); - await this.fs.promises.writeFile(keyFile, tlsConfig.keyPrivatePem); - await this.fs.promises.writeFile(certFile, tlsConfig.certChainPem); - try { - this.server = uWebsocket.SSLApp({ - key_file_name: keyFile, - cert_file_name: certFile, - }); - } finally { - await this.fs.promises.rm(keyFile); - await this.fs.promises.rm(certFile); - await this.fs.promises.rm(tmpDir, { recursive: true, force: true }); - } - } - - /** - * Applies default upgrade behaviour and creates a UserData object we can - * mutate for the Context - */ - protected upgrade = ( - res: HttpResponse, - req: HttpRequest, - context: us_socket_context_t, - ) => { - const logger = this.logger.getChild(`Connection ${this.connectionIndex}`); - res.upgrade>( - { - logger, - }, - req.getHeader('sec-websocket-key'), - req.getHeader('sec-websocket-protocol'), - req.getHeader('sec-websocket-extensions'), - context, - ); - this.connectionIndex += 1; - }; - /** * Handles the creation of the `ReadableWritablePair` and provides it to the * StreamPair handler. */ - protected open = (ws: WebSocket) => { + protected connectionHandler = ( + webSocket: ws.WebSocket, + request: IncomingMessage, + ) => { + const socket = request.connection; const webSocketStream = new WebSocketStreamServerInternal( - ws, - this.maxReadableStreamBytes, + webSocket, this.pingIntervalTime, this.pingTimeoutTimeTime, - {}, // TODO: fill in connection metadata + { + localHost: socket.localAddress ?? '', + localPort: socket.localPort ?? 0, + remoteHost: socket.remoteAddress ?? '', + remotePort: socket.remotePort ?? '', + }, + this.logger.getChild(WebSocketStreamServerInternal.name), ); // Adding socket to the active sockets map this.activeSockets.add(webSocketStream); - webSocketStream.endedProm + void webSocketStream.endedProm // Ignore errors, we only care that it finished .catch(() => {}) .finally(() => { @@ -322,52 +275,52 @@ class WebSocketServer extends EventTarget { }; /** - * Routes incoming messages to each stream using the `Context` message - * callback. + * Used to trigger stopping if the underlying server fails */ - protected message = ( - ws: WebSocket, - message: ArrayBuffer, - isBinary: boolean, - ) => { - ws.getUserData().message(ws, message, isBinary); - }; - - protected drain = (ws: WebSocket) => { - ws.getUserData().drain(ws); + protected closeHandler = async () => { + if (this[status] == null || this[status] === 'stopping') { + this.logger.debug('close event but already stopping'); + return; + } + this.logger.debug('close event, forcing stop'); + await this.stop(true); }; - protected close = ( - ws: WebSocket, - code: number, - message: ArrayBuffer, - ) => { - ws.getUserData().close(ws, code, message); + /** + * Used to propagate error conditions + */ + protected errorHandler = (e: Error) => { + this.logger.error(e); }; - protected pong = (ws: WebSocket, message: ArrayBuffer) => { - ws.getUserData().pong(ws, message); + /** + * Will tell any normal HTTP request to upgrade + */ + protected requestHandler = (_req, res: ServerResponse) => { + res + .writeHead(426, '426 Upgrade Required', { + connection: 'Upgrade', + upgrade: 'websocket', + }) + .end('426 Upgrade Required'); }; } class WebSocketStreamServerInternal extends WebSocketStream { - protected backPressure: PromiseDeconstructed | null = null; - protected writeBackpressure: boolean = false; protected writableController: WritableStreamDefaultController | undefined; protected readableController: | ReadableStreamController | undefined; + protected messageHandler: (data: ws.RawData, isBinary: boolean) => void; constructor( - protected ws: WebSocket, - maxReadBufferBytes: number, + protected webSocket: ws.WebSocket, pingInterval: number, pingTimeoutTime: number, protected metadata: Record, + protected logger: Logger, ) { super(); - const context = ws.getUserData(); - const logger = context.logger; logger.info('WS opened'); const writableLogger = logger.getChild('Writable'); const readableLogger = logger.getChild('Readable'); @@ -377,38 +330,34 @@ class WebSocketStreamServerInternal extends WebSocketStream { this.writableController = controller; }, write: async (chunk, controller) => { - await this.backPressure?.p; - const writeResult = ws.send(chunk, true); - switch (writeResult) { - default: - case 2: - // Write failure, emit error - writableLogger.error('Send error'); - controller.error(new webSocketErrors.ErrorServerSendFailed()); - break; - case 0: - writableLogger.info('Write backpressure'); - // Signal backpressure - this.backPressure = promise(); - this.writeBackpressure = true; - this.backPressure.p.finally(() => { - this.writeBackpressure = false; - }); - break; - case 1: - // Success - writableLogger.debug(`Sending ${Buffer.from(chunk).toString()}`); - break; + const writeResultProm = promise(); + this.webSocket.send(chunk, (err) => { + if (err == null) writeResultProm.resolveP(); + else writeResultProm.rejectP(err); + }); + try { + await writeResultProm.p; + writableLogger.debug(`Sending ${Buffer.from(chunk).toString()}`); + } catch (e) { + this.logger.error(e); + controller.error(new webSocketErrors.ErrorServerSendFailed()); } }, - close: () => { + close: async () => { writableLogger.info('Closed, sending null message'); - if (!this._webSocketEnded) ws.send(Buffer.from([]), true); + if (!this._webSocketEnded) { + const endProm = promise(); + this.webSocket.send(Buffer.from([]), (err) => { + if (err == null) endProm.resolveP(); + else endProm.rejectP(err); + }); + await endProm.p; + } this.signalWritableEnd(); if (this._readableEnded && !this._webSocketEnded) { writableLogger.debug('Ending socket'); this.signalWebSocketEnd(); - ws.end(); + this.webSocket.close(); } }, abort: (reason) => { @@ -416,71 +365,80 @@ class WebSocketStreamServerInternal extends WebSocketStream { if (this._readableEnded && !this._webSocketEnded) { writableLogger.debug('Ending socket'); this.signalWebSocketEnd(reason); - ws.end(4000, 'Aborting connection'); + this.webSocket.close(4000, 'Aborting connection'); } }, + }, + { + highWaterMark: 1, }); // Setting up the readable stream + this.messageHandler = (data: ws.RawData, isBinary: boolean) => { + if (!isBinary) never(); + if (data instanceof Array) never(); + const messageBuffer = Buffer.from(data); + readableLogger.debug(`Received ${messageBuffer.toString()}`); + if (messageBuffer.byteLength === 0) { + readableLogger.debug('Null message received'); + this.webSocket.off('message', this.messageHandler); + if (!this._readableEnded) { + readableLogger.debug('Closing'); + this.signalReadableEnd(); + this.readableController!.close(); + if (this._writableEnded && !this._webSocketEnded) { + readableLogger.debug('Ending socket'); + this.signalWebSocketEnd(); + this.webSocket.close(); + } + } + return; + } + this.readableController!.enqueue(messageBuffer); + if ( + this.readableController!.desiredSize != null && + this.readableController!.desiredSize < 0 + ) { + this.webSocket.pause(); + } + }; this.readable = new ReadableStream( { start: (controller) => { this.readableController = controller; - context.message = (ws, message, _) => { - const messageBuffer = Buffer.from(message); - readableLogger.debug(`Received ${messageBuffer.toString()}`); - if (message.byteLength === 0) { - readableLogger.debug('Null message received'); - if (!this._readableEnded) { - readableLogger.debug('Closing'); - this.signalReadableEnd(); - controller.close(); - if (this._writableEnded && !this._webSocketEnded) { - readableLogger.debug('Ending socket'); - this.signalWebSocketEnd(); - ws.end(); - } - } - return; - } - controller.enqueue(messageBuffer); - if (controller.desiredSize != null && controller.desiredSize < 0) { - readableLogger.error('Read stream buffer full'); - const err = new webSocketErrors.ErrorServerReadableBufferLimit(); - if (!this._webSocketEnded) { - this.signalWebSocketEnd(err); - ws.end(4000, 'Read stream buffer full'); - } - controller.error(err); - } - }; + this.webSocket.on('message', this.messageHandler); + }, + pull: () => { + this.webSocket.resume(); }, cancel: (reason) => { + this.webSocket.off('message', this.messageHandler); this.signalReadableEnd(reason); if (this._writableEnded && !this._webSocketEnded) { readableLogger.debug('Ending socket'); this.signalWebSocketEnd(); - ws.end(); + this.webSocket.close(); } }, }, { - highWaterMark: maxReadBufferBytes, - size: (chunk) => chunk?.byteLength ?? 0, + highWaterMark: 1, }, ); const pingTimer = setInterval(() => { - ws.ping(); + this.webSocket.ping(); }, pingInterval); const pingTimeoutTimeTimer = setTimeout(() => { logger.debug('Ping timed out'); - ws.end(); + this.webSocket.close(); }, pingTimeoutTime); - context.pong = () => { - logger.debug('Received pong'); + const pongHandler = (data: Buffer) => { + logger.debug(`Received pong with (${data.toString()})`); pingTimeoutTimeTimer.refresh(); }; - context.close = () => { + this.webSocket.on('pong', pongHandler); + + const closeHandler = () => { logger.debug('Closing'); this.signalWebSocketEnd(); // Cleaning up timers @@ -489,10 +447,12 @@ class WebSocketStreamServerInternal extends WebSocketStream { clearTimeout(pingTimeoutTimeTimer); // Closing streams logger.debug('Cleaning streams'); + this.webSocket.off('message', this.messageHandler); const err = new webSocketErrors.ErrorServerConnectionEndedEarly(); if (!this._readableEnded) { readableLogger.debug('Closing'); this.signalReadableEnd(err); + this.webSocket.off('message', this.messageHandler); this.readableController?.error(err); } if (!this._writableEnded) { @@ -501,10 +461,7 @@ class WebSocketStreamServerInternal extends WebSocketStream { this.writableController?.error(err); } }; - context.drain = () => { - logger.debug('Drained'); - this.backPressure?.resolveP(); - }; + this.webSocket.once('close', closeHandler); } get meta(): Record { @@ -518,6 +475,7 @@ class WebSocketStreamServerInternal extends WebSocketStream { const err = reason ?? new webSocketErrors.ErrorClientConnectionEndedEarly(); // Close the streams with the given error, if (!this._readableEnded) { + this.webSocket.off('message', this.messageHandler); this.readableController?.error(err); this.signalReadableEnd(err); } @@ -527,7 +485,7 @@ class WebSocketStreamServerInternal extends WebSocketStream { } // Then close the websocket if (!this._webSocketEnded) { - this.ws.end(4000, 'Ending connection'); + this.webSocket.terminate(); this.signalWebSocketEnd(err); } } diff --git a/tests/scratch.test.ts b/tests/scratch.test.ts index c7c21d965..4959b5280 100644 --- a/tests/scratch.test.ts +++ b/tests/scratch.test.ts @@ -1,14 +1,13 @@ import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; -import NodeManager from '@/nodes/NodeManager'; // This is a 'scratch paper' test file for quickly running tests in the CI describe('scratch', () => { - const _logger = new Logger(`${NodeManager.name} test`, LogLevel.WARN, [ + const _logger = new Logger(`scratch test`, LogLevel.WARN, [ new StreamHandler(), ]); -}); -// We can't have empty test files so here is a sanity test -test('Should avoid empty test suite', async () => { - expect(1 + 1).toBe(2); + // We can't have empty test files so here is a sanity test + test('Should avoid empty test suite', async () => { + expect(1 + 1).toBe(2); + }); }); diff --git a/tests/websockets/WebSocket.test.ts b/tests/websockets/WebSocket.test.ts index 3d56191d6..b7fb00356 100644 --- a/tests/websockets/WebSocket.test.ts +++ b/tests/websockets/WebSocket.test.ts @@ -2,7 +2,6 @@ import type { ReadableWritablePair } from 'stream/web'; import type { TLSConfig } from '@/network/types'; import type { KeyPair } from '@/keys/types'; import type http from 'http'; -import type WebSocketStream from '@/websockets/WebSocketStream'; import fs from 'fs'; import path from 'path'; import os from 'os'; @@ -10,6 +9,7 @@ import https from 'https'; import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger'; import { testProp, fc } from '@fast-check/jest'; import { Timer } from '@matrixai/timer'; +import { status } from '@matrixai/async-init'; import { KeyRing } from '@/keys/index'; import WebSocketServer from '@/websockets/WebSocketServer'; import WebSocketClient from '@/websockets/WebSocketClient'; @@ -227,46 +227,14 @@ describe('WebSocket', () => { } }, ); - test('reverse backpressure', async () => { - const backpressure = promise(); - const resumeWriting = promise(); - let webSocketStream: WebSocketStream | null = null; + test('handles https server failure', async () => { webSocketServer = await WebSocketServer.createWebSocketServer({ connectionCallback: (streamPair) => { logger.info('inside callback'); - void Promise.allSettled([ - (async () => { - for await (const _ of streamPair.readable) { - // No touch, only consume - } - })(), - (async () => { - // Kidnap the context - // @ts-ignore: kidnap protected property - for (const websocket of webSocketServer.activeSockets.values()) { - webSocketStream = websocket; - } - if (webSocketStream == null) { - await streamPair.writable.close(); - return; - } - // Write until backPressured - const message = Buffer.alloc(128, 0xf0); - const writer = streamPair.writable.getWriter(); - // @ts-ignore: kidnap protected property - while (!webSocketStream.writeBackpressure) { - await writer.write(message); - } - logger.info('BACK PRESSURED'); - backpressure.resolveP(); - await resumeWriting.p; - for (let i = 0; i < 100; i++) { - await writer.write(message); - } - await writer.close(); - logger.info('WRITING ENDED'); - })(), - ]).catch((e) => logger.error(e.toString())); + void streamPair.readable + .pipeTo(streamPair.writable) + .catch(() => {}) + .finally(() => logger.info('STREAM HANDLING ENDED')); }, basePath: dataDir, tlsConfig, @@ -274,82 +242,43 @@ describe('WebSocket', () => { logger: logger.getChild('server'), }); logger.info(`Server started on port ${webSocketServer.getPort()}`); - webSocketClient = await WebSocketClient.createWebSocketClient({ - host, - port: webSocketServer.getPort(), - expectedNodeIds: [keyRing.getNodeId()], - logger: logger.getChild('clientClient'), + + const closeP = promise(); + // @ts-ignore: protected property + webSocketServer.server.close(() => { + closeP.resolveP(); }); - const websocket = await webSocketClient.startConnection(); - await websocket.writable.close(); + await closeP.p; + // The webSocketServer should stop itself + expect(webSocketServer[status]).toBe(null); - await backpressure.p; - // @ts-ignore: kidnap protected property - expect(webSocketStream.writeBackpressure).toBeTrue(); - resumeWriting.resolveP(); - // Consume all the back-pressured data - for await (const _ of websocket.readable) { - // No touch, only consume - } - // @ts-ignore: kidnap protected property - expect(webSocketStream.writeBackpressure).toBeFalse(); logger.info('ending'); }); - // Readable backpressure is not actually supported. We're dealing with it by - // using a buffer with a provided limit that can be very large. - test('exceeding readable buffer limit causes error', async () => { - const startReading = promise(); - const handlingProm = promise(); + test('handles webSocketServer server failure', async () => { webSocketServer = await WebSocketServer.createWebSocketServer({ connectionCallback: (streamPair) => { logger.info('inside callback'); - Promise.all([ - (async () => { - await startReading.p; - logger.info('Starting consumption'); - for await (const _ of streamPair.readable) { - // No touch, only consume - } - logger.info('Reads ended'); - })(), - (async () => { - await streamPair.writable.close(); - })(), - ]) + void streamPair.readable + .pipeTo(streamPair.writable) .catch(() => {}) - .finally(() => handlingProm.resolveP()); + .finally(() => logger.info('STREAM HANDLING ENDED')); }, basePath: dataDir, tlsConfig, host, - // Setting a really low buffer limit - maxReadableStreamBytes: 1500, logger: logger.getChild('server'), }); logger.info(`Server started on port ${webSocketServer.getPort()}`); - webSocketClient = await WebSocketClient.createWebSocketClient({ - host, - port: webSocketServer.getPort(), - expectedNodeIds: [keyRing.getNodeId()], - logger: logger.getChild('clientClient'), + + const closeP = promise(); + // @ts-ignore: protected property + webSocketServer.webSocketServer.close(() => { + closeP.resolveP(); }); - const websocket = await webSocketClient.startConnection(); - const message = Buffer.alloc(1_000, 0xf0); - const writer = websocket.writable.getWriter(); - logger.info('Starting writes'); - await expect(async () => { - for (let i = 0; i < 100; i++) { - await writer.write(message); - } - }).rejects.toThrow(); - startReading.resolveP(); - logger.info('writes ended'); - await expect(async () => { - for await (const _ of websocket.readable) { - // No touch, only consume - } - }).rejects.toThrow(); - await handlingProm.p; + await closeP.p; + // The webSocketServer should stop itself + expect(webSocketServer[status]).toBe(null); + logger.info('ending'); }); test('client ends connection abruptly', async () => { @@ -468,6 +397,7 @@ describe('WebSocket', () => { [messagesArb, messagesArb], async (messages1, messages2) => { try { + const serverStreamProm = promise(); webSocketServer = await WebSocketServer.createWebSocketServer({ connectionCallback: (streamPair) => { logger.info('inside callback'); @@ -480,7 +410,7 @@ describe('WebSocket', () => { for await (const _ of streamPair.readable) { // No touch, only consume } - })().catch((e) => logger.error(e)); + })().then(serverStreamProm.resolveP, serverStreamProm.rejectP); }, basePath: dataDir, tlsConfig, @@ -496,6 +426,7 @@ describe('WebSocket', () => { }); const websocket = await webSocketClient.startConnection(); await asyncReadWrite(messages1, websocket); + await serverStreamProm.p; logger.info('ending'); } finally { await webSocketServer.stop(true); @@ -507,6 +438,7 @@ describe('WebSocket', () => { [messagesArb, messagesArb], async (messages1, messages2) => { try { + const serverStreamProm = promise(); webSocketServer = await WebSocketServer.createWebSocketServer({ connectionCallback: (streamPair) => { logger.info('inside callback'); @@ -519,7 +451,7 @@ describe('WebSocket', () => { await writer.write(val); } await writer.close(); - })().catch((e) => logger.error(e)); + })().then(serverStreamProm.resolveP, serverStreamProm.rejectP); }, basePath: dataDir, tlsConfig, @@ -535,6 +467,7 @@ describe('WebSocket', () => { }); const websocket = await webSocketClient.startConnection(); await asyncReadWrite(messages1, websocket); + await serverStreamProm.p; logger.info('ending'); } finally { await webSocketServer.stop(true); @@ -546,6 +479,7 @@ describe('WebSocket', () => { [messagesArb, messagesArb], async (messages1, messages2) => { try { + const serverStreamProm = promise(); webSocketServer = await WebSocketServer.createWebSocketServer({ connectionCallback: (streamPair) => { logger.info('inside callback'); @@ -556,7 +490,7 @@ describe('WebSocket', () => { await writer.write(val); } await writer.close(); - })().catch((e) => logger.error(e)); + })().then(serverStreamProm.resolveP, serverStreamProm.rejectP); }, basePath: dataDir, tlsConfig, @@ -572,6 +506,7 @@ describe('WebSocket', () => { }); const websocket = await webSocketClient.startConnection(); await asyncReadWrite(messages1, websocket); + await serverStreamProm.p; logger.info('ending'); } finally { await webSocketServer.stop(true);