Skip to content

Commit

Permalink
Merge pull request #506 from MatrixAI/feature-websocket_client
Browse files Browse the repository at this point in the history
Websocket for Client Service API
  • Loading branch information
tegefaulkes authored Mar 1, 2023
2 parents 36d518f + c6feceb commit 1269570
Show file tree
Hide file tree
Showing 22 changed files with 2,406 additions and 411 deletions.
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"tslib": "^2.4.0",
"tsyringe": "^4.7.0",
"utp-native": "^2.5.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.19.0",
"ws": "^8.12.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import type {
RawHandlerImplementation,
ServerHandlerImplementation,
UnaryHandlerImplementation,
ConnectionInfo,
} from './types';
import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { RPCErrorEvent } from './utils';
import type { MiddlewareFactory } from './types';
import { ReadableStream } from 'stream/web';
Expand Down
2 changes: 1 addition & 1 deletion src/RPC/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { JSONValue } from 'types';
import type { ContainerType } from 'RPC/types';
import type { ReadableStream } from 'stream/web';
import type { JsonRpcRequest } from 'RPC/types';
import type { ConnectionInfo } from '../network/types';
import type { ConnectionInfo } from './types';
import type { ContextCancellable } from '../contexts/types';

abstract class Handler<
Expand Down
22 changes: 21 additions & 1 deletion src/RPC/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { JSONValue } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { ContextCancellable } from '../contexts/types';
import type { ReadableStream, ReadableWritablePair } from 'stream/web';
import type { Handler } from './handlers';
Expand All @@ -11,6 +10,8 @@ import type {
ClientCaller,
UnaryCaller,
} from './callers';
import type { NodeId } from '../nodes/types';
import type { Certificate } from '../keys/types';

/**
* This is the JSON RPC request object. this is the generic message type used for the RPC.
Expand Down Expand Up @@ -108,6 +109,24 @@ type JsonRpcMessage<T extends JSONValue = JSONValue> =
| JsonRpcRequest<T>
| JsonRpcResponse<T>;

/**
* Proxy connection information
* @property remoteNodeId - NodeId of the remote connecting node
* @property remoteCertificates - Certificate chain of the remote connecting node
* @property localHost - Proxy host of the local connecting node
* @property localPort - Proxy port of the local connecting node
* @property remoteHost - Proxy host of the remote connecting node
* @property remotePort - Proxy port of the remote connecting node
*/
type ConnectionInfo = Partial<{
remoteNodeId: NodeId;
remoteCertificates: Array<Certificate>;
localHost: string;
localPort: number;
remoteHost: string;
remotePort: number;
}>;

// Handler types
type HandlerImplementation<I, O> = (
input: I,
Expand Down Expand Up @@ -218,6 +237,7 @@ export type {
JsonRpcRequest,
JsonRpcResponse,
JsonRpcMessage,
ConnectionInfo,
HandlerImplementation,
RawHandlerImplementation,
DuplexHandlerImplementation,
Expand Down
28 changes: 28 additions & 0 deletions src/clientRPC/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ErrorPolykey, sysexits } from '../errors';

class ErrorRPC<T> extends ErrorPolykey<T> {}

class ErrorRPCClient<T> extends ErrorRPC<T> {}

class ErrorClientAuthMissing<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata is required but missing';
exitCode = sysexits.NOPERM;
}

class ErrorClientAuthFormat<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata has invalid format';
exitCode = sysexits.USAGE;
}

class ErrorClientAuthDenied<T> extends ErrorRPCClient<T> {
static description = 'Authorisation metadata is incorrect or expired';
exitCode = sysexits.NOPERM;
}

export {
ErrorRPC,
ErrorRPCClient,
ErrorClientAuthMissing,
ErrorClientAuthFormat,
ErrorClientAuthDenied,
};
220 changes: 6 additions & 214 deletions src/clientRPC/utils.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import type { SessionToken } from '../sessions/types';
import type KeyRing from '../keys/KeyRing';
import type SessionManager from '../sessions/SessionManager';
import type { RPCRequestParams } from './types';
import type { JsonRpcRequest } from '../RPC/types';
import type { ReadableWritablePair } from 'stream/web';
import type Logger from '@matrixai/logger';
import type { ConnectionInfo, Host, Port } from '../network/types';
import type RPCServer from '../RPC/RPCServer';
import type { TLSSocket } from 'tls';
import type { Server } from 'https';
import type net from 'net';
import type https from 'https';
import { ReadableStream, WritableStream } from 'stream/web';
import WebSocket, { WebSocketServer } from 'ws';
import * as clientErrors from '../client/errors';
import { promise } from '../utils';
import type SessionManager from 'sessions/SessionManager';
import type KeyRing from 'keys/KeyRing';
import type { JsonRpcRequest } from 'RPC/types';
import type { SessionToken } from 'sessions/types';
import * as clientErrors from './errors';

