Skip to content

Commit

Permalink
fix: cleaning up temp errors in websocket domain
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Mar 24, 2023
1 parent b0876c8 commit a6f69f2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
34 changes: 18 additions & 16 deletions src/websockets/WebSocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import WebSocket from 'ws';
import { Timer } from '@matrixai/timer';
import { Validator } from 'ip-num';
import WebSocketStream from './WebSocketStream';
import * as clientRpcUtils from './utils';
import * as clientRPCErrors from './errors';
import * as webSocketUtils from './utils';
import * as webSocketErrors from './errors';
import { promise } from '../utils';

const timeoutSymbol = Symbol('TimedOutSymbol');
Expand Down Expand Up @@ -68,7 +68,7 @@ class WebSocketClient {
} else if (Validator.isValidIPv6String(host)[0]) {
this.host = `[${host}]`;
} else {
throw new clientRPCErrors.ErrorClientInvalidHost();
throw new webSocketErrors.ErrorClientInvalidHost();
}
}

Expand All @@ -85,7 +85,7 @@ class WebSocketClient {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@createDestroy.ready(new clientRPCErrors.ErrorClientDestroyed())
@createDestroy.ready(new webSocketErrors.ErrorClientDestroyed())
public async stopConnections() {
for (const activeConnection of this.activeConnections) {
activeConnection.end();
Expand All @@ -95,7 +95,7 @@ class WebSocketClient {
}
}

@createDestroy.ready(new clientRPCErrors.ErrorClientDestroyed())
@createDestroy.ready(new webSocketErrors.ErrorClientDestroyed())
public async startConnection({
timeoutTimer,
}: {
Expand All @@ -119,7 +119,7 @@ class WebSocketClient {
// Handle connection failure
const openErrorHandler = (e) => {
connectProm.rejectP(
new clientRPCErrors.ErrorClientConnectionFailed(undefined, {
new webSocketErrors.ErrorClientConnectionFailed(undefined, {
cause: e,
}),
);
Expand All @@ -129,10 +129,10 @@ class WebSocketClient {
ws.once('upgrade', async (request) => {
const tlsSocket = request.socket as TLSSocket;
const peerCert = tlsSocket.getPeerCertificate(true);
clientRpcUtils
webSocketUtils
.verifyServerCertificateChain(
this.expectedNodeIds,
clientRpcUtils.detailedToCertChain(peerCert),
webSocketUtils.detailedToCertChain(peerCert),
)
.then(authenticateProm.resolveP, authenticateProm.rejectP);
});
Expand All @@ -154,7 +154,7 @@ class WebSocketClient {
await Promise.all([authenticateProm.p, connectProm.p]),
]);
if (result === timeoutSymbol) {
throw new clientRPCErrors.ErrorClientConnectionTimedOut();
throw new webSocketErrors.ErrorClientConnectionTimedOut();
}
} catch (e) {
// Clean up
Expand Down Expand Up @@ -244,7 +244,7 @@ class WebSocketStreamClientInternal extends WebSocketStream {
readableLogger.debug(
`Closed early, ${code}, ${reason.toString()}`,
);
const e = new clientRPCErrors.ErrorClientConnectionEndedEarly();
const e = new webSocketErrors.ErrorClientConnectionEndedEarly();
this.signalReadableEnd(e);
controller.error(e);
}
Expand Down Expand Up @@ -289,7 +289,7 @@ class WebSocketStreamClientInternal extends WebSocketStream {
ws.once('close', (code, reason) => {
if (!this.writableEnded_) {
writableLogger.debug(`Closed early, ${code}, ${reason.toString()}`);
const e = new clientRPCErrors.ErrorClientConnectionEndedEarly();
const e = new webSocketErrors.ErrorClientConnectionEndedEarly();
this.signalWritableEnd(e);
controller.error(e);
}
Expand All @@ -304,9 +304,9 @@ class WebSocketStreamClientInternal extends WebSocketStream {
ws.close();
}
},
abort: () => {
abort: (reason) => {
writableLogger.debug('Aborted');
this.signalWritableEnd(Error('TMP ABORTED'));
this.signalWritableEnd(reason);
if (this.readableEnded_) {
writableLogger.debug('Closing socket');
ws.close();
Expand All @@ -321,7 +321,7 @@ class WebSocketStreamClientInternal extends WebSocketStream {
// Opting to debug message here and not log an error, sending
// failure is common if we send before the close event.
writableLogger.debug('failed to send');
const err = new clientRPCErrors.ErrorClientConnectionEndedEarly(
const err = new webSocketErrors.ErrorClientConnectionEndedEarly(
undefined,
{
cause: e,
Expand Down Expand Up @@ -356,7 +356,9 @@ class WebSocketStreamClientInternal extends WebSocketStream {
logger.debug('WebSocket closed');
const err =
code !== 1000
? Error(`TMP WebSocket ended with code ${code}, ${reason.toString()}`)
? new webSocketErrors.ErrorClientConnectionEndedEarly(
`ended with code ${code}, ${reason.toString()}`,
)
: undefined;
this.signalWebSocketEnd(err);
logger.debug('Cleaning up timers');
Expand All @@ -367,7 +369,7 @@ class WebSocketStreamClientInternal extends WebSocketStream {
}

end(): void {
this.ws.close(4001, 'TMP ENDING CONNECTION');
this.ws.close(4001, 'Ending connection');
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/websockets/WebSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,11 @@ class WebSocketStreamServerInternal extends WebSocketStream {
ws.end();
}
},
abort: () => {
abort: (reason) => {
writableLogger.info('Aborted');
if (this.readableEnded_ && !this.webSocketEnded_) {
writableLogger.debug('Ending socket');
this.signalWebSocketEnd(Error('TMP ERROR ABORTED'));
this.signalWebSocketEnd(reason);
ws.end(4001, 'ABORTED');
}
},
Expand Down Expand Up @@ -445,8 +445,8 @@ class WebSocketStreamServerInternal extends WebSocketStream {
}
};
},
cancel: () => {
this.signalReadableEnd(Error('TMP READABLE CANCELLED'));
cancel: (reason) => {
this.signalReadableEnd(reason);
if (this.writableEnded_ && !this.webSocketEnded_) {
readableLogger.debug('Ending socket');
this.signalWebSocketEnd();
Expand Down Expand Up @@ -499,7 +499,7 @@ class WebSocketStreamServerInternal extends WebSocketStream {
}

end(): void {
this.ws.end(4001, 'TMP ENDING CONNECTION');
this.ws.end(4001, 'Ending connection');
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/websockets/WebSocketStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class WebSocketStream
result[2].status === 'rejected'
) {
// Throw a compound error
throw Error('TMP Stream failed', { cause: result });
throw AggregateError(result, 'stream failed');
}
// Otherwise return nothing
});
Expand Down

0 comments on commit a6f69f2

Please sign in to comment.