diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 1b183d7..2143045 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -431,9 +431,8 @@ class RPCServer { yield await handler(inputVal, cancel, meta, ctx); break; } - for await (const _ of input) { - // Noop so that stream can close after flushing - } + // Noop so that stream can close after flushing + for await (const _ of input); }; this.registerDuplexStreamHandler(method, wrapperDuplex, timeout); } @@ -498,7 +497,7 @@ class RPCServer { const prom = (async () => { const id = await this.idGen(); - const headTransformStream = middleware.binaryToJsonMessageStream( + const transformStream = middleware.binaryToJsonHeaderMessageStream( utils.parseJSONRPCRequest, ); // Transparent transform used as a point to cancel the input stream from @@ -506,43 +505,39 @@ class RPCServer { Uint8Array, Uint8Array >(); - const inputStream = passthroughTransform.readable; const inputStreamEndProm = rpcStream.readable .pipeTo(passthroughTransform.writable) // Ignore any errors here, we only care that it ended .catch(() => {}); - void inputStream - // Allow us to re-use the readable after reading the first message - .pipeTo(headTransformStream.writable, { - preventClose: true, - preventCancel: true, - }) - // Ignore any errors here, we only care that it ended - .catch(() => {}); const cleanUp = async (reason: any) => { - await inputStream.cancel(reason); + // Release resources + await transformStream.readable.cancel(reason); + await transformStream.writable.abort(reason); + await passthroughTransform.readable.cancel(reason); await rpcStream.writable.abort(reason); await inputStreamEndProm; + // Stop the timer timer.cancel(cleanupReason); await timer.catch(() => {}); }; - // Read a single empty value to consume the first message - const reader = headTransformStream.readable.getReader(); + passthroughTransform.readable + .pipeTo(transformStream.writable) + .catch(() => {}); + const reader = transformStream.readable.getReader(); // Allows timing out when waiting for the first message let headerMessage: - | ReadableStreamDefaultReadResult - | undefined - | void; + | ReadableStreamDefaultReadResult + | undefined; try { headerMessage = await Promise.race([ reader.read(), timer.then( () => undefined, - () => {}, + () => undefined, ), ]); } catch (e) { - const newErr = new errors.ErrorRPCHandlerFailed( + const err = new errors.ErrorRPCHandlerFailed( 'Stream failed waiting for header', { cause: e }, ); @@ -553,29 +548,29 @@ class RPCServer { new events.RPCErrorEvent({ detail: new errors.ErrorRPCOutputStreamError( 'Stream failed waiting for header', - { cause: newErr }, + { cause: err }, ), }), ); return; } // Downgrade back to the raw stream - await reader.cancel(); + reader.releaseLock(); // There are 2 conditions where we just end here // 1. The timeout timer resolves before the first message // 2. the stream ends before the first message if (headerMessage == null) { - const newErr = new errors.ErrorRPCTimedOut( + const err = new errors.ErrorRPCTimedOut( 'Timed out waiting for header', { cause: new errors.ErrorRPCStreamEnded() }, ); - await cleanUp(newErr); + await cleanUp(err); this.dispatchEvent( new events.RPCErrorEvent({ detail: new errors.ErrorRPCTimedOut( 'Timed out waiting for header', { - cause: newErr, + cause: err, }, ), }), @@ -583,8 +578,8 @@ class RPCServer { return; } if (headerMessage.done) { - const newErr = new errors.ErrorMissingHeader('Missing header'); - await cleanUp(newErr); + const err = new errors.ErrorMissingHeader('Missing header'); + await cleanUp(err); this.dispatchEvent( new events.RPCErrorEvent({ detail: new errors.ErrorRPCOutputStreamError(), @@ -592,10 +587,22 @@ class RPCServer { ); return; } + if (headerMessage.value instanceof Uint8Array) { + const err = new errors.ErrorRPCParse('Invalid message type'); + await cleanUp(err); + this.dispatchEvent( + new events.RPCErrorEvent({ + detail: new errors.ErrorRPCParse(), + }), + ); + return; + } const method = headerMessage.value.method; const handler = this.handlerMap.get(method); if (handler == null) { - await cleanUp(new errors.ErrorRPCHandlerFailed('Missing handler')); + await cleanUp( + new errors.ErrorRPCHandlerFailed(`Missing handler for ${method}`), + ); return; } if (abortController.signal.aborted) { @@ -617,13 +624,17 @@ class RPCServer { timer.refresh(); } } - this.logger.info(`Handling stream with method (${method})`); let handlerResult: [JSONObject | undefined, ReadableStream]; const headerWriter = rpcStream.writable.getWriter(); try { + // The as keyword is used here as the middleware will only return the + // first message as a JSONMessage, and others as raw Uint8Arrays. handlerResult = await handler( - [headerMessage.value, inputStream], + [ + headerMessage.value, + transformStream.readable as ReadableStream, + ], rpcStream.cancel, rpcStream.meta, { signal: abortController.signal, timer }, diff --git a/src/middleware.ts b/src/middleware.ts index 19ca045..e415bc4 100644 --- a/src/middleware.ts +++ b/src/middleware.ts @@ -61,6 +61,81 @@ function binaryToJsonMessageStream( }); } +/** + * This function is a factory to create a TransformStream that will + * transform a `Uint8Array` stream to a stream containing the JSON header + * message and the rest of the data in `Uint8Array` format. + * The header message will be validated with the provided messageParser, this + * also infers the type of the stream output. + * @param messageParser - Validates the JSONRPC messages, so you can select for a + * specific type of message + * @param bufferByteLimit - sets the number of bytes buffered before throwing an + * error. This is used to avoid infinitely buffering the input. + */ +function binaryToJsonHeaderMessageStream( + messageParser: (message: unknown) => T, + bufferByteLimit: number = 1024 * 1024, +): TransformStream { + const parser = new JSONParser({ + separator: '', + paths: ['$'], + }); + let bytesWritten: number = 0; + let accumulator = Buffer.alloc(0); + let rawStream = false; + let parserEnded = false; + + const cleanUp = async () => { + // Avoid potential race conditions by allowing parser to end first + const waitP = utils.promise(); + parser.onEnd = () => waitP.resolveP(); + parser.end(); + await waitP.p; + }; + + return new TransformStream({ + flush: async () => { + if (!parserEnded) await cleanUp(); + }, + start: (controller) => { + parser.onValue = async (value) => { + // Enqueue the regular JSON message + const jsonMessage = messageParser(value.value); + controller.enqueue(jsonMessage); + // Remove the header message from the accumulated data + const headerLength = Buffer.from( + JSON.stringify(jsonMessage), + ).byteLength; + accumulator = accumulator.subarray(headerLength); + if (accumulator.length > 0) controller.enqueue(accumulator); + // Set system state + bytesWritten = 0; + rawStream = true; + await cleanUp(); + parserEnded = true; + }; + }, + transform: (chunk, controller) => { + try { + bytesWritten += chunk.byteLength; + if (rawStream) { + // Send raw binary data directly + controller.enqueue(chunk); + } else { + // Prepare the data to be parsed to JSON + accumulator = Buffer.concat([accumulator, chunk]); + parser.write(chunk); + } + } catch (e) { + throw new rpcErrors.ErrorRPCParse(undefined, { cause: e }); + } + if (bytesWritten > bufferByteLimit) { + throw new rpcErrors.ErrorRPCMessageLength(); + } + }, + }); +} + /** * This function is a factory for a TransformStream that will transform * JsonRPCMessages into the `Uint8Array` form. This is used for the stream @@ -270,6 +345,7 @@ const defaultClientMiddlewareWrapper = ( export { binaryToJsonMessageStream, + binaryToJsonHeaderMessageStream, jsonMessageToBinaryStream, timeoutMiddlewareClient, timeoutMiddlewareServer, diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 478b9bc..bef466f 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -31,100 +31,89 @@ import ClientHandler from '@/handlers/ClientHandler'; import { filterSensitive } from '@/utils'; import * as rpcTestUtils from './utils'; -describe('RPC', () => { +describe('RPC tests', () => { const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]); const idGen: IdGen = () => Promise.resolve(null); - test.prop( - { - values: rpcTestUtils.rawDataArb, - }, - {}, - )('RPC communication with raw stream', async ({ values }) => { - const [outputResult, outputWriterStream] = - rpcTestUtils.streamToArray(); - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - - let header: JSONRPCRequest | undefined = undefined; - - class TestMethod extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ): Promise<[JSONObject, ReadableStream]> => { - return new Promise((resolve) => { - const [header_, stream] = input; - header = header_; - resolve([{ value: 'some leading data' }, stream]); - }); - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); - - const rpcClient = new RPCClient({ - manifest: { - testMethod: new RawCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, + test.prop({ values: rpcTestUtils.rawDataArb }, {})( + 'the RPC should communicate using raw streams', + async ({ values }) => { + const [outputResult, outputWriterStream] = + rpcTestUtils.streamToArray(); + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + let header: JSONRPCRequest | undefined = undefined; + class TestMethod extends RawHandler { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ): Promise<[JSONObject, ReadableStream]> => { + return new Promise((resolve) => { + const [header_, stream] = input; + header = header_; + resolve([{ value: 'some leading data' }, stream]); + }); }; - }, - logger, - idGen, - }); - - const callerInterface = await rpcClient.methods.testMethod({ - hello: 'world', - }); - const writer = callerInterface.writable.getWriter(); - const pipeProm = callerInterface.readable.pipeTo(outputWriterStream); - for (const value of values) { - await writer.write(value); - } - await writer.close(); - const expectedHeader: JSONRPCRequest = { - jsonrpc: '2.0', - method: 'testMethod', - params: { hello: 'world' }, - id: null, - }; - expect(header).toStrictEqual(expectedHeader); - expect(callerInterface.meta?.result).toStrictEqual({ - value: 'some leading data', - }); - expect(await outputResult).toStrictEqual(values); - await pipeProm; - await rpcServer.stop({ force: true }); - }); - test('RPC communication with raw stream times out waiting for leading message', async () => { + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new RawCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.testMethod({ + hello: 'world', + }); + const writer = callerInterface.writable.getWriter(); + const pipeProm = callerInterface.readable.pipeTo(outputWriterStream); + for (const value of values) { + await writer.write(value); + } + await writer.close(); + const expectedHeader: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'testMethod', + params: { hello: 'world' }, + id: null, + }; + expect(header).toStrictEqual(expectedHeader); + expect(callerInterface.meta?.result).toStrictEqual({ + value: 'some leading data', + }); + expect(await outputResult).toStrictEqual(values); + await pipeProm; + await rpcServer.stop({ force: true }); + }, + ); + test('the RPC should time out waiting for leading message using raw streams', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); void (async () => { - for await (const _ of serverPair.readable) { - // Just consume - } + // Consume values + for await (const _ of serverPair.readable); })(); - const rpcClient = new RPCClient({ manifest: { testMethod: new RawCaller(), @@ -138,22 +127,15 @@ describe('RPC', () => { logger, idGen, }); - await expect( - rpcClient.methods.testMethod( - { - hello: 'world', - }, - { timer: 100 }, - ), + rpcClient.methods.testMethod({ hello: 'world' }, { timer: 100 }), ).rejects.toThrow(rpcErrors.ErrorRPCTimedOut); }); - test('RPC communication with raw stream, raw handler throws', async () => { + test('the RPC should properly catch an error thrown by the handler using raw streams', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends RawHandler { public handle = async ( _input: [JSONRPCRequest, ReadableStream], @@ -164,11 +146,7 @@ describe('RPC', () => { throw new Error('some error'); }; } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -178,7 +156,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new RawCaller(), @@ -198,15 +175,12 @@ describe('RPC', () => { await expect(callP).rejects.toThrow(rpcErrors.ErrorRPCRemote); const result = await callP.catch((e) => e); expect(result.cause.message).toBe('some error'); - await rpcServer.stop({ force: true }); }); test.prop( - { - values: fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }), - }, + { values: fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }) }, {}, - )('RPC communication with duplex stream', async ({ values }) => { + )('the RPC should communicate using duplex streams', async ({ values }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array @@ -221,10 +195,7 @@ describe('RPC', () => { yield* input; }; } - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -234,7 +205,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new DuplexCaller(), @@ -248,7 +218,6 @@ describe('RPC', () => { logger, idGen, }); - const callerInterface = await rpcClient.methods.testMethod(); const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); @@ -277,7 +246,7 @@ describe('RPC', () => { }, { numRuns: 1 }, )( - 'RPC communication with duplex stream responds after timeout', + 'the RPC should respond after timeout while using duplex stream', async ({ values }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -292,22 +261,14 @@ describe('RPC', () => { ): AsyncGenerator { const { p, resolveP } = utils.promise(); if (ctx.signal.aborted) resolveP(); - ctx.signal.addEventListener( - 'abort', - () => { - resolveP(); - }, - { once: true }, - ); + ctx.signal.addEventListener('abort', () => resolveP(), { + once: true, + }); await p; yield* input; }; } - const rpcServer = new RPCServer({ - timeoutTime: 500, - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 500 }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -317,7 +278,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - let aborted = false; const rpcClient = new RPCClient({ manifest: { @@ -336,7 +296,6 @@ describe('RPC', () => { logger, idGen, }); - const callerInterface = await rpcClient.methods.testMethod(); const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); @@ -361,12 +320,11 @@ describe('RPC', () => { ); test.prop({ value: fc.integer({ min: 1, max: 100 }), - })('RPC communication with server stream', async ({ value }) => { + })('the RPC should communicate using server streams', async ({ value }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends ServerHandler< ContainerType, { value: number }, @@ -380,11 +338,7 @@ describe('RPC', () => { } }; } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -394,7 +348,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new ServerCaller<{ value: number }, { value: number }>(), @@ -408,9 +361,7 @@ describe('RPC', () => { logger, idGen, }); - const callerInterface = await rpcClient.methods.testMethod({ value }); - const outputs: Array = []; for await (const num of callerInterface) { outputs.push(num.value); @@ -420,12 +371,11 @@ describe('RPC', () => { }); test.prop({ values: fc.array(fc.integer(), { minLength: 1 }).noShrink(), - })('RPC communication with client stream', async ({ values }) => { + })('the RPC should communicate using client streams', async ({ values }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends ClientHandler< ContainerType, { value: number }, @@ -441,11 +391,7 @@ describe('RPC', () => { return { value: acc }; }; } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -455,7 +401,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new ClientCaller<{ value: number }, { value: number }>(), @@ -469,7 +414,6 @@ describe('RPC', () => { logger, idGen, }); - const { output, writable } = await rpcClient.methods.testMethod(); const writer = writable.getWriter(); for (const value of values) { @@ -482,12 +426,11 @@ describe('RPC', () => { }); test.prop({ value: rpcTestUtils.safeJsonObjectArb, - })('RPC communication with unary call', async ({ value }) => { + })('the RPC should communicate using unary calls', async ({ value }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends UnaryHandler { public handle = async ( input: JSONRPCRequestParams, @@ -495,10 +438,7 @@ describe('RPC', () => { return input; }; } - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -508,7 +448,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -522,7 +461,6 @@ describe('RPC', () => { logger, idGen, }); - const result = await rpcClient.methods.testMethod(value); if (result.metadata != null && result.metadata.timeout === null) { result.metadata.timeout = Infinity; @@ -530,19 +468,13 @@ describe('RPC', () => { expect(result).toEqual(value); await rpcServer.stop({ force: true }); }); - test.prop( - { - value: rpcTestUtils.safeJsonObjectArb, - }, - { numRuns: 1 }, - )( - 'RPC communication with unary call responds after timeout', + test.prop({ value: rpcTestUtils.safeJsonObjectArb }, { numRuns: 1 })( + 'the RPC should respond after time out while using unary calls', async ({ value }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends UnaryHandler { public handle = async ( input: JSONRPCRequestParams, @@ -563,11 +495,7 @@ describe('RPC', () => { return input; }; } - const rpcServer = new RPCServer({ - timeoutTime: 500, - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 500 }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -577,7 +505,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - let aborted = false; const rpcClient = new RPCClient({ manifest: { @@ -596,7 +523,6 @@ describe('RPC', () => { logger, idGen, }); - const result = await rpcClient.methods.testMethod(value); if (result.metadata != null && result.metadata.timeout === null) { result.metadata.timeout = Infinity; @@ -609,12 +535,11 @@ describe('RPC', () => { test.prop({ value: rpcTestUtils.safeJsonValueArb, error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - })('RPC handles and sends errors', async ({ value, error }) => { + })('the RPC can handle and send errors', async ({ value, error }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends UnaryHandler { public handle = async ( _input: JSONObject, @@ -625,18 +550,13 @@ describe('RPC', () => { throw error; }; } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), }, }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -647,21 +567,15 @@ describe('RPC', () => { logger, idGen, }); - - // Create a new promise, so we can await it multiple times for assertions + // Create a new promise, so we can await it multiple times for assertions, + // which should be rejected. const callProm = rpcClient.methods.testMethod({ value }); - - // The promise should be rejected const rejection = await callProm.catch((e) => e); - - // The error should have specific properties expect(rejection.cause).toBeInstanceOf(error.constructor); expect(rejection.cause).toEqual(error); - - // Cleanup await rpcServer.stop({ force: true }); }); - test('middleware can end stream early', async () => { + test('the middleware can end the stream early', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array @@ -733,87 +647,72 @@ describe('RPC', () => { await expect(reader.closed).toReject(); await expect(rpcServer.stop({ force: false })).toResolve(); }); - test.prop( - { - inputData: rpcTestUtils.safeJsonValueArb, - }, - { numRuns: 1 }, - )('RPC server times out before client', async ({ inputData }) => { - const serverTimedOutProm = utils.promise(); - - // Setup server and client communication pairs - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - - // Define the server's method behavior - class TestMethod extends DuplexHandler { - public handle = async function* ( - _input: AsyncIterableIterator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ) { - const abortProm = utils.promise(); - if (ctx.signal.aborted) { - abortProm.rejectP(ctx.signal.reason); - } else { - ctx.signal.addEventListener('abort', () => { + test.prop({ inputData: rpcTestUtils.safeJsonValueArb }, { numRuns: 1 })( + 'the RPC server can time out before client', + async ({ inputData }) => { + const serverTimedOutProm = utils.promise(); + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + // Define the server's method behavior + class TestMethod extends DuplexHandler { + public handle = async function* ( + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ) { + const abortProm = utils.promise(); + if (ctx.signal.aborted) { abortProm.rejectP(ctx.signal.reason); + } else { + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + } + await abortProm.p.catch((e) => { + serverTimedOutProm.resolveP(); + throw e; }); - } - await abortProm.p.catch((e) => { - serverTimedOutProm.resolveP(); - throw e; - }); - }; - } - - // Create an instance of the RPC server with a shorter timeout - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 1, - }); - await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - - // Create an instance of the RPC client with a longer timeout - const rpcClient = new RPCClient({ - manifest: { testMethod: new DuplexCaller() }, - streamFactory: async () => ({ ...clientPair, cancel: () => {} }), - logger, - idGen, - }); - - // Get server and client interfaces - const callerInterface = await rpcClient.methods.testMethod({ - timer: 10, - }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - - // We expect server to timeout before the client - await expect(writer.write({ value: inputData })).toResolve(); - await serverTimedOutProm.p; - const readP = reader.read(); - await expect(readP).rejects.toThrow(ErrorRPCRemote); - await expect( - readP.catch((e) => { - throw e.cause; - }), - ).rejects.toThrow(rpcErrors.ErrorRPCTimedOut); - - // Cleanup - await rpcServer.stop({ force: true }); - }); + }; + } + // Create an instance of the RPC server with a shorter timeout + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 1 }); + await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + // Create an instance of the RPC client with a longer timeout + const rpcClient = new RPCClient({ + manifest: { testMethod: new DuplexCaller() }, + streamFactory: async () => ({ ...clientPair, cancel: () => {} }), + logger, + idGen, + }); + // Get server and client interfaces + const callerInterface = await rpcClient.methods.testMethod({ timer: 10 }); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + // We expect server to timeout before the client + await expect(writer.write({ value: inputData })).toResolve(); + await serverTimedOutProm.p; + const readP = reader.read(); + await expect(readP).rejects.toThrow(ErrorRPCRemote); + await expect( + readP.catch((e) => { + throw e.cause; + }), + ).rejects.toThrow(rpcErrors.ErrorRPCTimedOut); + // Cleanup + await rpcServer.stop({ force: true }); + }, + ); test.prop( { value: rpcTestUtils.safeJsonValueArb, }, { numRuns: 1 }, - )('RPC client times out before server', async ({ value }) => { + )('the RPC client can time out before the server', async ({ value }) => { // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -835,11 +734,7 @@ describe('RPC', () => { }; } // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 400, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 400 }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -849,7 +744,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new DuplexCaller(), @@ -863,102 +757,78 @@ describe('RPC', () => { logger, idGen, }); - const callerInterface = await rpcClient.methods.testMethod({ - timer: 300, - }); + const callerInterface = await rpcClient.methods.testMethod({ timer: 300 }); const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); // Expect the client to time out first await expect(writer.write({ value })).toResolve(); await expect(reader.read()).toReject(); - await rpcServer.stop({ force: true }); }); - test.prop( - { - inputData: rpcTestUtils.safeJsonValueArb, + test.prop({ inputData: rpcTestUtils.safeJsonValueArb }, { numRuns: 1 })( + 'the RPC client and server should work with an infinite timeout', + async ({ inputData }) => { + // Set up a client and server with infinite timeout settings + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ) { + ctx.signal.throwIfAborted(); + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + await abortProm.p; + }; + } + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: Infinity }); + await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + const rpcClient = new RPCClient({ + manifest: { testMethod: new DuplexCaller() }, + streamFactory: async () => ({ ...clientPair, cancel: () => {} }), + logger, + idGen, + }); + const callerTimer = new Timer(() => {}, Infinity); + const callerInterface = await rpcClient.methods.testMethod({ + timer: callerTimer, + }); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + // Trigger a call that will hang indefinitely or for a long time + // Write a value to the stream + await writer.write({ value: inputData }); + // Trigger a read that will hang indefinitely + const readPromise = reader.read(); + // Adding a randomized sleep here to check that neither timeout + const randomSleepTime = Math.floor(Math.random() * 1000) + 1; + // Random time between 1 and 1000 ms + await utils.sleep(randomSleepTime); + // At this point, writePromise and readPromise should neither be resolved nor rejected + // because the server method is hanging. + // Check if the promises are neither resolved nor rejected + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject('timeout'), 1000), + ); + // Check if read status is still pending; + await expect(Promise.race([readPromise, timeoutPromise])).rejects.toBe( + 'timeout', + ); + // Cancel caller timer + callerTimer.cancel(); + // Expect neither to time out and verify that they can still handle other operations + await rpcServer.stop({ force: true }); }, - { numRuns: 1 }, - )('RPC client and server with infinite timeout', async ({ inputData }) => { - // Set up a client and server with infinite timeout settings - - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - - class TestMethod extends DuplexHandler { - public handle = async function* ( - _input: AsyncIterableIterator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ) { - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: Infinity, - }); - await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - - const rpcClient = new RPCClient({ - manifest: { testMethod: new DuplexCaller() }, - streamFactory: async () => ({ ...clientPair, cancel: () => {} }), - logger, - idGen, - }); - - const callerTimer = new Timer(() => {}, Infinity); - - const callerInterface = await rpcClient.methods.testMethod({ - timer: callerTimer, - }); - - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - - // Trigger a call that will hang indefinitely or for a long time - - // Write a value to the stream - await writer.write({ value: inputData }); - - // Trigger a read that will hang indefinitely - - const readPromise = reader.read(); - // Adding a randomized sleep here to check that neither timeout - const randomSleepTime = Math.floor(Math.random() * 1000) + 1; - // Random time between 1 and 1,000 ms - await utils.sleep(randomSleepTime); - // At this point, writePromise and readPromise should neither be resolved nor rejected - // because the server method is hanging. - - // Check if the promises are neither resolved nor rejected - const timeoutPromise = new Promise((_, reject) => - setTimeout(() => reject('timeout'), 1000), - ); - - // Check if read status is still pending; - - await expect(Promise.race([readPromise, timeoutPromise])).rejects.toBe( - 'timeout', - ); - - // Cancel caller timer - callerTimer.cancel(); - - // Expect neither to time out and verify that they can still handle other operations - await rpcServer.stop({ force: true }); - }); - test('RPC server times out using client timeout', async () => { + ); + test('the RPC server times out using client timeout', async () => { // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -981,11 +851,7 @@ describe('RPC', () => { }; } // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 150, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 150 }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -995,7 +861,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -1011,16 +876,10 @@ describe('RPC', () => { }); await expect(rpcClient.methods.testMethod({}, { timer: 100 })).toReject(); await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100); - await rpcServer.stop({ force: true }); }); - test.prop( - { - message: fc.string(), - }, - { numRuns: 1 }, - )( - 'RPC client times out and server is able to ignore exception', + test.prop({ message: fc.string() }, { numRuns: 1 })( + 'the RPC server ignores exception when the client times out', async ({ message }) => { // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< @@ -1045,11 +904,7 @@ describe('RPC', () => { }; } // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 150, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 150 }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), @@ -1059,7 +914,6 @@ describe('RPC', () => { ...serverPair, cancel: () => {}, }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -1077,18 +931,16 @@ describe('RPC', () => { rpcClient.methods.testMethod({ value: message }, { timer: 100 }), ).resolves.toHaveProperty('value', message); await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100); - await rpcServer.stop({ force: true }); }, ); test.prop({ error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - })('RPC Serializes and Deserializes Error', async ({ error }) => { + })('the RPC can serialize and deserialize errors', async ({ error }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends UnaryHandler { public handle = async ( _input: JSONObject, @@ -1099,17 +951,13 @@ describe('RPC', () => { throw error; }; } - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), }, }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -1120,24 +968,21 @@ describe('RPC', () => { logger, idGen, }); - const callProm = rpcClient.methods.testMethod({}); const callError = await callProm.catch((e) => e); await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); expect(callError.cause).toEqual(error); - await rpcServer.stop({ force: true }); }); test.prop({ error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), })( - 'RPC Serializes and Deserializes Error with Custom Replacer Function', + 'the RPC can serialize and deserialize errors with a custom replacer', async ({ error }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - class TestMethod extends UnaryHandler { public handle = async ( _input: JSONObject, @@ -1159,7 +1004,6 @@ describe('RPC', () => { }, }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -1170,26 +1014,20 @@ describe('RPC', () => { logger, idGen, }); - const callProm = rpcClient.methods.testMethod({}); const callError = await callProm.catch((e) => e); - await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); expect(callError.cause).toEqual(error); - await rpcServer.stop({ force: true }); }, ); - test('RPCServer force stop will propagate correct errors', async () => { + test('force stopping the RPC server should propagate correct errors', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array >(); - const errorMessage = 'test error'; - const testReason = Error(errorMessage); - class TestMethod extends UnaryHandler { public handle = async ( _input: JSONObject, @@ -1206,18 +1044,13 @@ describe('RPC', () => { throw await abortP.p; }; } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), }, }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ manifest: { testMethod: new UnaryCaller(), @@ -1228,9 +1061,7 @@ describe('RPC', () => { logger, idGen, }); - const testProm = rpcClient.methods.testMethod({}); - await rpcServer.stop({ force: true, reason: testReason }); const rejection = await testProm.catch((e) => e); expect(rejection).toBeInstanceOf(ErrorRPCRemote); diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 24ace95..7467178 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -22,15 +22,14 @@ import ServerCaller from '@/callers/ServerCaller'; import ClientCaller from '@/callers/ClientCaller'; import UnaryCaller from '@/callers/UnaryCaller'; import RPCClient from '@/RPCClient'; -import RPCServer from '@/RPCServer'; import * as rpcErrors from '@/errors'; import * as rpcUtilsMiddleware from '@/middleware'; import { promise, timeoutCancelledReason } from '@/utils'; import * as utils from '@/utils'; import * as rpcTestUtils from './utils'; -describe(`${RPCClient.name}`, () => { - const logger = new Logger(`${RPCServer.name} Test`, LogLevel.WARN, [ +describe('RPCClient tests', () => { + const logger = new Logger('RPCClient Test', LogLevel.WARN, [ new StreamHandler(), ]); const idGen: IdGen = () => Promise.resolve(null); @@ -46,221 +45,231 @@ describe(`${RPCClient.name}`, () => { headerParams: rpcTestUtils.safeJsonObjectArb, inputData: rpcTestUtils.rawDataArb, outputData: rpcTestUtils.rawDataArb, - })('raw caller', async ({ headerParams, inputData, outputData }) => { - const [inputResult, inputWritableStream] = - rpcTestUtils.streamToArray(); - const [outputResult, outputWritableStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: new ReadableStream({ - start: (controller) => { - const leadingResponse: JSONRPCResponseSuccess = { - jsonrpc: '2.0', - result: {}, - id: null, - }; - controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); - for (const datum of outputData) { - controller.enqueue(datum); - } - controller.close(); - }, - }), - writable: inputWritableStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.rawStreamCaller( - 'testMethod', - headerParams, - ); - await callerInterface.readable.pipeTo(outputWritableStream); - const writer = callerInterface.writable.getWriter(); - for (const inputDatum of inputData) { - await writer.write(inputDatum); - } - await writer.close(); - - const expectedHeader: JSONRPCRequest = { - jsonrpc: '2.0', - method: methodName, - params: headerParams, - id: null, - }; - expect(await inputResult).toStrictEqual([ - Buffer.from(JSON.stringify(expectedHeader)), - ...inputData, - ]); - expect(await outputResult).toStrictEqual(outputData); - }); - test.prop({ - messages: specificMessageArb, - })('generic duplex caller', async ({ messages }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const writable = callerInterface.writable.getWriter(); - for await (const value of callerInterface.readable) { - await writable.write(value); - } - await writable.close(); - - const expectedMessages: Array = messages.map( - (v, i) => ({ + })( + 'the RPC should be able to interface with a raw caller', + async ({ headerParams, inputData, outputData }) => { + const [inputResult, inputWritableStream] = + rpcTestUtils.streamToArray(); + const [outputResult, outputWritableStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream({ + start: (controller) => { + const leadingResponse: JSONRPCResponseSuccess = { + jsonrpc: '2.0', + result: {}, + id: null, + }; + controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); + for (const datum of outputData) { + controller.enqueue(datum); + } + controller.close(); + }, + }), + writable: inputWritableStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.rawStreamCaller( + 'testMethod', + headerParams, + ); + await callerInterface.readable.pipeTo(outputWritableStream); + const writer = callerInterface.writable.getWriter(); + for (const inputDatum of inputData) { + await writer.write(inputDatum); + } + await writer.close(); + const expectedHeader: JSONRPCRequest = { jsonrpc: '2.0', method: methodName, + params: headerParams, id: null, - params: { - ...v.result, - ...(i === 0 ? { metadata: { timeout: null } } : {}), - }, - }), - ); - - const outputMessages = (await outputResult).map((v) => - JSON.parse(v.toString()), - ); - - expect(outputMessages).toStrictEqual(expectedMessages); - }); + }; + expect(await inputResult).toStrictEqual([ + Buffer.from(JSON.stringify(expectedHeader)), + ...inputData, + ]); + expect(await outputResult).toStrictEqual(outputData); + }, + ); + test.prop({ + messages: specificMessageArb, + })( + 'the RPC should be able to interface with a generic duplex caller', + async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName); + const writable = callerInterface.writable.getWriter(); + for await (const value of callerInterface.readable) { + await writable.write(value); + } + await writable.close(); + const expectedMessages: Array = messages.map( + (v, i) => ({ + jsonrpc: '2.0', + method: methodName, + id: null, + params: { + ...v.result, + ...(i === 0 ? { metadata: { timeout: null } } : {}), + }, + }), + ); + const outputMessages = (await outputResult).map((v) => + JSON.parse(v.toString()), + ); + expect(outputMessages).toStrictEqual(expectedMessages); + }, + ); test.prop({ messages: specificMessageArb, params: rpcTestUtils.safeJsonObjectArb, - })('generic server stream caller', async ({ messages, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.serverStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName, params); - const values: Array = []; - for await (const value of callerInterface) { - values.push(value); - } - const expectedValues = messages.map((v) => v.result); - expect(values).toStrictEqual(expectedValues); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { - ...params, - metadata: { - timeout: null, + })( + 'the RPC should be able to interface with a generic server caller', + async ({ messages, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.serverStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName, params); + const values: Array = []; + for await (const value of callerInterface) { + values.push(value); + } + const expectedValues = messages.map((v) => v.result); + expect(values).toStrictEqual(expectedValues); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { + ...params, + metadata: { + timeout: null, + }, }, - }, - }), - ); - }); + }), + ); + }, + ); test.prop({ message: rpcTestUtils.JSONRPCResponseSuccessArb(), params: fc.array(rpcTestUtils.safeJsonObjectArb), - })('generic client stream caller', async ({ message, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const { output, writable } = await rpcClient.clientStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const writer = writable.getWriter(); - for (const param of params) { - await writer.write(param); - } - await writer.close(); - expect(await output).toStrictEqual(message.result); - const expectedOutput = params.map((v, i) => - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { ...v, ...(i === 0 ? { metadata: { timeout: null } } : {}) }, - }), - ); - - expect((await outputResult).map((v) => v.toString())).toStrictEqual( - expectedOutput, - ); - }); + })( + 'the RPC should interface with a generic client caller', + async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const { output, writable } = await rpcClient.clientStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName); + const writer = writable.getWriter(); + for (const param of params) { + await writer.write(param); + } + await writer.close(); + expect(await output).toStrictEqual(message.result); + const expectedOutput = params.map((v, i) => + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { ...v, ...(i === 0 ? { metadata: { timeout: null } } : {}) }, + }), + ); + expect((await outputResult).map((v) => v.toString())).toStrictEqual( + expectedOutput, + ); + }, + ); test.prop({ message: rpcTestUtils.JSONRPCResponseSuccessArb(), params: rpcTestUtils.safeJsonObjectArb, - })('generic unary caller', async ({ message, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const result = await rpcClient.unaryCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName, params); - expect(result).toStrictEqual(message.result); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { ...params, metadata: { timeout: null } }, - }), - ); - }); + })( + 'the RPC should interface with a generic unary caller', + async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const result = await rpcClient.unaryCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName, params); + expect(result).toStrictEqual(message.result); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { ...params, metadata: { timeout: null } }, + }), + ); + }, + ); test.prop({ messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( @@ -292,26 +301,17 @@ describe(`${RPCClient.name}`, () => { >(methodName); await callerInterface.writable.close(); const callProm = (async () => { - for await (const _ of callerInterface.readable) { - // Only consume - } + // Only consume values + for await (const _ of callerInterface.readable); })(); await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); await outputResult; }, ); - test.prop({ - messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), - errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( - rpcTestUtils.errorArb(), - ), - })( - 'generic duplex caller can throw received error message with sensitive', - async ({ messages, errorMessage }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([ - ...messages, - errorMessage, - ]); + test.prop({ messages: specificMessageArb })( + 'generic duplex caller should work with forward middleware', + async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const streamPair: RPCStream = { @@ -323,6 +323,21 @@ describe(`${RPCClient.name}`, () => { const rpcClient = new RPCClient({ manifest: {}, streamFactory: async () => streamPair, + middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( + () => { + return { + forward: new TransformStream({ + transform: (chunk, controller) => { + controller.enqueue({ + ...chunk, + params: { value: 'one', metadata: chunk.params?.metadata }, + }); + }, + }), + reverse: new TransformStream(), + }; + }, + ), logger, idGen, }); @@ -330,28 +345,40 @@ describe(`${RPCClient.name}`, () => { JSONRPCRequestParams, JSONRPCResponseResult >(methodName); - await callerInterface.writable.close(); - const callProm = (async () => { - for await (const _ of callerInterface.readable) { - // Only consume + const reader = callerInterface.readable.getReader(); + const writer = callerInterface.writable.getWriter(); + while (true) { + const { value, done } = await reader.read(); + if (done) { + // We have to end the writer otherwise the stream never closes + await writer.close(); + break; } - })(); - await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); - await outputResult; + await writer.write(value); + } + const expectedMessages: Array = messages.map( + (_, i) => ({ + jsonrpc: '2.0', + method: methodName, + id: null, + params: { + value: 'one', + ...(i === 0 ? { metadata: { timeout: null } } : {}), + }, + }), + ); + const outputMessages = (await outputResult).map((v) => + JSON.parse(v.toString()), + ); + expect(outputMessages).toStrictEqual(expectedMessages); }, ); test.prop({ - messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), - errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( - rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - ), + messages: specificMessageArb, })( - 'generic duplex caller can throw received error message with causes', - async ({ messages, errorMessage }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([ - ...messages, - errorMessage, - ]); + 'generic duplex caller should work with reverse middleware', + async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const streamPair: RPCStream = { @@ -363,6 +390,21 @@ describe(`${RPCClient.name}`, () => { const rpcClient = new RPCClient({ manifest: {}, streamFactory: async () => streamPair, + middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( + () => { + return { + forward: new TransformStream(), + reverse: new TransformStream({ + transform: (chunk, controller) => { + controller.enqueue({ + ...chunk, + result: { value: 'one' }, + }); + }, + }), + }; + }, + ), logger, idGen, }); @@ -370,343 +412,247 @@ describe(`${RPCClient.name}`, () => { JSONRPCRequestParams, JSONRPCResponseResult >(methodName); - await callerInterface.writable.close(); - const callProm = (async () => { - for await (const _ of callerInterface.readable) { - // Only consume + const reader = callerInterface.readable.getReader(); + const writer = callerInterface.writable.getWriter(); + while (true) { + const { value, done } = await reader.read(); + if (done) { + // We have to end the writer otherwise the stream never closes + await writer.close(); + break; } - })(); - await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); + expect(value).toStrictEqual({ value: 'one' }); + await writer.write(value); + } await outputResult; }, ); test.prop({ messages: specificMessageArb, - })('generic duplex caller with forward Middleware', async ({ messages }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - () => { - return { - forward: new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue({ - ...chunk, - params: { value: 'one', metadata: chunk.params?.metadata }, - }); - }, - }), - reverse: new TransformStream(), - }; - }, - ), - logger, - idGen, - }); - - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const reader = callerInterface.readable.getReader(); - const writer = callerInterface.writable.getWriter(); - while (true) { - const { value, done } = await reader.read(); - if (done) { - // We have to end the writer otherwise the stream never closes - await writer.close(); - break; - } - await writer.write(value); - } - - const expectedMessages: Array = messages.map( - (_, i) => ({ - jsonrpc: '2.0', - method: methodName, - id: null, - params: { - value: 'one', - ...(i === 0 ? { metadata: { timeout: null } } : {}), - }, - }), - ); - - const outputMessages = (await outputResult).map((v) => - JSON.parse(v.toString()), - ); - expect(outputMessages).toStrictEqual(expectedMessages); - }); - test.prop({ - messages: specificMessageArb, - })('generic duplex caller with reverse Middleware', async ({ messages }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - () => { - return { - forward: new TransformStream(), - reverse: new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue({ - ...chunk, - result: { value: 'one' }, - }); - }, - }), - }; + params: fc.string(), + })( + 'the manifest should be able to provide a server caller', + async ({ messages, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + server: new ServerCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(), }, - ), - logger, - idGen, - }); - - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const reader = callerInterface.readable.getReader(); - const writer = callerInterface.writable.getWriter(); - while (true) { - const { value, done } = await reader.read(); - if (done) { - // We have to end the writer otherwise the stream never closes - await writer.close(); - break; + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.server({ value: params }); + const values: Array = []; + for await (const value of callerInterface) { + values.push(value); } - expect(value).toStrictEqual({ value: 'one' }); - await writer.write(value); - } - await outputResult; - }); - test.prop({ - messages: specificMessageArb, - params: fc.string(), - })('manifest server call', async ({ messages, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - server: new ServerCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.server({ value: params }); - const values: Array = []; - for await (const value of callerInterface) { - values.push(value); - } - const expectedValues = messages.map((v) => v.result); - expect(values).toStrictEqual(expectedValues); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: 'server', - jsonrpc: '2.0', - id: null, - params: { value: params, metadata: { timeout: null } }, - }), - ); - }); + const expectedValues = messages.map((v) => v.result); + expect(values).toStrictEqual(expectedValues); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: 'server', + jsonrpc: '2.0', + id: null, + params: { value: params, metadata: { timeout: null } }, + }), + ); + }, + ); test.prop({ message: rpcTestUtils.JSONRPCResponseSuccessArb( rpcTestUtils.safeJsonObjectArb, ), params: fc.array(fc.string(), { minLength: 5 }), - })('manifest client call', async ({ message, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - client: new ClientCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const { output, writable } = await rpcClient.methods.client(); - const writer = writable.getWriter(); - for (const param of params) { - await writer.write({ value: param }); - } - expect(await output).toStrictEqual(message.result); - await writer.close(); - const expectedOutput = params.map((v, i) => - JSON.stringify({ - method: 'client', - jsonrpc: '2.0', - id: null, - params: { - value: v, - ...(i === 0 ? { metadata: { timeout: null } } : {}), + })( + 'the manifest should be able to provide a client caller', + async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + client: new ClientCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(), }, - }), - ); - expect((await outputResult).map((v) => v.toString())).toStrictEqual( - expectedOutput, - ); - }); + streamFactory: async () => streamPair, + logger, + idGen, + }); + const { output, writable } = await rpcClient.methods.client(); + const writer = writable.getWriter(); + for (const param of params) { + await writer.write({ value: param }); + } + expect(await output).toStrictEqual(message.result); + await writer.close(); + const expectedOutput = params.map((v, i) => + JSON.stringify({ + method: 'client', + jsonrpc: '2.0', + id: null, + params: { + value: v, + ...(i === 0 ? { metadata: { timeout: null } } : {}), + }, + }), + ); + expect((await outputResult).map((v) => v.toString())).toStrictEqual( + expectedOutput, + ); + }, + ); test.prop({ message: rpcTestUtils.JSONRPCResponseSuccessArb(), params: fc.string(), - })('manifest unary call', async ({ message, params }) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - unary: new UnaryCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const result = await rpcClient.methods.unary({ value: params }); - expect(result).toStrictEqual(message.result); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: 'unary', - jsonrpc: '2.0', - id: null, - params: { value: params, metadata: { timeout: null } }, - }), - ); - }); + })( + 'the manifest should be able to provide a unary caller', + async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + unary: new UnaryCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const result = await rpcClient.methods.unary({ value: params }); + expect(result).toStrictEqual(message.result); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: 'unary', + jsonrpc: '2.0', + id: null, + params: { value: params, metadata: { timeout: null } }, + }), + ); + }, + ); test.prop({ headerParams: rpcTestUtils.safeJsonObjectArb, inputData: rpcTestUtils.rawDataArb, outputData: rpcTestUtils.rawDataArb, - })('manifest raw caller', async ({ headerParams, inputData, outputData }) => { - const [inputResult, inputWritableStream] = - rpcTestUtils.streamToArray(); - const [outputResult, outputWritableStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: new ReadableStream({ - start: (controller) => { - const leadingResponse: JSONRPCResponseSuccess = { - jsonrpc: '2.0', - result: { value: null }, - id: null, - }; - controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); - for (const datum of outputData) { - controller.enqueue(datum); - } - controller.close(); + })( + 'the manifest should be able to provide a raw caller', + async ({ headerParams, inputData, outputData }) => { + const [inputResult, inputWritableStream] = + rpcTestUtils.streamToArray(); + const [outputResult, outputWritableStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream({ + start: (controller) => { + const leadingResponse: JSONRPCResponseSuccess = { + jsonrpc: '2.0', + result: { value: null }, + id: null, + }; + controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); + for (const datum of outputData) { + controller.enqueue(datum); + } + controller.close(); + }, + }), + writable: inputWritableStream, + }; + const rpcClient = new RPCClient({ + manifest: { + raw: new RawCaller(), }, - }), - writable: inputWritableStream, - }; - const rpcClient = new RPCClient({ - manifest: { - raw: new RawCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.raw(headerParams); - await callerInterface.readable.pipeTo(outputWritableStream); - const writer = callerInterface.writable.getWriter(); - for (const inputDatum of inputData) { - await writer.write(inputDatum); - } - await writer.close(); - - const expectedHeader: JSONRPCRequest = { - jsonrpc: '2.0', - method: 'raw', - params: headerParams, - id: null, - }; - expect(await inputResult).toStrictEqual([ - Buffer.from(JSON.stringify(expectedHeader)), - ...inputData, - ]); - expect(await outputResult).toStrictEqual(outputData); - }); + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.raw(headerParams); + await callerInterface.readable.pipeTo(outputWritableStream); + const writer = callerInterface.writable.getWriter(); + for (const inputDatum of inputData) { + await writer.write(inputDatum); + } + await writer.close(); + const expectedHeader: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'raw', + params: headerParams, + id: null, + }; + expect(await inputResult).toStrictEqual([ + Buffer.from(JSON.stringify(expectedHeader)), + ...inputData, + ]); + expect(await outputResult).toStrictEqual(outputData); + }, + ); test.prop({ messages: fc.array( rpcTestUtils.JSONRPCResponseSuccessArb(rpcTestUtils.safeJsonObjectArb), - { - minLength: 1, - }, + { minLength: 1 }, ), - })('manifest duplex caller', async ({ messages }) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - duplex: new DuplexCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - let count = 0; - const callerInterface = await rpcClient.methods.duplex(); - const writer = callerInterface.writable.getWriter(); - for await (const value of callerInterface.readable) { - count += 1; - await writer.write(value); - } - await writer.close(); - const result = await outputResult; - // We're just checking that it's consuming the messages as expected - expect(result.length).toEqual(messages.length); - expect(count).toEqual(messages.length); - }); - test('manifest without handler errors', async () => { + })( + 'the manifest should be able to provide a duplex caller', + async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + duplex: new DuplexCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + let count = 0; + const callerInterface = await rpcClient.methods.duplex(); + const writer = callerInterface.writable.getWriter(); + for await (const value of callerInterface.readable) { + count += 1; + await writer.write(value); + } + await writer.close(); + const result = await outputResult; + // We're just checking that it's consuming the messages as expected + expect(result.length).toEqual(messages.length); + expect(count).toEqual(messages.length); + }, + ); + test('the manifest should error without any handlers', async () => { const rpcClient = new RPCClient({ manifest: {}, streamFactory: async () => { @@ -720,10 +666,8 @@ describe(`${RPCClient.name}`, () => { // @ts-ignore: ignoring type safety here expect(() => rpcClient.withMethods.someMethod()).toThrow(); }); - test.prop({ - timeoutTime: fc.integer({ max: -1 }), - })( - 'constructor should throw when passed a negative timeoutTime', + test.prop({ timeoutTime: fc.integer({ max: -1 }) })( + 'the constructor should throw when passed a negative timeoutTime', async ({ timeoutTime }) => { const streamPair: RPCStream = { cancel: () => {}, @@ -739,12 +683,12 @@ describe(`${RPCClient.name}`, () => { logger, idGen, }); - expect(constructorF).toThrow(rpcErrors.ErrorRPCInvalidTimeout); }, ); - describe('raw caller', () => { - test('raw caller uses default timeout when creating stream', async () => { + + describe('RawCaller', () => { + test('raw caller uses default timeout when creating a stream', async () => { const holdProm = promise(); let ctx: ContextTimed | undefined; const rpcClient = new RPCClient({ @@ -795,7 +739,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('raw caller handles abort when creating stream', async () => { + test('raw caller handles abortion when creating a stream', async () => { const holdProm = promise(); const ctxProm = promise(); const rpcClient = new RPCClient({ @@ -811,7 +755,6 @@ describe(`${RPCClient.name}`, () => { }); const abortController = new AbortController(); const rejectReason = Symbol('rejectReason'); - // Timing out on stream creation const callerInterfaceProm = rpcClient.rawStreamCaller( 'testMethod', @@ -825,7 +768,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBe(rejectReason); }); - test('raw caller times out awaiting stream', async () => { + test('raw caller times out awaiting a stream', async () => { const forwardPassThroughStream = new TransformStream< Uint8Array, Uint8Array @@ -862,7 +805,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('raw caller handles abort awaiting stream', async () => { + test('raw caller handles abortion while awaiting a stream', async () => { const forwardPassThroughStream = new TransformStream< Uint8Array, Uint8Array @@ -912,8 +855,9 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.reason).toBe(rejectReason); }); }); - describe('duplex caller', () => { - test('duplex caller uses default timeout when creating stream', async () => { + + describe('DuplexCaller', () => { + test('duplex caller uses default timeout when creating a stream', async () => { const holdProm = promise(); let ctx: ContextTimed | undefined; const rpcClient = new RPCClient({ @@ -937,7 +881,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('duplex caller times out when creating stream', async () => { + test('duplex caller times out when creating a stream', async () => { const holdProm = promise(); let ctx: ContextTimed | undefined; const rpcClient = new RPCClient({ @@ -962,7 +906,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('duplex caller handles abort when creating stream', async () => { + test('duplex caller handles abortion when creating a stream', async () => { const holdProm = promise(); let ctx: ContextTimed | undefined; const rpcClient = new RPCClient({ @@ -979,7 +923,6 @@ describe(`${RPCClient.name}`, () => { const abortController = new AbortController(); const rejectReason = Symbol('rejectReason'); abortController.abort(rejectReason); - // Timing out on stream creation const callerInterfaceProm = rpcClient.duplexStreamCaller('testMethod', { signal: abortController.signal, @@ -989,7 +932,7 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBe(rejectReason); }); - test('duplex caller uses default timeout awaiting stream', async () => { + test('duplex caller uses default timeout while awaiting a stream', async () => { const forwardPassThroughStream = new TransformStream< Uint8Array, Uint8Array @@ -1015,14 +958,13 @@ describe(`${RPCClient.name}`, () => { logger, idGen, }); - // Timing out on stream await rpcClient.duplexStreamCaller('testMethod'); await ctx?.timer; expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('duplex caller times out awaiting stream', async () => { + test('duplex caller times out while awaiting a stream', async () => { const forwardPassThroughStream = new TransformStream< Uint8Array, Uint8Array @@ -1064,7 +1006,7 @@ describe(`${RPCClient.name}`, () => { expect(Date.now() - start).toBeGreaterThan(500); expect(reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); - test('duplex caller handles abort awaiting stream', async () => { + test('duplex caller handles abortion awaiting a stream', async () => { const forwardPassThroughStream = new TransformStream< Uint8Array, Uint8Array @@ -1110,13 +1052,8 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBe(rejectReason); }); - test.prop( - { - messages: specificMessageArb, - }, - { numRuns: 5 }, - )( - 'duplex caller timeout is cancelled when receiving message', + test.prop({ messages: specificMessageArb }, { numRuns: 5 })( + 'duplex caller timeout is cancelled when receiving a message', async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const streamPair: RPCStream = { @@ -1139,23 +1076,16 @@ describe(`${RPCClient.name}`, () => { JSONRPCRequestParams, JSONRPCResponseResult >(methodName, { timer: 200 }); - const ctx = await ctxProm.p; const reader = callerInterface.readable.getReader(); reader.releaseLock(); - for await (const _ of callerInterface.readable) { - // Do nothing - } + // Do nothing + for await (const _ of callerInterface.readable); await expect(ctx.timer).rejects.toBe(timeoutCancelledReason); }, ); }); - test.prop( - { - messages: specificMessageArb, - }, - { numRuns: 5 }, - )( + test.prop({ messages: specificMessageArb }, { numRuns: 5 })( 'duplex caller timeout is not cancelled when receiving message with provided ctx', async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); @@ -1179,22 +1109,19 @@ describe(`${RPCClient.name}`, () => { JSONRPCRequestParams, JSONRPCResponseResult >(methodName, { timer: new Timer(undefined, 200) }); - const ctx = await ctxProm.p; const reader = callerInterface.readable.getReader(); reader.releaseLock(); - for await (const _ of callerInterface.readable) { - // Do nothing - } + // Do nothing + for await (const _ of callerInterface.readable); await ctx.timer; expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }, ); + describe('timeout priority', () => { - test.prop({ - timeouts: rpcTestUtils.timeoutsArb, - })( - 'check that call with ctx can override higher timeout of RPCClient', + test.prop({ timeouts: rpcTestUtils.timeoutsArb })( + 'calling with ctx should override higher timeout of RPCClient', async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const streamPair: RPCStream = { cancel: () => {}, @@ -1213,24 +1140,20 @@ describe(`${RPCClient.name}`, () => { idGen, timeoutTime: higherTimeoutTime, }); - await rpcClient.duplexStreamCaller< JSONRPCRequestParams, JSONRPCResponseResult >(methodName, { timer: lowerTimeoutTime, }); - const ctx = await ctxP; expect(ctx.timer.delay).toBe(lowerTimeoutTime); ctx.timer.cancel(); await ctx.timer.catch(() => {}); }, ); - test.prop({ - timeouts: rpcTestUtils.timeoutsArb, - })( - 'check that call with ctx can override lower timeout of RPCClient', + test.prop({ timeouts: rpcTestUtils.timeoutsArb })( + 'calling with ctx should override lower timeout of RPCClient', async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const streamPair: RPCStream = { cancel: () => {}, @@ -1249,24 +1172,20 @@ describe(`${RPCClient.name}`, () => { idGen, timeoutTime: lowerTimeoutTime, }); - await rpcClient.duplexStreamCaller< JSONRPCRequestParams, JSONRPCResponseResult >(methodName, { timer: higherTimeoutTime, }); - const ctx = await ctxP; expect(ctx.timer.delay).toBe(higherTimeoutTime); ctx.timer.cancel(); await ctx.timer.catch(() => {}); }, ); - test.prop({ - timeoutTime: fc.integer({ min: 0 }), - })( - 'check that call with ctx can override lower timeout of RPCClient with Infinity', + test.prop({ timeoutTime: fc.integer({ min: 0 }) })( + 'calling with ctx should override lower timeout of RPCClient with Infinity', async ({ timeoutTime }) => { const streamPair: RPCStream = { cancel: () => {}, @@ -1285,27 +1204,20 @@ describe(`${RPCClient.name}`, () => { idGen, timeoutTime, }); - await rpcClient.duplexStreamCaller< JSONRPCRequestParams, JSONRPCResponseResult >(methodName, { timer: Infinity, }); - const ctx = await ctxP; expect(ctx.timer.delay).toBe(Infinity); ctx.timer.cancel(); await ctx.timer.catch(() => {}); }, ); - test.prop( - { - messages: specificMessageArb, - }, - { numRuns: 1 }, - )( - 'Check that ctx is provided to the middleware and that the middleware can reset the timer', + test.prop({ messages: specificMessageArb }, { numRuns: 1 })( + 'ctx should be provided to the middleware and the middleware should be able to reset the timer', async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const [outputResult, outputStream] = @@ -1339,18 +1251,16 @@ describe(`${RPCClient.name}`, () => { JSONRPCRequestParams, JSONRPCResponseResult >(methodName); - const ctx = await ctxProm.p; // Writing should refresh timer engage the middleware const writer = callerInterface.writable.getWriter(); await writer.write({}); expect(ctx.timer.delay).toBe(123); await writer.close(); - await outputResult; }, ); - test('Check that promises returned by streamFactory are handled', async () => { + test('promises returned by streamFactory should be handled', async () => { const error = new Error('streamFactory error'); const rpcClients = [ new RPCClient({ diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index f45c633..60bb2f3 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -27,8 +27,8 @@ import UnaryHandler from '@/handlers/UnaryHandler'; import ClientHandler from '@/handlers/ClientHandler'; import * as rpcTestUtils from './utils'; -describe(`${RPCServer.name}`, () => { - const logger = new Logger(`${RPCServer.name} Test`, LogLevel.WARN, [ +describe('RPCServer', () => { + const logger = new Logger('RPCServer Test', LogLevel.WARN, [ new StreamHandler(), ]); const idGen: IdGen = () => Promise.resolve(null); @@ -58,433 +58,540 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - test.prop( - { - messages: specificMessageArb, + test.prop({ messages: specificMessageArb }, { numRuns: 1 })( + 'can stream data with raw duplex stream handler', + async ({ messages }) => { + const stream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6]), + ); + class TestHandler extends RawHandler { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise<[JSONRPCResponseResult, ReadableStream]> => { + // Consume values + for await (const _ of input[1]); + const readableStream = new ReadableStream({ + start: (controller) => { + controller.enqueue(Buffer.from('hello world!')); + controller.close(); + }, + }); + return Promise.resolve([{}, readableStream]); + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestHandler({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); }, - { numRuns: 1 }, - )('can stream data with raw duplex stream handler', async ({ messages }) => { - const stream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6])); - class TestHandler extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise<[JSONRPCResponseResult, ReadableStream]> => { - for await (const _ of input[1]) { - // No touch, only consume - } - const readableStream = new ReadableStream({ - start: (controller) => { - controller.enqueue(Buffer.from('hello world!')); - controller.close(); - }, - }); - return Promise.resolve([{}, readableStream]); + ); + test.prop({ messages: specificMessageArb })( + 'can stream data with duplex stream handler', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + // Yield only the first value + const result = await input.next(); + if (!result.done) yield result.value; + await input.return(undefined); + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestHandler({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('can stream data with duplex stream handler', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - // Yield only the first value - const result = await input.next(); - if (!result.done) yield result.value; - await input.return(undefined); + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'can stream data with client stream handler', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends ClientHandler { + public handle = async ( + input: AsyncGenerator>, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise> => { + let count = 0; + for await (const _ of input) { + count += 1; + } + return { value: count }; + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('can stream data with client stream handler', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends ClientHandler { - public handle = async ( - input: AsyncGenerator>, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise> => { - let count = 0; - for await (const _ of input) { - count += 1; - } - return { value: count }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: singleNumberMessageArb })( + 'can stream data with server stream handler', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends ServerHandler< + ContainerType, + JSONRPCRequestParams, + JSONRPCResponseResult + > { + public handle = async function* ( + input: JSONRPCRequestParams, + ): AsyncGenerator { + const number = (input.value as number) ?? 0; + for (let i = 0; i < number; i++) { + yield { value: i }; + } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: singleNumberMessageArb, - })('can stream data with server stream handler', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends ServerHandler< - ContainerType, - JSONRPCRequestParams, - JSONRPCResponseResult - > { - public handle = async function* ( - input: JSONRPCRequestParams, - ): AsyncGenerator { - const number = (input.value as number) ?? 0; - for (let i = 0; i < number; i++) { - yield { value: i }; - } + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'can stream data with unary call handler', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONRPCRequestParams, + ): Promise => { + return input; + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('can stream data with server stream handler', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends UnaryHandler { - public handle = async ( - input: JSONRPCRequestParams, - ): Promise => { - return input; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'handler should be provided with container', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const container = { + a: Symbol('a'), + B: Symbol('b'), + C: Symbol('c'), }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('handler is provided with container', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const container = { - a: Symbol('a'), - B: Symbol('b'), - C: Symbol('c'), - }; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - expect(this.container).toBe(container); - for await (const val of input) { - yield val; - } + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + expect(this.container).toBe(container); + for await (const val of input) { + yield val; + } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod(container), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod(container), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('handler is provided with connectionInfo', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const meta = { - localHost: 'hostA', - localPort: 12341, - remoteCertificates: [], - remoteHost: 'hostA', - remotePort: 12341, - }; - let handledMeta: Record | undefined = undefined; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - handledMeta = meta; - for await (const val of input) { - yield val; - } + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'handler should be provided with connectionInfo', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const meta = { + localHost: 'hostA', + localPort: 12341, + remoteCertificates: [], + remoteHost: 'hostA', + remotePort: 12341, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - meta, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - expect(handledMeta).toBe(meta); - }); - test.prop({ - messages: specificMessageArb, - })('handler can be aborted', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ): AsyncGenerator { - for await (const val of input) { - if (ctx.signal.aborted) throw ctx.signal.reason; - yield val; - } + let handledMeta: Record | undefined = undefined; + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + handledMeta = meta; + for await (const val of input) { + yield val; + } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + meta, + readable: stream, + writable: outputStream, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - let activeStream: PromiseCancellable | undefined = undefined; - const tapStream = rpcTestUtils.tapTransformStream( - async (_, iteration) => { - if (iteration === 2) { - // @ts-ignore: kidnap private property - const activeStreams = rpcServer.activeStreams.values(); - for (const activeStream_ of activeStreams) { - activeStream = activeStream_; - activeStream_.cancel(Error('Some error')); + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + expect(handledMeta).toBe(meta); + }, + ); + test.prop({ messages: specificMessageArb })( + 'handler can be aborted', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + for await (const val of input) { + if (ctx.signal.aborted) throw ctx.signal.reason; + yield val; } - } - }, - ); - void tapStream.readable.pipeTo(outputStream).catch(() => {}); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: tapStream.writable, - }; - rpcServer.handleStream(readWriteStream); - const result = await outputResult; - const lastMessage = result[result.length - 1]; - expect(activeStream).toBeDefined(); - await expect(activeStream!).toResolve(); - expect(lastMessage).toBeDefined(); - expect(() => - rpcUtils.parseJSONRPCResponseFailed(JSON.parse(lastMessage.toString())), - ).not.toThrow(); - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - })('handler yields nothing', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - for await (const _ of input) { - // Do nothing, just consume - } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + let activeStream: PromiseCancellable | undefined = undefined; + const tapStream = rpcTestUtils.tapTransformStream( + async (_, iteration) => { + if (iteration === 2) { + // @ts-ignore: kidnap private property + const activeStreams = rpcServer.activeStreams.values(); + for (const activeStream_ of activeStreams) { + activeStream = activeStream_; + activeStream_.cancel(Error('Some error')); + } + } + }, + ); + void tapStream.readable.pipeTo(outputStream).catch(() => {}); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: tapStream.writable, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - // We're just expecting no errors - await rpcServer.stop({ force: true }); - }); - test.prop({ - messages: specificMessageArb, - error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - })('should send error message', async ({ messages, error }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = - async function* (): AsyncGenerator { - throw error; + rpcServer.handleStream(readWriteStream); + const result = await outputResult; + const lastMessage = result[result.length - 1]; + expect(activeStream).toBeDefined(); + await expect(activeStream!).toResolve(); + expect(lastMessage).toBeDefined(); + expect(() => + rpcUtils.parseJSONRPCResponseFailed(JSON.parse(lastMessage.toString())), + ).not.toThrow(); + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'handler can yield nothing', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + // Consume values + for await (const _ of input); }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const { - p: errorEventP, - resolveP: resolveErrorEventP, - rejectP: rejectErrorEventP, - } = rpcUtils.promise(); - rpcServer.addEventListener('error', (event: rpcEvents.RPCErrorEvent) => { - resolveErrorEventP(event); - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const rawErrorMessage = (await outputResult)[0]!.toString(); - const errorMessage = JSON.parse(rawErrorMessage); - expect(errorMessage.error.message).toEqual(error.message); - rejectErrorEventP(Error('Never received error event')); - await expect(errorEventP).toReject(); - await rpcServer.stop({ force: true }); - }); + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + // We're just expecting no errors + await rpcServer.stop({ force: true }); + }, + ); test.prop({ messages: specificMessageArb, error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), })( - 'should send error message with sensitive', + 'handler should be able to send error messages', async ({ messages, error }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public handle = - async function* (): AsyncGenerator { - throw error; - }; + public handle = + async function* (): AsyncGenerator { + throw error; + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const { + p: errorEventP, + resolveP: resolveErrorEventP, + rejectP: rejectErrorEventP, + } = rpcUtils.promise(); + rpcServer.addEventListener('error', (event: rpcEvents.RPCErrorEvent) => { + resolveErrorEventP(event); + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const rawErrorMessage = (await outputResult)[0]!.toString(); + const errorMessage = JSON.parse(rawErrorMessage); + expect(errorMessage.error.message).toEqual(error.message); + rejectErrorEventP(Error('Never received error event')); + await expect(errorEventP).toReject(); + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb }, { numRuns: 1 })( + 'should emit a stream error if the input stream fails', + async ({ messages }) => { + const handlerEndedProm = promise(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + ): AsyncGenerator { + try { + // Consume but don't yield anything + for await (const _ of input); + } finally { + handlerEndedProm.resolveP(); + } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const passThroughStreamIn = new TransformStream(); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: passThroughStreamIn.readable, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const writer = passThroughStreamIn.writable.getWriter(); + // Write messages + for (const message of messages) { + await writer.write(Buffer.from(JSON.stringify(message))); + } + // Abort stream + const writerReason = new Error('writerAbort'); + await writer.abort(writerReason); + // We should get an error RPC message + await expect(outputResult).toResolve(); + const errorMessage = JSON.parse((await outputResult)[0].toString()); + // Parse without error + rpcUtils.parseJSONRPCResponseFailed(errorMessage); + // Check that the handler was cleaned up. + await expect(handlerEndedProm.p).toResolve(); + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb }, { numRuns: 1 })( + 'should emit a stream error if the output stream fails', + async ({ messages }) => { + const handlerEndedProm = promise(); + const ctxProm = promise(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + ctxProm.resolveP(ctx); + // Echo input + try { + yield* input; + } finally { + handlerEndedProm.resolveP(); + } + }; + } + const rpcServer = new RPCServer({ logger, idGen }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const { p: errorEventP, resolveP: resolveErrorEventP } = + rpcUtils.promise(); + rpcServer.addEventListener( + 'error', + (rpcErrorEvent: rpcEvents.RPCErrorEvent) => { + resolveErrorEventP(rpcErrorEvent); + }, + ); + const passThroughStreamIn = new TransformStream(); + const passThroughStreamOut = new TransformStream< + Uint8Array, + Uint8Array + >(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: passThroughStreamIn.readable, + writable: passThroughStreamOut.writable, + }; + rpcServer.handleStream(readWriteStream); + const writer = passThroughStreamIn.writable.getWriter(); + const reader = passThroughStreamOut.readable.getReader(); + // Write messages + for (const message of messages) { + await writer.write(Buffer.from(JSON.stringify(message))); + await reader.read(); + } + // Abort stream + const readerReason = Symbol('readerAbort'); + await reader.cancel(readerReason); + // We should get an error event + const event = await errorEventP; + await writer.close(); + expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCStreamEnded); + // Check that the handler was cleaned up. + await expect(handlerEndedProm.p).toResolve(); + // Check that an abort signal happened + const ctx = await ctxProm.p; + expect(ctx.signal.aborted).toBeTrue(); + expect(ctx.signal.reason).toBe(readerReason); + await rpcServer.stop({ force: true }); + }, + ); + test.prop({ messages: specificMessageArb })( + 'message should pass through the forward middleware', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + yield* input; + }; } - + const middlewareFactory = + rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { + return { + forward: new TransformStream({ + transform: (chunk, controller) => { + chunk.params = { value: 1 }; + controller.enqueue(chunk); + }, + }), + reverse: new TransformStream(), + }; + }); const rpcServer = new RPCServer({ + middlewareFactory: middlewareFactory, logger, idGen, }); @@ -493,14 +600,6 @@ describe(`${RPCServer.name}`, () => { testMethod: new TestMethod({}), }, }); - const { - p: errorEventP, - resolveP: resolveErrorEventP, - rejectP: rejectErrorEventP, - } = rpcUtils.promise(); - rpcServer.addEventListener('error', (thing: rpcEvents.RPCErrorEvent) => { - resolveErrorEventP(thing); - }); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const readWriteStream: RPCStream = { cancel: () => {}, @@ -508,345 +607,167 @@ describe(`${RPCServer.name}`, () => { writable: outputStream, }; rpcServer.handleStream(readWriteStream); - const rawErrorMessage = (await outputResult)[0]!.toString(); - const errorMessage = JSON.parse(rawErrorMessage); - expect(errorMessage.error.message).toEqual(error.message); - rejectErrorEventP(Error('Never received error event')); - await expect(errorEventP).toReject(); + const out = await outputResult; + expect(out.map((v) => v!.toString())).toStrictEqual( + messages.map(() => + JSON.stringify({ + jsonrpc: '2.0', + result: { value: 1 }, + id: null, + }), + ), + ); await rpcServer.stop({ force: true }); }, ); - test.prop( - { - messages: specificMessageArb, - }, - { numRuns: 1 }, - )('should emit stream error if input stream fails', async ({ messages }) => { - const handlerEndedProm = promise(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - ): AsyncGenerator { - try { - for await (const _ of input) { - // Consume but don't yield anything - } - } finally { - handlerEndedProm.resolveP(); - } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const passThroughStreamIn = new TransformStream(); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: passThroughStreamIn.readable, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const writer = passThroughStreamIn.writable.getWriter(); - // Write messages - for (const message of messages) { - await writer.write(Buffer.from(JSON.stringify(message))); - } - // Abort stream - const writerReason = new Error('writerAbort'); - await writer.abort(writerReason); - // We should get an error RPC message - await expect(outputResult).toResolve(); - const errorMessage = JSON.parse((await outputResult)[0].toString()); - // Parse without error - rpcUtils.parseJSONRPCResponseFailed(errorMessage); - // Check that the handler was cleaned up. - await expect(handlerEndedProm.p).toResolve(); - await rpcServer.stop({ force: true }); - }); - test.prop( - { - messages: specificMessageArb, - }, - { numRuns: 1 }, - )('should emit stream error if output stream fails', async ({ messages }) => { - const handlerEndedProm = promise(); - const ctxProm = promise(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ): AsyncGenerator { - ctxProm.resolveP(ctx); - // Echo input - try { + test.prop({ messages: specificMessageArb }, { numRuns: 1 })( + 'message should pass through the reverse middleware', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator>, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator> { yield* input; - } finally { - handlerEndedProm.resolveP(); - } + }; + } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( + () => { + return { + forward: new TransformStream(), + reverse: new TransformStream({ + transform: (chunk, controller) => { + if ('result' in chunk) chunk.result = { value: 1 }; + controller.enqueue(chunk); + }, + }), + }; + }, + ); + const rpcServer = new RPCServer({ + middlewareFactory: middleware, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const { p: errorEventP, resolveP: resolveErrorEventP } = - rpcUtils.promise(); - rpcServer.addEventListener( - 'error', - (rpcErrorEvent: rpcEvents.RPCErrorEvent) => { - resolveErrorEventP(rpcErrorEvent); - }, - ); - const passThroughStreamIn = new TransformStream(); - const passThroughStreamOut = new TransformStream(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: passThroughStreamIn.readable, - writable: passThroughStreamOut.writable, - }; - rpcServer.handleStream(readWriteStream); - const writer = passThroughStreamIn.writable.getWriter(); - const reader = passThroughStreamOut.readable.getReader(); - // Write messages - for (const message of messages) { - await writer.write(Buffer.from(JSON.stringify(message))); - await reader.read(); - } - // Abort stream - const readerReason = Symbol('readerAbort'); - await reader.cancel(readerReason); - // We should get an error event - const event = await errorEventP; - await writer.close(); - expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCStreamEnded); - // Check that the handler was cleaned up. - await expect(handlerEndedProm.p).toResolve(); - // Check that an abort signal happened - const ctx = await ctxProm.p; - expect(ctx.signal.aborted).toBeTrue(); - expect(ctx.signal.reason).toBe(readerReason); - await rpcServer.stop({ force: true }); - }); - // Temporarily commenting out to allow the CI to make a release - // test.prop({ - // messages: specificMessageArb, - // })('forward middlewares', async ({ messages }) => { - // const stream = rpcTestUtils.messagesToReadableStream(messages); - // class TestMethod extends DuplexHandler { - // public handle = async function* ( - // input: AsyncGenerator, - // _cancel: (reason?: any) => void, - // _meta: Record | undefined, - // _ctx: ContextTimed, - // ): AsyncGenerator { - // yield* input; - // }; - // } - // const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( - // () => { - // return { - // forward: new TransformStream({ - // transform: (chunk, controller) => { - // chunk.params = { value: 1 }; - // controller.enqueue(chunk); - // }, - // }), - // reverse: new TransformStream(), - // }; - // }, - // ); - // const rpcServer = new RPCServer({ - // middlewareFactory: middlewareFactory, - // logger, - // idGen, - // }); - // await rpcServer.start({ - // manifest: { - // testMethod: new TestMethod({}), - // }, - // }); - // const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - // const readWriteStream: RPCStream = { - // cancel: () => {}, - // readable: stream, - // writable: outputStream, - // }; - // rpcServer.handleStream(readWriteStream); - // const out = await outputResult; - // expect(out.map((v) => v!.toString())).toStrictEqual( - // messages.map(() => - // JSON.stringify({ - // jsonrpc: '2.0', - // result: { value: 1 }, - // id: null, - // }), - // ), - // ); - // await rpcServer.stop({ force: true }); - // }); - // test.prop( - // { - // messages: specificMessageArb, - // }, - // { numRuns: 1 }, - // )('reverse middlewares', async ({ messages }) => { - // const stream = rpcTestUtils.messagesToReadableStream(messages); - // class TestMethod extends DuplexHandler { - // public handle = async function* ( - // input: AsyncGenerator>, - // _cancel: (reason?: any) => void, - // _meta: Record | undefined, - // _ctx: ContextTimed, - // ): AsyncGenerator> { - // yield* input; - // }; - // } - // const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { - // return { - // forward: new TransformStream(), - // reverse: new TransformStream({ - // transform: (chunk, controller) => { - // if ('result' in chunk) chunk.result = { value: 1 }; - // controller.enqueue(chunk); - // }, - // }), - // }; - // }); - // const rpcServer = new RPCServer({ - // middlewareFactory: middleware, - // logger, - // idGen, - // }); - // await rpcServer.start({ - // manifest: { - // testMethod: new TestMethod({}), - // }, - // }); - // const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - // const readWriteStream: RPCStream = { - // cancel: () => {}, - // readable: stream, - // writable: outputStream, - // }; - // rpcServer.handleStream(readWriteStream); - // const out = await outputResult; - // expect(out.map((v) => v!.toString())).toStrictEqual( - // messages.map(() => - // JSON.stringify({ - // jsonrpc: '2.0', - // result: { value: 1 }, - // id: null, - // }), - // ), - // ); - // await rpcServer.stop({ force: true }); - // }); + rpcServer.handleStream(readWriteStream); + const out = await outputResult; + expect(out.map((v) => v!.toString())).toStrictEqual( + messages.map(() => + JSON.stringify({ + jsonrpc: '2.0', + result: { value: 1 }, + id: null, + }), + ), + ); + await rpcServer.stop({ force: true }); + }, + ); test.prop({ message: invalidTokenMessageArb, - })('forward middleware authentication', async ({ message }) => { - const stream = rpcTestUtils.messagesToReadableStream([message]); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - yield* input; + })( + 'message should pass through authentication forward middleware', + async ({ message }) => { + const stream = rpcTestUtils.messagesToReadableStream([message]); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + yield* input; + }; + } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( + () => { + let first = true; + let reverseController: TransformStreamDefaultController; + return { + forward: new TransformStream< + JSONRPCRequest, + JSONRPCRequest + >({ + transform: (chunk, controller) => { + if (first && chunk.params?.metadata?.token !== validToken) { + reverseController.enqueue(failureMessage); + // Closing streams early + controller.terminate(); + reverseController.terminate(); + } + first = false; + controller.enqueue(chunk); + }, + }), + reverse: new TransformStream({ + start: (controller) => { + // Kidnapping reverse controller + reverseController = controller; + }, + transform: (chunk, controller) => { + controller.enqueue(chunk); + }, + }), + }; + }, + ); + const rpcServer = new RPCServer({ + middlewareFactory: middleware, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, }; - } - const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { - let first = true; - let reverseController: TransformStreamDefaultController; - return { - forward: new TransformStream< - JSONRPCRequest, - JSONRPCRequest - >({ - transform: (chunk, controller) => { - if (first && chunk.params?.metadata?.token !== validToken) { - reverseController.enqueue(failureMessage); - // Closing streams early - controller.terminate(); - reverseController.terminate(); - } - first = false; - controller.enqueue(chunk); - }, - }), - reverse: new TransformStream({ - start: (controller) => { - // Kidnapping reverse controller - reverseController = controller; - }, - transform: (chunk, controller) => { - controller.enqueue(chunk); - }, - }), + type TestType = { + metadata: { + token: string; + }; + data: JSONValue; }; - }); - const rpcServer = new RPCServer({ - middlewareFactory: middleware, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - type TestType = { - metadata: { - token: string; + const failureMessage: JSONRPCResponseFailed = { + jsonrpc: '2.0', + id: null, + error: { + code: 1, + message: 'failure of some kind', + }, }; - data: JSONValue; - }; - const failureMessage: JSONRPCResponseFailed = { - jsonrpc: '2.0', - id: null, - error: { - code: 1, - message: 'failure of some kind', - }, - }; - rpcServer.handleStream(readWriteStream); - expect((await outputResult).toString()).toEqual( - JSON.stringify(failureMessage), - ); - await rpcServer.stop({ force: true }); - }); + rpcServer.handleStream(readWriteStream); + expect((await outputResult).toString()).toEqual( + JSON.stringify(failureMessage), + ); + await rpcServer.stop({ force: true }); + }, + ); test.prop({ timeoutTime: fc.integer({ max: -1 }), })( 'constructor should throw when passed a negative timeout', async ({ timeoutTime }) => { - const constructorF = () => - new RPCServer({ - timeoutTime, - logger, - idGen, - }); - + const constructorF = () => new RPCServer({ timeoutTime, logger, idGen }); expect(constructorF).toThrow(rpcErrors.ErrorRPCInvalidTimeout); }, ); @@ -857,7 +778,6 @@ describe(`${RPCServer.name}`, () => { async ({ timeoutTime }) => { const waitProm = promise(); const ctxLongProm = promise(); - class TestMethodArbitraryTimeout extends UnaryHandler { timeout = timeoutTime; public handle = async ( @@ -871,11 +791,7 @@ describe(`${RPCServer.name}`, () => { return input; }; } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - + const rpcServer = new RPCServer({ logger, idGen }); await expect( rpcServer.start({ manifest: { @@ -885,11 +801,9 @@ describe(`${RPCServer.name}`, () => { ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); }, ); - test('timeout with default time after handler selected', async () => { + test('timeout with default time after a handler is selected', async () => { const ctxProm = promise(); - // Diagnostic log to indicate the start of the test - class TestHandler extends RawHandler { public handle = async ( _input: [JSONRPCRequest, ReadableStream], @@ -899,34 +813,26 @@ describe(`${RPCServer.name}`, () => { ): Promise<[JSONRPCResponseResult, ReadableStream]> => { return new Promise((resolve) => { ctxProm.resolveP(ctx); - let controller: ReadableStreamController; const stream = new ReadableStream({ start: (controller_) => { controller = controller_; }, }); - ctx.signal.addEventListener('abort', () => { controller!.error(Error('ending')); }); - // Return something to fulfill the Promise type expectation. resolve([{}, stream]); }); }; } - const rpcServer = new RPCServer({ - timeoutTime: 100, - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 100 }); await rpcServer.start({ manifest: { testMethod: new TestHandler({}), }, }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const stream = rpcTestUtils.messagesToReadableStream([ { @@ -940,7 +846,6 @@ describe(`${RPCServer.name}`, () => { params: {}, }, ]); - const readWriteStream: RPCStream = { cancel: () => {}, readable: stream, @@ -952,26 +857,19 @@ describe(`${RPCServer.name}`, () => { await ctx.timer; expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); await expect(outputResult).toReject(); - await rpcServer.stop({ force: true }); }); - test('timeout with default time before handler selected', async () => { - const rpcServer = new RPCServer({ - timeoutTime: 100, - logger, - idGen, - }); + test('timeout with default time before a handler is selected', async () => { + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 100 }); await rpcServer.start({ manifest: {}, }); const readWriteStream: RPCStream = { cancel: () => {}, readable: new ReadableStream({ - // Ignore cancel: () => {}, }), writable: new WritableStream({ - // Ignore abort: () => {}, }), }; @@ -995,17 +893,12 @@ describe(`${RPCServer.name}`, () => { ctx: ContextTimed, ): AsyncGenerator> { contextProm.resolveP(ctx); - for await (const _ of input) { - // Do nothing - } + // Do nothing + for await (const _ of input); yield { value: 1 }; }; } - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 1000, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime: 1000 }); await rpcServer.start({ manifest: { testMethod: new TestHandler({}), @@ -1048,9 +941,8 @@ describe(`${RPCServer.name}`, () => { return new Promise((resolve) => { ctxProm.resolveP(ctx); void (async () => { - for await (const _ of input[1]) { - // Do nothing, just consume - } + // Consume values + for await (const _ of input[1]); })(); const readableStream = new ReadableStream({ start: (controller) => { @@ -1061,10 +953,7 @@ describe(`${RPCServer.name}`, () => { }); }; } - const rpcServer = new RPCServer({ - logger, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen }); await rpcServer.start({ manifest: { testMethod: new TestHandler({}), @@ -1093,58 +982,59 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); - test.prop({ - messages: specificMessageArb, - })('middleware can update timeout timer', async ({ messages }) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const ctxProm = promise(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ): AsyncGenerator { - ctxProm.resolveP(ctx); - yield* input; - }; - } - const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( - (ctx) => { - ctx.timer.reset(12345); - return { - forward: new TransformStream(), - reverse: new TransformStream(), + test.prop({ messages: specificMessageArb })( + 'middleware can update timeout timer', + async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const ctxProm = promise(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + ctxProm.resolveP(ctx); + yield* input; }; - }, - ); - const rpcServer = new RPCServer({ - middlewareFactory: middlewareFactory, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - const ctx = await ctxProm.p; - expect(ctx.timer.delay).toBe(12345); - }); + } + const middlewareFactory = + rpcUtilsMiddleware.defaultServerMiddlewareWrapper((ctx) => { + ctx.timer.reset(12345); + return { + forward: new TransformStream(), + reverse: new TransformStream(), + }; + }); + const rpcServer = new RPCServer({ + middlewareFactory: middlewareFactory, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxProm.p; + expect(ctx.timer.delay).toBe(12345); + }, + ); + describe('timeout priority', () => { test.prop({ messages: specificMessageArb, timeouts: rpcTestUtils.timeoutsArb, })( - 'check that handler can override higher timeout of RPCServer', + 'handlers should be able to override higher timeout of RPCServer', async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); @@ -1162,8 +1052,8 @@ describe(`${RPCServer.name}`, () => { } const rpcServer = new RPCServer({ logger, - timeoutTime: higherTimeoutTime, idGen, + timeoutTime: higherTimeoutTime, }); await rpcServer.start({ manifest: { @@ -1188,7 +1078,7 @@ describe(`${RPCServer.name}`, () => { messages: specificMessageArb, timeouts: rpcTestUtils.timeoutsArb, })( - 'check that handler can override lower timeout of RPCServer', + 'handlers should be able to override lower timeout of RPCServer', async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); @@ -1206,8 +1096,8 @@ describe(`${RPCServer.name}`, () => { } const rpcServer = new RPCServer({ logger, - timeoutTime: lowerTimeoutTime, idGen, + timeoutTime: lowerTimeoutTime, }); await rpcServer.start({ manifest: { @@ -1233,7 +1123,7 @@ describe(`${RPCServer.name}`, () => { messages: specificMessageArb, timeoutTime: fc.integer({ min: 0 }), })( - 'check that handler can override lower timeout of RPCServer with Infinity', + 'handlers should be able to override lower timeout of RPCServer with Infinity', async ({ messages, timeoutTime }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); @@ -1249,11 +1139,7 @@ describe(`${RPCServer.name}`, () => { yield* input; }; } - const rpcServer = new RPCServer({ - logger, - timeoutTime, - idGen, - }); + const rpcServer = new RPCServer({ logger, idGen, timeoutTime }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}), diff --git a/tests/middleware.test.ts b/tests/middleware.test.ts index 4b738db..242d1e1 100644 --- a/tests/middleware.test.ts +++ b/tests/middleware.test.ts @@ -1,8 +1,8 @@ +import 'ix/add/asynciterable-operators/toarray'; import { fc, test } from '@fast-check/jest'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; import { Timer } from '@matrixai/timer'; import * as rpcUtils from '@/utils'; -import 'ix/add/asynciterable-operators/toarray'; import * as rpcErrors from '@/errors'; import * as rpcUtilsMiddleware from '@/middleware'; import * as rpcTestUtils from './utils'; @@ -15,36 +15,53 @@ describe('Middleware tests', () => { ) .noShrink(); - test.prop( - { - messages: rpcTestUtils.jsonMessagesArb, + test.prop({ messages: rpcTestUtils.jsonMessagesArb }, { numRuns: 1000 })( + 'converting to raw and back to JSON', + async ({ messages }) => { + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ); // Converting back. + + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(messages); }, - { - numRuns: 1000, + ); + test.prop({ messages: rpcTestUtils.jsonMessagesArb }, { numRuns: 100 })( + 'header message is json while content is binary stream', + async ({ messages }) => { + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonHeaderMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ); + let first = true; + for await (const chunk of parsedStream) { + if (first) { + // We can't check for types at runtime, especially a JSON type which + // can have arbitrary fields. + expect(chunk).not.toBeInstanceOf(Uint8Array); + first = false; + continue; + } + expect(chunk).toBeInstanceOf(Uint8Array); + } }, - )('converting to raw and back to JSON', async ({ messages }) => { - const parsedStream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough( - rpcUtilsMiddleware.binaryToJsonMessageStream( - rpcUtils.parseJSONRPCMessage, - ), - ); // Converting back. - - const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); - expect(messagesParsed).toEqual(messages); - }); + ); test.prop( { messages: fc.array( rpcTestUtils.jsonRpcRequestMessageArb(fc.string({ minLength: 100 })), - { - minLength: 1, - }, + { minLength: 1 }, ), }, { numRuns: 1000 }, - )('Message size limit is enforced when parsing', async ({ messages }) => { + )('message size limit is enforced when parsing', async ({ messages }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream([10])) @@ -91,7 +108,7 @@ describe('Middleware tests', () => { noise: noiseArb, }, { numRuns: 1000 }, - )('Will error on bad data', async ({ messages, snipPattern, noise }) => { + )('should error on bad data', async ({ messages, snipPattern, noise }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snipPattern)) // Imaginary internet here @@ -145,7 +162,7 @@ describe('Middleware tests', () => { messages: rpcTestUtils.jsonMessagesArb, timeout: fc.integer({ min: 1 }), })( - 'timeoutMiddlewareServer wont set ctx.timeout if timeout is higher', + 'timeoutMiddlewareServer will not set ctx.timeout if timeout is higher', async ({ messages, timeout }) => { if (messages[0].params == null) messages[0].params = {}; messages[0].params.metadata = { ...messages[0].params.metadata, timeout };