Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client RPC migration #509

Merged
merged 38 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e373209
feat: migrating handlers to agnostic RPC
tegefaulkes Feb 24, 2023
ed608e2
tests: migrating handler tests
tegefaulkes Mar 6, 2023
7e050e8
feat: updating client RPC usage to use agnostic RPC
tegefaulkes Mar 7, 2023
cccc5c5
fix: fixing failing tests
tegefaulkes Mar 9, 2023
2a59a45
fix: updated error names, `Rpc` is now `RPC`
tegefaulkes Mar 17, 2023
63bf745
fix: added return types to utils functions
tegefaulkes Mar 17, 2023
db110c3
syntax: renamed `host_` to `_host` to match conventions
tegefaulkes Mar 17, 2023
7312f7c
fix: adding docblock documentation to middleware.ts
tegefaulkes Mar 17, 2023
4fa0d2d
fix: remove duplicate error
tegefaulkes Mar 17, 2023
57c2565
fix: making `PolykeyClient` transport agnostic
tegefaulkes Mar 17, 2023
d469d1f
fix: changing order of discovery message spam fix
tegefaulkes Mar 19, 2023
d18a1f5
fix: renaming `streamPairCreateCallback` to `streamFactory`
tegefaulkes Mar 19, 2023
dbe4b5c
fix: using transport agnostic `PolykeyClient`
tegefaulkes Mar 20, 2023
345930e
fix: removed placeholder error and fixed error message handling
tegefaulkes Mar 21, 2023
5acc202
syntax: renamed metadata `Authorization` to `authorization`
tegefaulkes Mar 21, 2023
2a59c35
syntax: updated client response and request data types
tegefaulkes Mar 21, 2023
119e73d
fix: fixing up directory structures of `client` and `rpc`
tegefaulkes Mar 21, 2023
b2d2697
fix(rpc): moved `ErrorPolykeyRemote` to the bottom of `rpc/errors` an…
CMCDragonkai Mar 21, 2023
8b3447e
fix: removed spurious mention of `ErrorPolykeyRemote`
CMCDragonkai Mar 21, 2023
95b772d
fix: `rpcServerClient` and `webSocketServerClient` are not dependency…
tegefaulkes Mar 22, 2023
b00b9c8
fix: `rpcClient` is now dependency injected into `PolykeyClient`
tegefaulkes Mar 22, 2023
da925ee
fix: changed `JsonRpc` to `JSONRPC`
CMCDragonkai Mar 22, 2023
95c00dc
style: spacing between handler types
CMCDragonkai Mar 22, 2023
9415794
fix: `clientManifest` avoids needless dependencies
tegefaulkes Mar 22, 2023
9df3302
fix: `outputFormatter` handles `ErrorPolykeyRemote` as well as the ol…
tegefaulkes Mar 22, 2023
bc1ee3f
fix: renamed `ErrorRPCClient` to `ErrorClient`
tegefaulkes Mar 22, 2023
86d1cb1
fix: removing a disabled test
tegefaulkes Mar 22, 2023
c253996
fix: updated docblocks and parameter names for `RPCServer` and `RPCCl…
tegefaulkes Mar 22, 2023
a709b91
chore: removed useless comments about XML RPC error codes
CMCDragonkai Mar 22, 2023
6a12022
fix(rpc): await the handler promises in the async generators, this is…
CMCDragonkai Mar 22, 2023
ae5816b
fix(rpc): moved `RPCErrorEvent` to `rpc/events`, collapsed `Error` to…
CMCDragonkai Mar 22, 2023
428c170
chore: expanding `RPCServer` commentary
CMCDragonkai Mar 22, 2023
7352f1f
fix: made `WebSocketServer` `host` and `port` into `getHost()` and `g…
tegefaulkes Mar 22, 2023
a2ae510
chore: updating doc comments for RPC message types
tegefaulkes Mar 23, 2023
8e19b76
fix: change of head, tail destructuring for raw handlers, head and ta…
CMCDragonkai Mar 23, 2023
b0876c8
fix: `RPCServer` handling output stream cancellation
tegefaulkes Mar 24, 2023
a6f69f2
fix: cleaning up temp errors in websocket domain
tegefaulkes Mar 24, 2023
60e3d13
fix: handler receives signal if output stream fails
tegefaulkes Mar 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
155 changes: 102 additions & 53 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { FileSystem } from './types';
import type { FileSystem, PromiseDeconstructed } from './types';
import type { PolykeyWorkerManagerInterface } from './workers/types';
import type { ConnectionData, Host, Port, TLSConfig } from './network/types';
import type { SeedNodes } from './nodes/types';
Expand All @@ -10,6 +10,10 @@ import process from 'process';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { CreateDestroyStartStop } from '@matrixai/async-init/dist/CreateDestroyStartStop';
import RPCServer from './rpc/RPCServer';
import WebSocketServer from './websockets/WebSocketServer';
import * as middlewareUtils from './rpc/utils/middleware';
import * as authMiddleware from './client/utils/authenticationMiddleware';
import { WorkerManager } from './workers';
import * as networkUtils from './network/utils';
import KeyRing from './keys/KeyRing';
Expand All @@ -32,14 +36,14 @@ import { providers } from './identities';
import Proxy from './network/Proxy';
import { EventBus, captureRejectionSymbol } from './events';
import createAgentService, { AgentServiceService } from './agent/service';
import createClientService, { ClientServiceService } from './client/service';
import config from './config';
import * as errors from './errors';
import * as utils from './utils';
import * as keysUtils from './keys/utils';
import * as nodesUtils from './nodes/utils';
import * as workersUtils from './workers/utils';
import TaskManager from './tasks/TaskManager';
import { serverManifest } from './client/handlers';