async function authenticate(
sessionManager: SessionManager,
Expand Down Expand Up @@ -65,201 +54,4 @@ function encodeAuthFromPassword(password: string): string {
return `Basic ${encoded}`;
}

function readableFromWebSocket(
ws: WebSocket,
logger: Logger,
): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
start: (controller) => {
logger.info('starting');
const messageHandler = (data) => {
logger.debug(`message: ${data.toString()}`);
ws.pause();
const message = data as Buffer;
if (message.length === 0) {
logger.info('ENDING');
ws.removeAllListeners('message');
try {
controller.close();
} catch {
// Ignore already closed
}
return;
}
controller.enqueue(message);
};
ws.on('message', messageHandler);
ws.once('close', () => {
logger.info('closed');
ws.removeListener('message', messageHandler);
try {
controller.close();
} catch {
// Ignore already closed
}
});
ws.once('error', (e) => {
controller.error(e);
});
},
cancel: () => {
logger.info('cancelled');
ws.close();
},
pull: () => {
logger.debug('resuming');
ws.resume();
},
});
}

function writeableFromWebSocket(
ws: WebSocket,
holdOpen: boolean,
logger: Logger,
): WritableStream<Uint8Array> {
return new WritableStream<Uint8Array>({
start: (controller) => {
logger.info('starting');
ws.once('error', (e) => {
logger.error(`error: ${e}`);
controller.error(e);
});
ws.once('close', (code, reason) => {
logger.info(
`ws closing early! with code: ${code} and reason: ${reason.toString()}`,
);
controller.error(Error('TMP WebSocket Closed early'));
});
},
close: () => {
logger.info('stream closing');
ws.send(Buffer.from([]));
if (!holdOpen) ws.terminate();
},
abort: () => {
logger.info('aborting');
ws.close();
},
write: async (chunk, controller) => {
logger.debug(`writing: ${chunk?.toString()}`);
const wait = promise<void>();
ws.send(chunk, (e) => {
if (e != null) {
logger.error(`error: ${e}`);
controller.error(e);
}
wait.resolveP();
});
await wait.p;
},
});
}

function webSocketToWebStreamPair(
ws: WebSocket,
holdOpen: boolean,
logger: Logger,
): ReadableWritablePair<Uint8Array, Uint8Array> {
return {
readable: readableFromWebSocket(ws, logger.getChild('readable')),
writable: writeableFromWebSocket(ws, holdOpen, logger.getChild('writable')),
};
}

function startConnection(
host: string,
port: number,
logger: Logger,
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
const ws = new WebSocket(`wss://${host}:${port}`, {
// CheckServerIdentity: (
// servername: string,
// cert: WebSocket.CertMeta,
// ): boolean => {
// console.log('CHECKING IDENTITY');
// console.log(servername);
// console.log(cert);
// return false;
// },
rejectUnauthorized: false,
// Ca: tlsConfig.certChainPem
});
ws.once('close', () => logger.info('CLOSED'));
// Ws.once('upgrade', () => {
// // Const tlsSocket = request.socket as TLSSocket;
// // Console.log(tlsSocket.getPeerCertificate());
// logger.info('Test early cancellation');
// // Request.destroy(Error('some error'));
// // tlsSocket.destroy(Error('some error'));
// // ws.close(12345, 'some reason');
// // TODO: Use the existing verify method from the GRPC implementation
// // TODO: Have this emit an error on verification failure.
// // It's fine for the server side to close abruptly without error
// });
const prom = promise<ReadableWritablePair<Uint8Array, Uint8Array>>();
ws.once('open', () => {
logger.info('starting connection');
prom.resolveP(webSocketToWebStreamPair(ws, true, logger));
});
return prom.p;
}

function handleConnection(ws: WebSocket, logger: Logger): void {
ws.once('close', () => logger.info('CLOSED'));
const readable = readableFromWebSocket(ws, logger.getChild('readable'));
const writable = writeableFromWebSocket(
ws,
false,
logger.getChild('writable'),
);
void readable.pipeTo(writable).catch((e) => logger.error(e));
}

function createClientServer(
server: Server,
rpcServer: RPCServer,
logger: Logger,
) {
logger.info('created server');
const wss = new WebSocketServer({
server,
});
wss.on('error', (e) => logger.error(e));
logger.info('created wss');
wss.on('connection', (ws, req) => {
logger.info('connection!');
const socket = req.socket as TLSSocket;
const streamPair = webSocketToWebStreamPair(ws, false, logger);
rpcServer.handleStream(streamPair, {
localHost: socket.localAddress! as Host,
localPort: socket.localPort! as Port,
remoteCertificates: socket.getPeerCertificate(),
remoteHost: socket.remoteAddress! as Host,
remotePort: socket.remotePort! as Port,
} as unknown as ConnectionInfo);
});
wss.once('close', () => {
wss.removeAllListeners('error');
wss.removeAllListeners('connection');
});
return wss;
}

async function listen(server: https.Server, host?: string, port?: number) {
await new Promise<void>((resolve) => {
server.listen(port, host ?? '127.0.0.1', undefined, () => resolve());
});
const addressInfo = server.address() as net.AddressInfo;
return addressInfo.port;
}

export {
authenticate,
decodeAuth,
encodeAuthFromPassword,
startConnection,
handleConnection,
createClientServer,
listen,
};
export { authenticate, decodeAuth, encodeAuthFromPassword };
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ interface FileSystem {
readdir: typeof fs.promises.readdir;
rename: typeof fs.promises.rename;
open: typeof fs.promises.open;
mkdtemp: typeof fs.promises.mkdtemp;
};
constants: typeof fs.constants;
}
Expand Down
Loading

0 comments on commit 1269570

Please sign in to comment.