diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 853d03c..fce7efb 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -85,7 +85,6 @@ class RPCServer extends EventTarget { middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), sensitive = false, handlerTimeoutTime = 60_000, // 1 minute - handlerTimeoutGraceTime = 2_000, // 2 seconds logger = new Logger(this.name), }: { manifest: ServerManifest; @@ -97,7 +96,6 @@ class RPCServer extends EventTarget { >; sensitive?: boolean; handlerTimeoutTime?: number; - handlerTimeoutGraceTime?: number; logger?: Logger; }): Promise { logger.info(`Creating ${this.name}`); @@ -106,7 +104,6 @@ class RPCServer extends EventTarget { middlewareFactory, sensitive, handlerTimeoutTime, - handlerTimeoutGraceTime, logger, }); logger.info(`Created ${this.name}`); @@ -116,7 +113,6 @@ class RPCServer extends EventTarget { protected handlerMap: Map = new Map(); protected defaultTimeoutMap: Map = new Map(); protected handlerTimeoutTime: number; - protected handlerTimeoutGraceTime: number; protected activeStreams: Set> = new Set(); protected sensitive: boolean; protected middlewareFactory: MiddlewareFactory< @@ -131,7 +127,6 @@ class RPCServer extends EventTarget { middlewareFactory, sensitive, handlerTimeoutTime = 60_000, // 1 minuet - handlerTimeoutGraceTime = 2_000, // 2 seconds logger, }: { manifest: ServerManifest; @@ -143,7 +138,6 @@ class RPCServer extends EventTarget { JSONRPCResponseResult >; handlerTimeoutTime?: number; - handlerTimeoutGraceTime?: number; sensitive: boolean; logger: Logger; }) { @@ -202,7 +196,6 @@ class RPCServer extends EventTarget { this.middlewareFactory = middlewareFactory; this.sensitive = sensitive; this.handlerTimeoutTime = handlerTimeoutTime; - this.handlerTimeoutGraceTime = handlerTimeoutGraceTime; this.logger = logger; } @@ -450,18 +443,15 @@ class RPCServer extends EventTarget { }, }); - // Grace timer is triggered with any abort signal. - // If grace timer completes then it will cause the RPCStream to end with // `RPCStream.cancel(reason)`. - let graceTimer: Timer | undefined; const handleAbort = () => { - const graceTimer = new Timer({ - delay: this.handlerTimeoutGraceTime, + const timer = new Timer({ + delay: this.handlerTimeoutTime, handler: () => { rpcStream.cancel(abortController.signal.reason); }, }); - void graceTimer + void timer .catch(() => {}) // Ignore cancellation error .finally(() => { abortController.signal.removeEventListener('abort', handleAbort); @@ -496,9 +486,7 @@ class RPCServer extends EventTarget { await rpcStream.writable.abort(reason); await inputStreamEndProm; timer.cancel(cleanupReason); - graceTimer?.cancel(cleanupReason); await timer.catch(() => {}); - await graceTimer?.catch(() => {}); }; // Read a single empty value to consume the first message const reader = headTransformStream.readable.getReader(); @@ -522,9 +510,7 @@ class RPCServer extends EventTarget { ); await inputStreamEndProm; timer.cancel(cleanupReason); - graceTimer?.cancel(cleanupReason); await timer.catch(() => {}); - await graceTimer?.catch(() => {}); this.dispatchEvent( new rpcEvents.RPCErrorEvent({ detail: new rpcErrors.ErrorRPCOutputStreamError( @@ -616,8 +602,6 @@ class RPCServer extends EventTarget { // Clean up and return timer.cancel(cleanupReason); abortController.signal.removeEventListener('abort', handleAbort); - graceTimer?.cancel(cleanupReason); - abortController.abort(new rpcErrors.ErrorRPCStreamEnded()); rpcStream.cancel(Error('TMP header message was an error')); return; } @@ -641,7 +625,6 @@ class RPCServer extends EventTarget { // Cleaning up abort and timer timer.cancel(cleanupReason); abortController.signal.removeEventListener('abort', handleAbort); - graceTimer?.cancel(cleanupReason); abortController.abort(new rpcErrors.ErrorRPCStreamEnded()); })(); const handlerProm = PromiseCancellable.from(prom, abortController).finally( diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 41b698a..f71c987 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -1084,58 +1084,6 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.destroy(); }); - test('Timeout has a grace period before forcing the streams closed', async () => { - const ctxProm = promise(); - class TestHandler extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> => { - ctxProm.resolveP(ctx); - - return Promise.resolve([null, new ReadableStream()]); - }; - } - const rpcServer = await RPCServer.createRPCServer({ - manifest: { - testMethod: new TestHandler({}), - }, - handlerTimeoutTime: 50, - handlerTimeoutGraceTime: 100, - logger, - }); - const [, outputStream] = rpcTestUtils.streamToArray(); - const stream = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testMethod', - params: null, - }, - { - jsonrpc: '2.0', - method: 'testMethod', - params: null, - }, - ]); - const cancelProm = promise(); - const readWriteStream: RPCStream = { - cancel: (reason) => cancelProm.resolveP(reason), - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const ctx = await ctxProm.p; - await ctx.timer; - const then = Date.now(); - expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); - // Should end after grace period - await expect(cancelProm.p).resolves.toBeInstanceOf( - rpcErrors.ErrorRPCTimedOut, - ); - expect(Date.now() - then).toBeGreaterThanOrEqual(90); - }); testProp( 'middleware can update timeout timer', [specificMessageArb], @@ -1180,56 +1128,4 @@ describe(`${RPCServer.name}`, () => { expect(ctx.timer.delay).toBe(12345); }, ); - test('destroying the `RPCServer` sends an abort signal and closes connection', async () => { - const ctxProm = promise(); - class TestHandler extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx_: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> => { - return new Promise((resolve) => { - ctxProm.resolveP(ctx_); - // Echo messages - return [null, input[1]]; - }); - }; - } - const rpcServer = await RPCServer.createRPCServer({ - manifest: { - testMethod: new TestHandler({}), - }, - handlerTimeoutGraceTime: 0, - logger, - }); - const [, outputStream] = rpcTestUtils.streamToArray(); - const message = Buffer.from( - JSON.stringify({ - jsonrpc: '2.0', - method: 'testMethod', - params: null, - }), - ); - const forwardStream = new TransformStream(); - const cancelProm = promise(); - const readWriteStream: RPCStream = { - cancel: (reason) => cancelProm.resolveP(reason), - readable: forwardStream.readable, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const writer = forwardStream.writable.getWriter(); - await writer.write(message); - const ctx = await ctxProm.p; - void rpcServer.destroy(true).then( - () => {}, - () => {}, - ); - await expect(cancelProm.p).resolves.toBeInstanceOf( - rpcErrors.ErrorRPCStopping, - ); - expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCStopping); - await writer.close(); - }); });