type NetworkConfig = {
forwardHost?: Host;
Expand All @@ -49,9 +53,13 @@ type NetworkConfig = {
// GRPCServer for agent service
agentHost?: Host;
agentPort?: Port;
// GRPCServer for client service
// RPCServer for client service
clientHost?: Host;
clientPort?: Port;
maxReadBufferBytes?: number;
idleTimeout?: number;
pingInterval?: number;
pingTimeout?: number;
};

interface PolykeyAgent extends CreateDestroyStartStop {}
Expand Down Expand Up @@ -103,8 +111,9 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
grpcServerClient,
tegefaulkes marked this conversation as resolved.
Show resolved Hide resolved
rpcServerClient,
grpcServerAgent,
webSocketServerClient,
fs = require('fs'),
logger = new Logger(this.name),
fresh = false,
Expand Down Expand Up @@ -156,8 +165,9 @@ class PolykeyAgent {
vaultManager?: VaultManager;
notificationsManager?: NotificationsManager;
sessionManager?: SessionManager;
grpcServerClient?: GRPCServer;
rpcServerClient?: RPCServer;
grpcServerAgent?: GRPCServer;
webSocketServerClient?: WebSocketServer;
fs?: FileSystem;
logger?: Logger;
fresh?: boolean;
Expand All @@ -183,6 +193,10 @@ class PolykeyAgent {
...config.defaults.nodeConnectionManagerConfig,
...utils.filterEmptyObject(nodeConnectionManagerConfig),
};
const _networkConfig = {
...config.defaults.networkConfig,
...utils.filterEmptyObject(networkConfig),
};
await utils.mkdirExists(fs, nodePath);
const statusPath = path.join(nodePath, config.defaults.statusBase);
const statusLockPath = path.join(nodePath, config.defaults.statusLockBase);
Expand All @@ -193,6 +207,7 @@ class PolykeyAgent {
const events = new EventBus({
captureRejections: true,
});
let pkAgentProm: PromiseDeconstructed<PolykeyAgent> | undefined;
try {
status =
status ??
Expand Down Expand Up @@ -401,18 +416,65 @@ class PolykeyAgent {
// If a recovery code is provided then we reset any sessions in case the
// password changed.
if (keyRingConfig.recoveryCode != null) await sessionManager.resetKey();
grpcServerClient =
grpcServerClient ??
new GRPCServer({
logger: logger.getChild(GRPCServer.name + 'Client'),
if (rpcServerClient == null) {
pkAgentProm = utils.promise();
rpcServerClient = await RPCServer.createRPCServer({
manifest: serverManifest({
acl: acl,
certManager: certManager,
db: db,
discovery: discovery,
fs: fs,
gestaltGraph: gestaltGraph,
identitiesManager: identitiesManager,
keyRing: keyRing,
logger: logger,
nodeConnectionManager: nodeConnectionManager,
nodeGraph: nodeGraph,
nodeManager: nodeManager,
notificationsManager: notificationsManager,
pkAgentProm: pkAgentProm.p,
sessionManager: sessionManager,
vaultManager: vaultManager,
}),
middlewareFactory: middlewareUtils.defaultServerMiddlewareWrapper(
authMiddleware.authenticationMiddlewareServer(
sessionManager,
keyRing,
),
),
sensitive: false,
logger: logger.getChild('RPCServerClient'),
});
}
const tlsConfig: TLSConfig = {
keyPrivatePem: keysUtils.privateKeyToPEM(keyRing.keyPair.privateKey),
certChainPem: await certManager.getCertPEMsChainPEM(),
};
webSocketServerClient =
webSocketServerClient ??
(await WebSocketServer.createWebSocketServer({
connectionCallback: (streamPair) =>
rpcServerClient!.handleStream(streamPair, {}),
fs,
host: _networkConfig.clientHost,
port: _networkConfig.clientPort,
tlsConfig,
maxReadBufferBytes: _networkConfig.maxReadBufferBytes,
idleTimeout: _networkConfig.idleTimeout,
pingInterval: _networkConfig.pingInterval,
pingTimeout: _networkConfig.pingTimeout,
logger: logger.getChild('WebSocketServer'),
}));
grpcServerAgent =
grpcServerAgent ??
new GRPCServer({
logger: logger.getChild(GRPCServer.name + 'Agent'),
});
} catch (e) {
logger.warn(`Failed Creating ${this.name}`);
await rpcServerClient?.destroy();
await webSocketServerClient?.stop(true);
await sessionManager?.stop();
await notificationsManager?.stop();
await vaultManager?.stop();
Expand Down Expand Up @@ -450,12 +512,14 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
rpcServerClient,
grpcServerAgent,
grpcServerClient,
webSocketServerClient,
events,
fs,
logger,
});
pkAgentProm?.resolveP(pkAgent);
await pkAgent.start({
password,
networkConfig,
Expand Down Expand Up @@ -486,10 +550,11 @@ class PolykeyAgent {
public readonly notificationsManager: NotificationsManager;
public readonly sessionManager: SessionManager;
public readonly grpcServerAgent: GRPCServer;
public readonly grpcServerClient: GRPCServer;
public readonly events: EventBus;
public readonly fs: FileSystem;
public readonly logger: Logger;
public readonly rpcServerClient: RPCServer;
public readonly webSocketServerClient: WebSocketServer;
protected workerManager: PolykeyWorkerManagerInterface | undefined;

constructor({
Expand All @@ -512,8 +577,9 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
grpcServerClient,
rpcServerClient,
grpcServerAgent,
webSocketServerClient,
events,
fs,
logger,
Expand All @@ -537,8 +603,9 @@ class PolykeyAgent {
vaultManager: VaultManager;
notificationsManager: NotificationsManager;
sessionManager: SessionManager;
grpcServerClient: GRPCServer;
rpcServerClient: RPCServer;
grpcServerAgent: GRPCServer;
webSocketServerClient: WebSocketServer;
events: EventBus;
fs: FileSystem;
logger: Logger;
Expand All @@ -563,7 +630,8 @@ class PolykeyAgent {
this.vaultManager = vaultManager;
this.notificationsManager = notificationsManager;
this.sessionManager = sessionManager;
this.grpcServerClient = grpcServerClient;
this.rpcServerClient = rpcServerClient;
this.webSocketServerClient = webSocketServerClient;
this.grpcServerAgent = grpcServerAgent;
this.events = events;
this.fs = fs;
Expand Down Expand Up @@ -614,7 +682,8 @@ class PolykeyAgent {
keyPrivatePem: keysUtils.privateKeyToPEM(data.keyPair.privateKey),
certChainPem: await this.certManager.getCertPEMsChainPEM(),
};
this.grpcServerClient.setTLSConfig(tlsConfig);
// FIXME: Can we even support updating TLS config anymore?
// this.grpcServerClient.setTLSConfig(tlsConfig);
this.proxy.setTLSConfig(tlsConfig);
this.logger.info(`${KeyRing.name} change propagated`);
},
Expand All @@ -641,7 +710,7 @@ class PolykeyAgent {
}
},
);
const networkConfig_ = {
const _networkConfig = {
...config.defaults.networkConfig,
...utils.filterEmptyObject(networkConfig),
};
Expand All @@ -661,28 +730,6 @@ class PolykeyAgent {
proxy: this.proxy,
logger: this.logger.getChild('GRPCClientAgentService'),
});
const clientService = createClientService({
pkAgent: this,
db: this.db,
certManager: this.certManager,
discovery: this.discovery,
gestaltGraph: this.gestaltGraph,
identitiesManager: this.identitiesManager,
keyRing: this.keyRing,
nodeGraph: this.nodeGraph,
nodeConnectionManager: this.nodeConnectionManager,
nodeManager: this.nodeManager,
notificationsManager: this.notificationsManager,
sessionManager: this.sessionManager,
vaultManager: this.vaultManager,
sigchain: this.sigchain,
acl: this.acl,
grpcServerClient: this.grpcServerClient,
grpcServerAgent: this.grpcServerAgent,
proxy: this.proxy,
fs: this.fs,
logger: this.logger.getChild('GRPCClientClientService'),
});
// Starting modules
await this.keyRing.start({
password,
Expand Down Expand Up @@ -726,25 +773,26 @@ class PolykeyAgent {
certChainPem: await this.certManager.getCertPEMsChainPEM(),
};
// Client server
await this.grpcServerClient.start({
services: [[ClientServiceService, clientService]],
host: networkConfig_.clientHost,
port: networkConfig_.clientPort,
await this.webSocketServerClient.start({
tlsConfig,
host: _networkConfig.clientHost,
port: _networkConfig.clientPort,
connectionCallback: (streamPair) =>
this.rpcServerClient.handleStream(streamPair, {}),
});
// Agent server
await this.grpcServerAgent.start({
services: [[AgentServiceService, agentService]],
host: networkConfig_.agentHost,
port: networkConfig_.agentPort,
host: _networkConfig.agentHost,
port: _networkConfig.agentPort,
});
await this.proxy.start({
forwardHost: networkConfig_.forwardHost,
forwardPort: networkConfig_.forwardPort,
forwardHost: _networkConfig.forwardHost,
forwardPort: _networkConfig.forwardPort,
serverHost: this.grpcServerAgent.getHost(),
serverPort: this.grpcServerAgent.getPort(),
proxyHost: networkConfig_.proxyHost,
proxyPort: networkConfig_.proxyPort,
proxyHost: _networkConfig.proxyHost,
proxyPort: _networkConfig.proxyPort,
tlsConfig,
});
await this.nodeManager.start();
Expand All @@ -768,10 +816,10 @@ class PolykeyAgent {
await this.status.finishStart({
pid: process.pid,
nodeId: this.keyRing.getNodeId(),
clientHost: this.grpcServerClient.getHost(),
clientPort: this.grpcServerClient.getPort(),
clientHost: this.webSocketServerClient.getHost(),
clientPort: this.webSocketServerClient.getPort(),
agentHost: this.grpcServerAgent.getHost(),
agentPort: this.grpcServerClient.getPort(),
agentPort: this.grpcServerAgent.getPort(),
forwardHost: this.proxy.getForwardHost(),
forwardPort: this.proxy.getForwardPort(),
proxyHost: this.proxy.getProxyHost(),
Expand All @@ -793,7 +841,7 @@ class PolykeyAgent {
await this.nodeManager?.stop();
await this.proxy?.stop();
await this.grpcServerAgent?.stop();
await this.grpcServerClient?.stop();
await this.webSocketServerClient.stop(true);
await this.identitiesManager?.stop();
await this.gestaltGraph?.stop();
await this.acl?.stop();
Expand Down Expand Up @@ -829,7 +877,7 @@ class PolykeyAgent {
await this.nodeManager.stop();
await this.proxy.stop();
await this.grpcServerAgent.stop();
await this.grpcServerClient.stop();
await this.webSocketServerClient.stop(true);
await this.identitiesManager.stop();
await this.gestaltGraph.stop();
await this.acl.stop();
Expand Down Expand Up @@ -877,6 +925,7 @@ class PolykeyAgent {
await this.vaultManager.destroy();
await this.discovery.destroy();
await this.nodeGraph.destroy();
await this.rpcServerClient.destroy();
await this.identitiesManager.destroy();
await this.gestaltGraph.destroy();
await this.acl.destroy();
Expand Down
Loading