Skip to content

Commit

Permalink
wip: created agent handlers and tests
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Feb 24, 2023
1 parent d2eea49 commit ecf309c
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class RPCClient<M extends ClientManifest> {
await writer.write(parameters);
const output = await reader.read();
if (output.done) {
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
throw new rpcErrors.ErrorRpcMissingResponse();
}
await reader.cancel();
await writer.close();
Expand Down Expand Up @@ -165,7 +165,7 @@ class RPCClient<M extends ClientManifest> {
const reader = callerInterface.readable.getReader();
const output = reader.read().then(({ value, done }) => {
if (done) {
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
throw new rpcErrors.ErrorRpcMissingResponse();
}
return value;
});
Expand Down
6 changes: 6 additions & 0 deletions src/RPC/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class ErrorRpcMessageLength<T> extends ErrorRpc<T> {
exitCode = sysexits.DATAERR;
}

class ErrorRpcMissingResponse<T> extends ErrorRpc<T> {
static description = 'Stream ended before response';
exitCode = sysexits.UNAVAILABLE;
}

class ErrorRpcRemoteError<T> extends ErrorRpc<T> {
static description = 'RPC Message exceeds maximum size';
exitCode = sysexits.UNAVAILABLE;
Expand All @@ -51,6 +56,7 @@ export {
ErrorRpcParse,
ErrorRpcHandlerFailed,
ErrorRpcMessageLength,
ErrorRpcMissingResponse,
ErrorRpcRemoteError,
ErrorRpcNoMessageError,
ErrorRpcPlaceholderConnectionError,
Expand Down
8 changes: 8 additions & 0 deletions src/RPC/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
import { TransformStream } from 'stream/web';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import { promise } from '../utils';
const jsonStreamParsers = require('@streamparser/json');

function binaryToJsonMessageStream<T extends JsonRpcMessage>(
Expand All @@ -22,6 +23,13 @@ function binaryToJsonMessageStream<T extends JsonRpcMessage>(
let bytesWritten: number = 0;

return new TransformStream<Uint8Array, T>({
flush: async () => {
// Avoid potential race conditions by allowing parser to end first
const waitP = promise();
parser.onEnd = () => waitP.resolveP();
parser.end();
await waitP.p;
},
start: (controller) => {
if (firstMessage != null) controller.enqueue(firstMessage);
parser.onValue = (value) => {
Expand Down
5 changes: 3 additions & 2 deletions src/clientRPC/ClientServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class ClientServer {
start: (controller) => {
readableController = controller;
context.message = (ws, message, _) => {
readableLogger.debug(`Received ${message.toString()}`);
const messageBuffer = Buffer.from(message);
readableLogger.debug(`Received ${messageBuffer.toString()}`);
if (message.byteLength === 0) {
readableLogger.debug('Null message received');
if (!readableClosed) {
Expand All @@ -302,7 +303,7 @@ class ClientServer {
}
return;
}
controller.enqueue(Buffer.from(message));
controller.enqueue(messageBuffer);
if (controller.desiredSize != null && controller.desiredSize < 0) {
readableLogger.error('Read stream buffer full');
if (!wsClosed) ws.end(4001, 'Read stream buffer full');
Expand Down
30 changes: 30 additions & 0 deletions src/clientRPC/handlers/agentLockAll.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type Logger from '@matrixai/logger';
import type { RPCRequestParams, RPCResponseResult } from '../types';
import type { DB } from '@matrixai/db';
import type SessionManager from '../../sessions/SessionManager';
import { UnaryHandler } from '../../RPC/handlers';
import { UnaryCaller } from '../../RPC/callers';

const agentLockAllCaller = new UnaryCaller<
RPCRequestParams,
RPCResponseResult
>();

class AgentLockAllHandler extends UnaryHandler<
{
sessionManager: SessionManager;
db: DB;
logger: Logger;
},
RPCRequestParams,
RPCResponseResult
> {
public async handle(): Promise<RPCResponseResult> {
await this.container.db.withTransactionF((tran) =>
this.container.sessionManager.resetKey(tran),
);
return {};
}
}

export { agentLockAllCaller, AgentLockAllHandler };
32 changes: 32 additions & 0 deletions src/clientRPC/handlers/agentStop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type Logger from '@matrixai/logger';
import type { RPCRequestParams, RPCResponseResult } from '../types';
import type PolykeyAgent from '../../PolykeyAgent';
import { running, status } from '@matrixai/async-init';
import { UnaryHandler } from '../../RPC/handlers';
import { UnaryCaller } from '../../RPC/callers';

const agentStopCaller = new UnaryCaller<RPCRequestParams, RPCResponseResult>();

class AgentStopHandler extends UnaryHandler<
{
pkAgent: PolykeyAgent;
logger: Logger;
},
RPCRequestParams,
RPCResponseResult
> {
public async handle(): Promise<RPCResponseResult> {
const pkAgent = this.container.pkAgent;
// If not running or in stopping status, then respond successfully
if (!pkAgent[running] || pkAgent[status] === 'stopping') {
return {};
}
// Stop PK agent in the background, allow the RPC time to respond
setTimeout(async () => {
await pkAgent.stop();
}, 500);
return {};
}
}

export { agentStopCaller, AgentStopHandler };
37 changes: 37 additions & 0 deletions src/clientRPC/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type Logger from '@matrixai/logger';
import type SessionManager from '../../sessions/SessionManager';
import type KeyRing from '../../keys/KeyRing';
import type CertManager from '../../keys/CertManager';
import type PolykeyAgent from '../../PolykeyAgent';
import type { DB } from '@matrixai/db';
import { agentStatusCaller, AgentStatusHandler } from './agentStatus';
import { agentStopCaller, AgentStopHandler } from './agentStop';
import { agentUnlockCaller, AgentUnlockHandler } from './agentUnlock';
import { agentLockAllCaller, AgentLockAllHandler } from './agentLockAll';

const serverManifest = (container: {
pkAgent: PolykeyAgent;
keyRing: KeyRing;
certManager: CertManager;
db: DB;
sessionManager: SessionManager;
logger: Logger;
}) => {
// No type used here, it will override type inference
return {
agentLockAll: new AgentLockAllHandler(container),
agentStatus: new AgentStatusHandler(container),
agentStop: new AgentStopHandler(container),
agentUnlock: new AgentUnlockHandler(container),
};
};

// No type used here, it will override type inference
const clientManifest = {
agentLockAll: agentLockAllCaller,
agentStatus: agentStatusCaller,
agentStop: agentStopCaller,
agentUnlock: agentUnlockCaller,
};

export { serverManifest, clientManifest };
113 changes: 113 additions & 0 deletions tests/clientRPC/handlers/agentLockAll.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import type { TLSConfig } from '@/network/types';
import fs from 'fs';
import path from 'path';
import os from 'os';
import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger';
import { DB } from '@matrixai/db';
import KeyRing from '@/keys/KeyRing';
import * as keysUtils from '@/keys/utils';
import RPCServer from '@/RPC/RPCServer';
import TaskManager from '@/tasks/TaskManager';
import {
agentLockAllCaller,
AgentLockAllHandler,
} from '@/clientRPC/handlers/agentLockAll';
import RPCClient from '@/RPC/RPCClient';
import { SessionManager } from '@/sessions';
import ClientServer from '@/clientRPC/ClientServer';
import ClientClient from '@/clientRPC/ClientClient';
import * as testsUtils from '../../utils';

describe('agentLockAll', () => {
const logger = new Logger('agentUnlock test', LogLevel.WARN, [
new StreamHandler(
formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
),
]);
const password = 'helloWorld';
const host = '127.0.0.1';
let dataDir: string;
let db: DB;
let keyRing: KeyRing;
let taskManager: TaskManager;
let sessionManager: SessionManager;
let clientClient: ClientClient;
let clientServer: ClientServer;
let tlsConfig: TLSConfig;

beforeEach(async () => {
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
const keysPath = path.join(dataDir, 'keys');
const dbPath = path.join(dataDir, 'db');
db = await DB.createDB({
dbPath,
logger,
});
keyRing = await KeyRing.createKeyRing({
password,
keysPath,
logger,
passwordOpsLimit: keysUtils.passwordOpsLimits.min,
passwordMemLimit: keysUtils.passwordMemLimits.min,
strictMemoryLock: false,
});
taskManager = await TaskManager.createTaskManager({ db, logger });
sessionManager = await SessionManager.createSessionManager({
db,
keyRing,
logger,
});
tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair);
});
afterEach(async () => {
await clientServer.stop(true);
await clientClient.destroy(true);
await taskManager.stop();
await keyRing.stop();
await db.stop();
await fs.promises.rm(dataDir, {
force: true,
recursive: true,
});
});
test('Locks all current sessions', async () => {
// Setup
const rpcServer = await RPCServer.createRPCServer({
manifest: {
agentLockAll: new AgentLockAllHandler({
db,
sessionManager,
logger,
}),
},
logger,
});
clientServer = await ClientServer.createClientServer({
connectionCallback: (streamPair, connectionInfo) =>
rpcServer.handleStream(streamPair, connectionInfo),
host,
tlsConfig,
logger: logger.getChild('server'),
});
clientClient = await ClientClient.createClientClient({
expectedNodeIds: [keyRing.getNodeId()],
host,
logger: logger.getChild('client'),
port: clientServer.port,
});
const rpcClient = await RPCClient.createRPCClient({
manifest: {
agentLockAll: agentLockAllCaller,
},
streamPairCreateCallback: async () => clientClient.startConnection(),
logger: logger.getChild('clientRPC'),
});

// Doing the test
const token = await sessionManager.createToken();
await rpcClient.methods.agentLockAll({});
expect(await sessionManager.verifyToken(token)).toBeFalsy();
});
});
Loading

0 comments on commit ecf309c

Please sign in to comment.