From ec6bc837f1b1117d3d243c201387e876f49ebcdd Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Thu, 23 Feb 2023 17:23:32 +1100 Subject: [PATCH] feat: `ClientServer` keepalive and heartbeat [ci skip] --- src/clientRPC/ClientServer.ts | 72 ++++++++++++++++++++-------- tests/clientRPC/clientRPC.test.ts | 78 +++++++++++++++++++------------ 2 files changed, 100 insertions(+), 50 deletions(-) diff --git a/src/clientRPC/ClientServer.ts b/src/clientRPC/ClientServer.ts index 8e32b3b6b6..567015d895 100644 --- a/src/clientRPC/ClientServer.ts +++ b/src/clientRPC/ClientServer.ts @@ -41,6 +41,8 @@ class ClientServer { host, port, idleTimeout, + pingInterval = 1000, + pingTimeout = 10000, fs = require('fs'), maxReadBufferBytes = 1_000_000_000, // About 1 GB logger = new Logger(this.name), @@ -51,12 +53,21 @@ class ClientServer { host?: string; port?: number; idleTimeout?: number; + pingInterval?: number; + pingTimeout?: number; fs?: FileSystem; maxReadBufferBytes?: number; logger?: Logger; }) { logger.info(`Creating ${this.name}`); - const wsServer = new this(logger, fs, maxReadBufferBytes, idleTimeout); + const wsServer = new this( + logger, + fs, + maxReadBufferBytes, + idleTimeout, + pingInterval, + pingTimeout, + ); await wsServer.start({ connectionCallback, tlsConfig, @@ -81,12 +92,16 @@ class ClientServer { * @param fs * @param maxReadBufferBytes Max number of bytes stored in read buffer before error * @param idleTimeout + * @param pingInterval + * @param pingTimeout */ constructor( protected logger: Logger, protected fs: FileSystem, protected maxReadBufferBytes, protected idleTimeout: number | undefined, + protected pingInterval: number, + protected pingTimeout: number, ) {} public async start({ @@ -221,24 +236,6 @@ class ClientServer { let backpressure: PromiseDeconstructed | null = null; let writableController: WritableStreamDefaultController | undefined; let readableController: ReadableStreamController | undefined; - context.close = () => { - logger.debug('CLOSING CALLED'); - wsClosed = true; - if (!readableClosed) { - logger.debug('CLOSING READABLE'); - readableController?.error(Error('TMP Web stream closed early SR')); - readableClosed = true; - } - if (!writableClosed) { - logger.debug('CLOSING Writable'); - writableController?.error(Error('TMP Web stream closed early SW')); - writableClosed = true; - } - }; - context.drain = () => { - logger.debug('DRAINING CALLED'); - backpressure?.resolveP(); - }; // Setting up the writable stream const writableStream = new WritableStream({ start: (controller) => { @@ -329,6 +326,43 @@ class ClientServer { size: (chunk) => chunk?.byteLength ?? 0, }, ); + + const pingTimer = setInterval(() => { + ws.ping(); + }, this.pingInterval); + const pingTimeoutTimer = setTimeout(() => { + logger.debug('ping timed out'); + ws.end(); + }, this.pingTimeout); + context.pong = () => { + logger.debug('received pong'); + pingTimeoutTimer.refresh(); + }; + context.close = () => { + logger.debug('CLOSING CALLED'); + wsClosed = true; + // Cleaning up timers + logger.debug('Cleaning up timers'); + clearTimeout(pingTimer); + clearTimeout(pingTimeoutTimer); + // Closing streams + logger.debug('cleaning streams'); + if (!readableClosed) { + logger.debug('CLOSING READABLE'); + readableController?.error(Error('TMP Web stream closed early SR')); + readableClosed = true; + } + if (!writableClosed) { + logger.debug('CLOSING Writable'); + writableController?.error(Error('TMP Web stream closed early SW')); + writableClosed = true; + } + }; + context.drain = () => { + logger.debug('DRAINING CALLED'); + backpressure?.resolveP(); + }; + logger.info('callback'); try { this.connectionCallback({ diff --git a/tests/clientRPC/clientRPC.test.ts b/tests/clientRPC/clientRPC.test.ts index 55f268fbba..a16bb72f82 100644 --- a/tests/clientRPC/clientRPC.test.ts +++ b/tests/clientRPC/clientRPC.test.ts @@ -12,7 +12,7 @@ import { testProp, fc } from '@fast-check/jest'; import { Timer } from '@matrixai/timer'; import { KeyRing } from '@/keys/index'; import ClientServer from '@/clientRPC/ClientServer'; -import { promise, sleep } from '@/utils'; +import { promise } from '@/utils'; import ClientClient from '@/clientRPC/ClientClient'; import * as keysUtils from '@/keys/utils'; import * as networkErrors from '@/network/errors'; @@ -436,36 +436,6 @@ describe('ClientRPC', () => { await expect(clientWritable.write(Buffer.from('test'))).rejects.toThrow(); logger.info('ending'); }); - test('ping pong', async () => { - const waitP = promise(); - clientServer = await ClientServer.createClientServer({ - connectionCallback: (streamPair) => { - logger.info('inside callback'); - void waitP.p.then(() => { - void streamPair.readable - .pipeTo(streamPair.writable) - .catch(() => {}) - .finally(() => loudLogger.info('STREAM HANDLING ENDED')); - }); - }, - basePath: dataDir, - tlsConfig, - host, - logger: loudLogger.getChild('server'), - }); - logger.info(`Server started on port ${clientServer.port}`); - clientClient = await ClientClient.createClientClient({ - host, - port: clientServer.port, - expectedNodeIds: [keyRing.getNodeId()], - logger: logger.getChild('clientClient'), - }); - const websocket = await clientClient.startConnection(); - await sleep(10000); - waitP.resolveP(); - await asyncReadWrite([], websocket); - logger.info('ending'); - }); // These describe blocks contains tests specific to either the client or server describe('ClientServer', () => { @@ -657,6 +627,29 @@ describe('ClientRPC', () => { expect(res.headers['connection']).toBe('Upgrade'); expect(res.headers['upgrade']).toBe('websocket'); }); + test('ping timeout', async () => { + clientServer = await ClientServer.createClientServer({ + connectionCallback: (_) => { + logger.info('inside callback'); + // Hang connection + }, + basePath: dataDir, + tlsConfig, + host, + pingTimeout: 100, + logger: loudLogger.getChild('server'), + }); + logger.info(`Server started on port ${clientServer.port}`); + clientClient = await ClientClient.createClientClient({ + host, + port: clientServer.port, + expectedNodeIds: [keyRing.getNodeId()], + logger: logger.getChild('clientClient'), + }); + await clientClient.startConnection(); + await clientClient.destroy(); + logger.info('ending'); + }); }); describe('ClientClient', () => { test('Destroying ClientClient stops all connections', async () => { @@ -813,5 +806,28 @@ describe('ClientRPC', () => { ).rejects.toThrow(); logger.info('ending'); }); + test('ping timeout', async () => { + clientServer = await ClientServer.createClientServer({ + connectionCallback: (_) => { + logger.info('inside callback'); + // Hang connection + }, + basePath: dataDir, + tlsConfig, + host, + logger: loudLogger.getChild('server'), + }); + logger.info(`Server started on port ${clientServer.port}`); + clientClient = await ClientClient.createClientClient({ + host, + port: clientServer.port, + expectedNodeIds: [keyRing.getNodeId()], + pingTimeout: 100, + logger: logger.getChild('clientClient'), + }); + await clientClient.startConnection(); + await clientClient.destroy(); + logger.info('ending'); + }); }); });