Skip to content

Commit

Permalink
fix: test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Apr 6, 2023
1 parent 3c1d525 commit f5030d6
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 60 deletions.
31 changes: 10 additions & 21 deletions src/rpc/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import type {
MapCallers,
} from './types';
import type { ContextTimed } from '../contexts/types';
import { TransformStream } from 'stream/web';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import { Timer } from '@matrixai/timer';
Expand All @@ -23,6 +22,8 @@ import * as rpcErrors from './errors';
import * as rpcUtils from './utils/utils';
import { never, promise } from '../utils';

const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol');

// eslint-disable-next-line
interface RPCClient<M extends ClientManifest> extends CreateDestroy {}
@CreateDestroy()
Expand Down Expand Up @@ -276,7 +277,7 @@ class RPCClient<M extends ClientManifest> {
});
const cleanUp = () => {
// Clean up the timer and signal
if (ctx.timer == null) timer.cancel(Error('TMP Clean up reason'));
if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
signal.removeEventListener('abort', abortHandler);
};
// Setting up abort events for timeout
Expand Down Expand Up @@ -352,14 +353,14 @@ class RPCClient<M extends ClientManifest> {
};
}

// FIXME: the CTX timeout here is just for stream creation. We can't/wont do
// keep alive timeout for raw streams.
/**
* Generic caller for raw RPC calls.
* This returns a `ReadableWritablePair` of the raw RPC stream.
* When finished the streams must be ended manually. Failing to do so will
* hold the connection open and result in a resource leak until the
* call times out.
* Raw streams don't support the keep alive timeout. Timeout will only apply\
* to the creation of the stream.
* @param method - Method name of the RPC call
* @param headerParams - Parameters for the header message. The header is a
* single RPC message that is sent to specify the method for the RPC call.
Expand All @@ -376,7 +377,7 @@ class RPCClient<M extends ClientManifest> {
const signal = abortController.signal;
// A promise that will reject if there is an abort signal or timeout
const abortRaceProm = promise<never>();
// Prevent unhandled rejection when we're don with the promise
// Prevent unhandled rejection when we're done with the promise
abortRaceProm.p.catch(() => {});
let abortHandler: () => void;
if (ctx.signal != null) {
Expand All @@ -393,10 +394,9 @@ class RPCClient<M extends ClientManifest> {
new Timer({
delay: this.streamKeepAliveTimeoutTime,
});
// Ignore unhandled rejections
const cleanUp = () => {
// Clean up the timer and signal
if (ctx.timer == null) timer.cancel(Error('TMP Clean up reason'));
if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
signal.removeEventListener('abort', abortHandler);
};
const timeoutError = new rpcErrors.ErrorRPCTimedOut();
Expand All @@ -423,27 +423,16 @@ class RPCClient<M extends ClientManifest> {
};
try {
rpcStream = await Promise.race([setupStream(), abortRaceProm.p]);
} catch (e) {
} finally {
cleanUp();
throw e;
}
// Need to tell when a stream has ended to clean up the timer
const forwardStream = new TransformStream<Uint8Array, Uint8Array>();
const reverseStream = new TransformStream<Uint8Array, Uint8Array>();

void Promise.all([
rpcStream.readable.pipeTo(reverseStream.writable),
forwardStream.readable.pipeTo(rpcStream.writable),
]).finally(() => {
cleanUp();
});
const metadata = {
...(rpcStream.meta ?? {}),
command: method,
};
return {
writable: forwardStream.writable,
readable: reverseStream.readable,
writable: rpcStream.writable,
readable: rpcStream.readable,
cancel: rpcStream.cancel,
meta: metadata,
};
Expand Down
6 changes: 6 additions & 0 deletions src/rpc/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ class RPCServer extends EventTarget {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

/**
* Registers a raw stream handler. This is the basis for all handlers as
* handling the streams is done with raw streams only.
* The raw streams do not automatically refresh the timeout timer when
* messages are sent or received.
*/
protected registerRawStreamHandler(
method: string,
handler: RawHandlerImplementation,
Expand Down
36 changes: 0 additions & 36 deletions tests/rpc/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -785,42 +785,6 @@ describe(`${RPCClient.name}`, () => {
expect(ctx?.signal.aborted).toBeTrue();
expect(ctx?.signal.reason).toBe(rejectReason);
});
test('raw caller uses default timeout awaiting stream', async () => {
const forwardPassThroughStream = new TransformStream<
Uint8Array,
Uint8Array
>();
const reversePassThroughStream = new TransformStream<
Uint8Array,
Uint8Array
>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
writable: forwardPassThroughStream.writable,
readable: reversePassThroughStream.readable,
};
let ctx: ContextTimed | undefined;
const rpcClient = await RPCClient.createRPCClient({
manifest: {},
streamFactory: async (ctx_) => {
ctx = ctx_;
return streamPair;
},
streamKeepAliveTimeoutTime: 200,
logger,
});

// Timing out on stream.
// Stream creation needs to read the header to complete.
await Promise.all([
rpcClient.rawStreamCaller('testMethod', {}),
forwardPassThroughStream.readable.getReader().read(),
]);
await ctx?.timer;
expect(ctx?.signal.aborted).toBeTrue();
expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut);
});
test('raw caller times out awaiting stream', async () => {
const forwardPassThroughStream = new TransformStream<
Uint8Array,
Expand Down
14 changes: 11 additions & 3 deletions tests/rpc/utils/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { testProp } from '@fast-check/jest';
import { testProp, fc } from '@fast-check/jest';
import * as rpcUtils from '@/rpc/utils';
import 'ix/add/asynciterable-operators/toarray';
import * as rpcTestUtils from '../utils';
Expand All @@ -12,6 +12,14 @@ describe('utils tests', () => {
},
{ numRuns: 1000 },
);
// TODO:
// - Test for badly structured data
testProp(
'malformed data cases parsing errors',
[fc.json()],
async (message) => {
expect(() =>
rpcUtils.parseJSONRPCMessage(Buffer.from(JSON.stringify(message))),
).toThrow();
},
{ numRuns: 1000 },
);
});

0 comments on commit f5030d6

Please sign in to comment.