diff --git a/src/RPCServer.ts b/src/RPCServer.ts index fbf9161..8226009 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -287,7 +287,9 @@ class RPCServer { // Input generator derived from the forward stream const inputGen = async function* (): AsyncIterable { for await (const data of forwardStream) { - ctx.timer.refresh(); + if (ctx.timer.status !== 'settled') { + ctx.timer.refresh(); + } yield data.params as I; } }; @@ -296,7 +298,9 @@ class RPCServer { timer: ctx.timer, }); for await (const response of handlerG) { - ctx.timer.refresh(); + if (ctx.timer.status !== 'settled') { + ctx.timer.refresh(); + } const responseMessage: JSONRPCResponseResult = { jsonrpc: '2.0', result: response, @@ -570,13 +574,16 @@ class RPCServer { } // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); - if (timeout != null && timeout < this.handlerTimeoutTime) { - // Reset timeout with new delay if it is less than the default - timer.reset(timeout); - } else { - // Otherwise refresh - timer.refresh(); + if (timer.status !== 'settled') { + if (timeout != null && timeout < this.handlerTimeoutTime) { + // Reset timeout with new delay if it is less than the default + timer.reset(timeout); + } else { + // Otherwise refresh + timer.refresh(); + } } + this.logger.info(`Handling stream with method (${method})`); let handlerResult: [JSONValue | undefined, ReadableStream]; const headerWriter = rpcStream.writable.getWriter(); diff --git a/src/errors.ts b/src/errors.ts index ae8958a..4a1fd4b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -148,6 +148,13 @@ class ErrorRPCStreamEnded extends ErrorRPCProtocol { class ErrorRPCTimedOut extends ErrorRPCProtocol { static description = 'RPC handler has timed out'; code = JSONRPCErrorCode.RPCTimedOut; + public toJSON(): JSONRPCError { + const json = super.toJSON(); + if (typeof json === 'object' && !Array.isArray(json)) { + (json as POJO).type = this.constructor.name; + } + return json; + } } class ErrorUtilsUndefinedBehaviour extends ErrorRPCProtocol { diff --git a/src/middleware.ts b/src/middleware.ts index 4ab2226..d7cbb0b 100644 --- a/src/middleware.ts +++ b/src/middleware.ts @@ -278,9 +278,9 @@ const defaultClientMiddlewareWrapper = ( export { binaryToJsonMessageStream, jsonMessageToBinaryStream, + timeoutMiddlewareClient, + timeoutMiddlewareServer, defaultMiddleware, defaultServerMiddlewareWrapper, defaultClientMiddlewareWrapper, - timeoutMiddlewareClient, - timeoutMiddlewareServer, }; diff --git a/src/types.ts b/src/types.ts index 8972905..904184a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -110,7 +110,12 @@ type JSONRPCResponseError = { id: string | number | null; }; -type ObjectEmpty = NonNullable; +/** + * Used when an empty object is needed. + * Defined here with a linter override to avoid a false positive. + */ +// eslint-disable-next-line +type ObjectEmpty = {}; // Prevent overwriting the metadata type with `Omit<>` type JSONRPCRequestMetadata = ObjectEmpty> = diff --git a/src/utils.ts b/src/utils.ts index 82168db..6b0fedf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -278,6 +278,7 @@ const standardErrors: { URIError, AggregateError, AbstractError, + ErrorRPCTimedOut: errors.ErrorRPCTimedOut, }; /** @@ -342,6 +343,7 @@ function toError( let e: Error; switch (eClass) { case AbstractError: + case errors.ErrorRPCTimedOut: e = eClass.fromJSON(errorData); break; case AggregateError: diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 60fecbb..849728b 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -926,6 +926,70 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }); + testProp( + 'RPC client times out and server is able to ignore exception', + [fc.string()], + async (message) => { + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + const { p: ctxP, resolveP: resolveCtxP } = utils.promise(); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONValue, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise => { + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + resolveCtxP(ctx); + abortProm.resolveP(ctx.signal.reason); + }); + await abortProm.p; + return input; + }; + } + // Set up a client and server with matching timeout settings + const rpcServer = new RPCServer({ + logger, + idGen, + handlerTimeoutTime: 150, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + await expect( + rpcClient.methods.testMethod(message, { timer: 100 }), + ).resolves.toBe(message); + await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100); + + await rpcServer.stop({ force: true }); + }, + { numRuns: 1 }, + ); testProp( 'RPC Serializes and Deserializes Error', [rpcTestUtils.errorArb(rpcTestUtils.errorArb())],