diff --git a/src/clientRPC/ClientClient.ts b/src/clientRPC/ClientClient.ts index 10fda4ec62..d6ca4d915b 100644 --- a/src/clientRPC/ClientClient.ts +++ b/src/clientRPC/ClientClient.ts @@ -9,6 +9,7 @@ import { PromiseCancellable } from '@matrixai/async-cancellable'; import { Timer } from '@matrixai/timer'; import { Validator } from 'ip-num'; import * as clientRpcUtils from './utils'; +import * as clientRPCErrors from './errors'; import { promise } from '../utils'; const timeoutSymbol = Symbol('TimedOutSymbol'); @@ -69,7 +70,7 @@ class ClientClient { } else if (Validator.isValidIPv6String(host)[0]) { this.host = `[${host}]`; } else { - throw Error('TMP Invalid host'); + throw new clientRPCErrors.ErrorClientInvalidHost(); } } @@ -86,7 +87,7 @@ class ClientClient { this.logger.info(`Destroyed ${this.constructor.name}`); } - @createDestroy.ready(Error('TMP destroyed')) + @createDestroy.ready(new clientRPCErrors.ErrorClientDestroyed()) public async startConnection({ timeoutTimer, }: { @@ -125,7 +126,11 @@ class ClientClient { ); // Handle connection failure const openErrorHandler = (e) => { - connectProm.rejectP(Error('TMP ERROR Connection failure', { cause: e })); + connectProm.rejectP( + new clientRPCErrors.ErrorClientConnectionFailed(undefined, { + cause: e, + }), + ); }; ws.once('error', openErrorHandler); // Authenticate server's certificates @@ -153,7 +158,9 @@ class ClientClient { timer?.then(() => timeoutSymbol) ?? new Promise(() => {}), await Promise.all([authenticateProm.p, connectProm.p]), ]); - if (result === timeoutSymbol) throw Error('TMP timed out'); + if (result === timeoutSymbol) { + throw new clientRPCErrors.ErrorClientConnectionTimedOut(); + } } catch (e) { // Clean up // unregister handlers @@ -177,52 +184,67 @@ class ClientClient { const readableStream = new ReadableStream( { start: (controller) => { - readableLogger.info('STARTING'); + readableLogger.info('Starting'); const messageHandler = (data) => { - // ReadableLogger.debug(`message: ${data.toString()}`); + readableLogger.debug(`Received ${data.toString()}`); if (controller.desiredSize == null) { controller.error(Error('NEVER')); return; } if (controller.desiredSize < 0) { - // ReadableLogger.debug('PAUSING'); + readableLogger.debug('Applying readable backpressure'); ws.pause(); } const message = data as Buffer; if (message.length === 0) { - readableLogger.info('CLOSING, NULL MESSAGE'); + readableLogger.debug('Null message received'); ws.removeListener('message', messageHandler); if (!readableClosed) { - controller.close(); readableClosed = true; + readableLogger.debug('Closing'); + controller.close(); } if (writableClosed) { + this.logger.debug('Closing socket'); ws.close(); } return; } controller.enqueue(message); }; + readableLogger.debug('Registering socket message handler'); ws.on('message', messageHandler); - ws.once('close', () => { - readableLogger.info('CLOSED, WS CLOSED'); + ws.once('close', (code, reason) => { + this.logger.info('Socket closed'); ws.removeListener('message', messageHandler); if (!readableClosed) { - controller.error(Error('TMP WebSocket Closed early CR')); readableClosed = true; + readableLogger.debug( + `Closed early, ${code}, ${reason.toString()}`, + ); + controller.error( + new clientRPCErrors.ErrorClientConnectionEndedEarly(), + ); + } + }); + ws.once('error', (e) => { + if (!readableClosed) { + readableClosed = true; + readableLogger.error(e); + controller.error(e); } }); - ws.once('error', (e) => readableLogger.error(e)); }, cancel: () => { - readableLogger.info('CANCELLED'); + readableLogger.debug('Cancelled'); if (!readableClosed) { - ws.close(); + readableLogger.debug('Closing socket'); readableClosed = true; + ws.close(); } }, pull: () => { - // ReadableLogger.debug('RESUMING'); + readableLogger.debug('Releasing backpressure'); ws.resume(); }, }, @@ -233,43 +255,47 @@ class ClientClient { ); const writableStream = new WritableStream({ start: (controller) => { - writableLogger.info('STARTING'); + writableLogger.info('Starting'); ws.once('error', (e) => { - writableLogger.error(`error: ${e}`); if (!writableClosed) { - controller.error(e); writableClosed = true; + writableLogger.error(e.toString()); + controller.error(e); } }); ws.once('close', (code, reason) => { if (!writableClosed) { - writableLogger.info( - `ws closing early! with code: ${code} and reason: ${reason.toString()}`, + writableClosed = true; + writableLogger.debug(`Closed early, ${code}, ${reason.toString()}`); + controller.error( + new clientRPCErrors.ErrorClientConnectionEndedEarly(), ); - controller.error(Error('TMP WebSocket Closed early CW')); } }); }, close: () => { - writableLogger.info('CLOSING'); + writableLogger.debug('Closing, sending null message'); ws.send(Buffer.from([])); writableClosed = true; if (readableClosed) { + writableLogger.debug('Closing socket'); ws.close(); } }, abort: () => { - writableLogger.info('ABORTED'); + writableLogger.debug('Aborted'); writableClosed = true; if (readableClosed) { + writableLogger.debug('Closing socket'); ws.close(); } }, write: async (chunk, controller) => { - // WritableLogger.debug(`writing: ${chunk?.toString()}`); + writableLogger.debug(`Sending ${chunk?.toString()}`); const wait = promise(); ws.send(chunk, (e) => { if (e != null) { + writableLogger.error(e.toString()); controller.error(e); } wait.resolveP(); @@ -283,15 +309,15 @@ class ClientClient { ws.ping(); }, this.pingInterval); const pingTimeoutTimer = setTimeout(() => { - this.logger.debug('PING TIMED OUT'); + this.logger.debug('Ping timed out'); ws.close(4002, 'Timed out'); }, this.pingTimeout); ws.on('ping', () => { - this.logger.debug('received ping'); + this.logger.debug('Received ping'); ws.pong(); }); ws.on('pong', () => { - this.logger.debug('received pong'); + this.logger.debug('Received pong'); pingTimeoutTimer.refresh(); }); ws.once('close', () => { diff --git a/src/clientRPC/ClientServer.ts b/src/clientRPC/ClientServer.ts index 567015d895..a241306f47 100644 --- a/src/clientRPC/ClientServer.ts +++ b/src/clientRPC/ClientServer.ts @@ -12,6 +12,7 @@ import os from 'os'; import { startStop } from '@matrixai/async-init'; import Logger from '@matrixai/logger'; import uWebsocket from 'uWebSockets.js'; +import * as clientRPCErrors from './errors'; import { promise } from '../utils'; type ConnectionCallback = ( @@ -109,7 +110,7 @@ class ClientServer { tlsConfig, basePath = os.tmpdir(), host, - port, + port = 0, }: { connectionCallback: ConnectionCallback; tlsConfig: TLSConfig; @@ -119,12 +120,11 @@ class ClientServer { }): Promise { this.logger.info(`Starting ${this.constructor.name}`); this.connectionCallback = connectionCallback; - // TODO: take a TLS config, write the files in the temp directory and - // load them. let count = 0; const tmpDir = await this.fs.promises.mkdtemp( path.join(basePath, 'polykey-'), ); + // TODO: The key file needs to be in the encrypted format const keyFile = path.join(tmpDir, 'keyFile.pem'); const certFile = path.join(tmpDir, 'certFile.pem'); await this.fs.promises.writeFile(keyFile, tlsConfig.keyPrivatePem); @@ -170,38 +170,33 @@ class ClientServer { ws.getUserData().drain(ws); }, }); + this.server.any('/*', (res, _) => { + // Reject normal requests with an upgrade code + res + .writeStatus('426') + .writeHeader('connection', 'Upgrade') + .writeHeader('upgrade', 'websocket') + .end('426 Upgrade Required', true); + }); const listenProm = promise(); + const listenCallback = (listenSocket) => { + if (listenSocket) { + this.listenSocket = listenSocket; + listenProm.resolveP(); + } else { + listenProm.rejectP(new clientRPCErrors.ErrorServerPortUnavailable()); + } + }; if (host != null) { // With custom host - this.server.any('/*', (res, _) => { - res - .writeStatus('426') - .writeHeader('connection', 'Upgrade') - .writeHeader('upgrade', 'websocket') - .end('426 Upgrade Required', true); - }); - this.server.listen(host, port ?? 0, (listenSocket) => { - if (listenSocket) { - this.listenSocket = listenSocket; - listenProm.resolveP(); - } else { - listenProm.rejectP(Error('TMP, no port')); - } - }); + this.server.listen(host, port ?? 0, listenCallback); } else { // With default host - this.server.listen(port ?? 0, (listenSocket) => { - if (listenSocket) { - this.listenSocket = listenSocket; - listenProm.resolveP(); - } else { - listenProm.rejectP(Error('TMP, no port')); - } - }); + this.server.listen(port, listenCallback); } await listenProm.p; this.logger.debug( - `bound to port ${uWebsocket.us_socket_local_port(this.listenSocket)}`, + `Listening on port ${uWebsocket.us_socket_local_port(this.listenSocket)}`, ); this.host = host ?? '127.0.0.1'; this.logger.info(`Started ${this.constructor.name}`); @@ -236,23 +231,25 @@ class ClientServer { let backpressure: PromiseDeconstructed | null = null; let writableController: WritableStreamDefaultController | undefined; let readableController: ReadableStreamController | undefined; + const writableLogger = logger.getChild('Writable'); + const readableLogger = logger.getChild('Readable'); // Setting up the writable stream const writableStream = new WritableStream({ start: (controller) => { writableController = controller; }, write: async (chunk, controller) => { - // Logger.debug(`WRITABLE WRITE ${chunk.toString()}`); await backpressure?.p; const writeResult = ws.send(chunk, true); switch (writeResult) { default: case 2: // Write failure, emit error - controller.error(Error('TMP Failed to write')); + writableLogger.error('Send error'); + controller.error(new clientRPCErrors.ErrorServerSendFailed()); break; case 0: - logger.info('Write backpressure'); + writableLogger.info('Write backpressure'); // Signal backpressure backpressure = promise(); context.writeBackpressure = true; @@ -262,22 +259,23 @@ class ClientServer { break; case 1: // Success + writableLogger.debug(`Sending ${chunk.toString()}`); break; } }, close: () => { - logger.info('WRITABLE CLOSE'); + writableLogger.info('Closed, sending null message'); if (!wsClosed) ws.send(Buffer.from([]), true); writableClosed = true; if (readableClosed && !wsClosed) { - logger.debug('ENDING WS'); + writableLogger.debug('Ending socket'); ws.end(); } }, abort: () => { - logger.info('WRITABLE ABORT'); + writableLogger.info('Aborted'); if (readableClosed && !wsClosed) { - logger.debug('ENDING WS'); + writableLogger.debug('Ending socket'); ws.end(); } }, @@ -288,35 +286,34 @@ class ClientServer { start: (controller) => { readableController = controller; context.message = (ws, message, _) => { - // Logger.debug(`MESSAGE CALLED ${message.toString()}`); + readableLogger.debug(`Received ${message.toString()}`); if (message.byteLength === 0) { - logger.debug('NULL MESSAGE, CLOSING'); + readableLogger.debug('Null message received'); if (!readableClosed) { - logger.debug('CLOSING READABLE'); - controller.close(); readableClosed = true; + readableLogger.debug('Closing'); + controller.close(); if (writableClosed && !wsClosed) { + readableLogger.debug('Ending socket'); ws.end(); } } return; } controller.enqueue(Buffer.from(message)); - if ( - controller.desiredSize != null && - controller.desiredSize < -1000 - ) { - logger.error('Read stream buffer full'); - const err = Error('TMP read buffer limit'); - if (!wsClosed) ws.end(4001, err.toString()); - controller.error(err); + if (controller.desiredSize != null && controller.desiredSize < 0) { + readableLogger.error('Read stream buffer full'); + if (!wsClosed) ws.end(4001, 'Read stream buffer full'); + controller.error( + new clientRPCErrors.ErrorServerReadableBufferLimit(), + ); } }; }, cancel: () => { readableClosed = true; if (writableClosed && !wsClosed) { - logger.debug('ENDING WS'); + readableLogger.debug('Ending socket'); ws.end(); } }, @@ -331,39 +328,42 @@ class ClientServer { ws.ping(); }, this.pingInterval); const pingTimeoutTimer = setTimeout(() => { - logger.debug('ping timed out'); + logger.debug('Ping timed out'); ws.end(); }, this.pingTimeout); context.pong = () => { - logger.debug('received pong'); + logger.debug('Received pong'); pingTimeoutTimer.refresh(); }; context.close = () => { - logger.debug('CLOSING CALLED'); + logger.debug('Closing'); wsClosed = true; // Cleaning up timers logger.debug('Cleaning up timers'); clearTimeout(pingTimer); clearTimeout(pingTimeoutTimer); // Closing streams - logger.debug('cleaning streams'); + logger.debug('Cleaning streams'); if (!readableClosed) { - logger.debug('CLOSING READABLE'); - readableController?.error(Error('TMP Web stream closed early SR')); readableClosed = true; + readableLogger.debug('Closing'); + readableController?.error( + new clientRPCErrors.ErrorServerConnectionEndedEarly(), + ); } if (!writableClosed) { - logger.debug('CLOSING Writable'); - writableController?.error(Error('TMP Web stream closed early SW')); writableClosed = true; + writableLogger.debug('Closing'); + writableController?.error( + new clientRPCErrors.ErrorServerConnectionEndedEarly(), + ); } }; context.drain = () => { - logger.debug('DRAINING CALLED'); + logger.debug('Drained'); backpressure?.resolveP(); }; - - logger.info('callback'); + logger.debug('Calling handler callback'); try { this.connectionCallback({ readable: readableStream, diff --git a/src/clientRPC/errors.ts b/src/clientRPC/errors.ts new file mode 100644 index 0000000000..bbe9145828 --- /dev/null +++ b/src/clientRPC/errors.ts @@ -0,0 +1,66 @@ +import { ErrorPolykey, sysexits } from '../errors'; + +class ErrorClient extends ErrorPolykey {} + +class ErrorClientClient extends ErrorClient {} + +class ErrorClientDestroyed extends ErrorClientClient{ + static description = 'ClientClient has been destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorClientInvalidHost extends ErrorClientClient{ + static description = 'Host must be a valid IPv4 or IPv6 address string'; + exitCode = sysexits.USAGE; +} + +class ErrorClientConnectionFailed extends ErrorClientClient{ + static description = 'Failed to establish connection to server'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorClientConnectionTimedOut extends ErrorClientClient{ + static description = 'Connection timed out'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorClientConnectionEndedEarly extends ErrorClientClient{ + static description = 'Connection ended before stream ended'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorClientServer extends ErrorClient {} + +class ErrorServerPortUnavailable extends ErrorClientServer{ + static description = 'Failed to bind a free port'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorServerSendFailed extends ErrorClientServer{ + static description = 'Failed to send message'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorServerReadableBufferLimit extends ErrorClientServer{ + static description = 'Readable buffer is full, messages received too quickly'; + exitCode = sysexits.USAGE; +} + +class ErrorServerConnectionEndedEarly extends ErrorClientServer{ + static description = 'Connection ended before stream ended'; + exitCode = sysexits.UNAVAILABLE; +} + +export { + ErrorClientClient, + ErrorClientDestroyed, + ErrorClientInvalidHost, + ErrorClientConnectionFailed, + ErrorClientConnectionTimedOut, + ErrorClientConnectionEndedEarly, + ErrorClientServer, + ErrorServerPortUnavailable, + ErrorServerSendFailed, + ErrorServerReadableBufferLimit, + ErrorServerConnectionEndedEarly, +}