Skip to content

Commit

Permalink
fix: RPCServer handling output stream cancellation
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Mar 24, 2023
1 parent 8e19b76 commit b0876c8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 40 deletions.
49 changes: 18 additions & 31 deletions src/rpc/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,7 @@ class RPCServer extends EventTarget {
result: response,
id: null,
};
try {
yield responseMessage;
} catch(e) {
// This catches any exceptions thrown into the reverse stream
await handlerG.throw(e);
}
yield responseMessage;
}
};
const outputGenerator = outputGen();
Expand All @@ -258,35 +253,27 @@ class RPCServer extends EventTarget {
id: null,
};
controller.enqueue(rpcErrorMessage);
await forwardStream.cancel(
new rpcErrors.ErrorRPCHandlerFailed('Error clean up'),
);
// Clean up the input stream here, ignore error if already ended
await forwardStream
.cancel(new rpcErrors.ErrorRPCHandlerFailed('Error clean up'))
.catch(() => {});
controller.close();
}
},
cancel: async (reason) => {
try {
// Throw the reason into the reverse stream
await outputGenerator.throw(reason);
} catch (e) {
// If the e is the same as the reason
// then the handler did not care about the reason
// and we just discard it
if (e !== reason) {
this.dispatchEvent(
new rpcEvents.RPCErrorEvent({
detail: new rpcErrors.ErrorRPCSendErrorFailed(
'Stream has been cancelled',
{
cause: e,
}
),
}),
);
}
}
// await outputGenerator.nexj
// handlerAbortController.abort(reason);
this.dispatchEvent(
new rpcEvents.RPCErrorEvent({
detail: new rpcErrors.ErrorRPCOutputStreamError(
'Stream has been cancelled',
{
cause: reason,
},
),
}),
);
// If the output stream path fails then we need to end the generator
// early.
await outputGenerator.return(undefined);
},
});
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class ErrorRPCMissingResponse<T> extends ErrorRPC<T> {
exitCode = sysexits.UNAVAILABLE;
}

class ErrorRPCSendErrorFailed<T> extends ErrorRPC<T> {
static description = 'Failed to send error message';
class ErrorRPCOutputStreamError<T> extends ErrorRPC<T> {
static description = 'Output stream failed, unable to send data';
exitCode = sysexits.UNAVAILABLE;
}

Expand Down Expand Up @@ -102,6 +102,6 @@ export {
ErrorRPCHandlerFailed,
ErrorRPCMessageLength,
ErrorRPCMissingResponse,
ErrorRPCSendErrorFailed,
ErrorRPCOutputStreamError,
ErrorPolykeyRemote,
};
74 changes: 68 additions & 6 deletions tests/rpc/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
UnaryHandler,
} from '@/rpc/handlers';
import * as middlewareUtils from '@/rpc/utils/middleware';
import { promise } from '@/utils';
import * as rpcTestUtils from './utils';

describe(`${RPCServer.name}`, () => {
Expand Down Expand Up @@ -454,13 +455,70 @@ describe(`${RPCServer.name}`, () => {
},
);
testProp(
'should emit stream error',
'should emit stream error if input stream fails',
[specificMessageArb],
async (messages) => {
const handlerEndedProm = promise();
class TestMethod extends DuplexHandler {
public async *handle(input): AsyncIterable<JSONValue> {
try {
for await (const _ of input) {
// Consume but don't yield anything
}
} finally {
handlerEndedProm.resolveP();
}
}
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: new TestMethod({}),
},
logger,
});
let resolve;
rpcServer.addEventListener('error', (thing: RPCErrorEvent) => {
resolve(thing);
});
const passThroughStreamIn = new TransformStream<Uint8Array, Uint8Array>();
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const readWriteStream: ReadableWritablePair<Uint8Array, Uint8Array> = {
readable: passThroughStreamIn.readable,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream, {} as ConnectionInfo);
const writer = passThroughStreamIn.writable.getWriter();
// Write messages
for (const message of messages) {
await writer.write(Buffer.from(JSON.stringify(message)));
}
// Abort stream
const writerReason = Symbol('writerAbort');
await writer.abort(writerReason);
// We should get an error RPC message
await expect(outputResult).toResolve();
const errorMessage = JSON.parse((await outputResult)[0].toString());
// Parse without error
rpcUtils.parseJSONRPCResponseError(errorMessage);
// Check that the handler was cleaned up.
await expect(handlerEndedProm.p).toResolve();
await rpcServer.destroy();
},
{ numRuns: 1 },
);
testProp.only(
'should emit stream error if output stream fails',
[specificMessageArb],
async (messages) => {
const handlerEndedProm = promise();
class TestMethod extends DuplexHandler {
public async *handle(input): AsyncIterable<JSONValue> {
// Echo input
yield* input;
try {
yield* input;
} finally {
handlerEndedProm.resolveP();
}
}
}
const rpcServer = await RPCServer.createRPCServer({
Expand Down Expand Up @@ -494,14 +552,18 @@ describe(`${RPCServer.name}`, () => {
await reader.read();
}
// Abort stream
const writerReason = Symbol('writerAbort');
// const writerReason = Symbol('writerAbort');
const readerReason = Symbol('readerAbort');
await writer.abort(writerReason);
// Await writer.abort(writerReason);
await reader.cancel(readerReason);
// We should get an error event
const event = await errorProm;
expect(event.detail.cause).toContain(writerReason);
expect(event.detail.cause).toContain(readerReason);
await writer.close();
// Expect(event.detail.cause).toContain(writerReason);
expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCOutputStreamError);
expect(event.detail.cause).toBe(readerReason);
// Check that the handler was cleaned up.
await expect(handlerEndedProm.p).toResolve();
await rpcServer.destroy();
},
{ numRuns: 1 },
Expand Down

0 comments on commit b0876c8

Please sign in to comment.