Skip to content

Commit

Permalink
chore: cleaned up code
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanjassal committed Dec 6, 2024
1 parent 964c3f6 commit 34b8b86
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 141 deletions.
28 changes: 10 additions & 18 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ class RPCServer {
.pipeTo(passthroughTransform.writable)
// Ignore any errors here, we only care that it ended
.catch(() => {});
const cleanup = async (reason: any) => {
const cleanUp = async (reason: any) => {
// Release resources
await transformStream.readable.cancel(reason);
await transformStream.writable.abort(reason);
Expand Down Expand Up @@ -564,7 +564,7 @@ class RPCServer {
'Timed out waiting for header',
{ cause: new errors.ErrorRPCStreamEnded() },
);
await cleanup(err);
await cleanUp(err);
this.dispatchEvent(
new events.RPCErrorEvent({
detail: new errors.ErrorRPCTimedOut(
Expand All @@ -579,7 +579,7 @@ class RPCServer {
}
if (headerMessage.done) {
const err = new errors.ErrorMissingHeader('Missing header');
await cleanup(err);
await cleanUp(err);
this.dispatchEvent(
new events.RPCErrorEvent({
detail: new errors.ErrorRPCOutputStreamError(),
Expand All @@ -589,7 +589,7 @@ class RPCServer {
}
if (headerMessage.value instanceof Uint8Array) {
const err = new errors.ErrorRPCParse('Invalid message type');
await cleanup(err);
await cleanUp(err);
this.dispatchEvent(
new events.RPCErrorEvent({
detail: new errors.ErrorRPCParse(),
Expand All @@ -600,11 +600,13 @@ class RPCServer {
const method = headerMessage.value.method;
const handler = this.handlerMap.get(method);
if (handler == null) {
await cleanup(new errors.ErrorRPCHandlerFailed('Missing handler'));
await cleanUp(
new errors.ErrorRPCHandlerFailed(`Missing handler for ${method}`),
);
return;
}
if (abortController.signal.aborted) {
await cleanup(
await cleanUp(
new errors.ErrorHandlerAborted('Aborted', {
cause: new errors.ErrorHandlerAborted(),
}),
Expand All @@ -622,22 +624,12 @@ class RPCServer {
timer.refresh();
}
}
// // Set up a wrapper ReadableStream of the correct type
// const binaryReadableStream = new ReadableStream<Uint8Array>({
// pull: async (controller) => {
// for await (const chunk of transformStream.readable) {
// // The transformStream is guaranteed to return binary data after
// // sending the header message;
// if (!(chunk instanceof Uint8Array)) utils.never();
// controller.enqueue(chunk);
// }
// controller.close();
// },
// });
this.logger.info(`Handling stream with method (${method})`);
let handlerResult: [JSONObject | undefined, ReadableStream<Uint8Array>];
const headerWriter = rpcStream.writable.getWriter();
try {
// The as keyword is used here as the middleware will only return the
// first message as a JSONMessage, and others as raw Uint8Arrays.
handlerResult = await handler(
[
headerMessage.value,
Expand Down
25 changes: 18 additions & 7 deletions src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,38 @@ function binaryToJsonHeaderMessageStream<T extends JSONRPCMessage>(
let bytesWritten: number = 0;
let accumulator = Buffer.alloc(0);
let rawStream = false;
let parserEnded = false;

return new TransformStream<Uint8Array, T | Uint8Array>({
flush: async () => {
// Avoid potential race conditions by allowing parser to end first
const waitP = utils.promise();
parser.onEnd = () => waitP.resolveP();
parser.end();
await waitP.p;
if (!parserEnded) {
// Avoid potential race conditions by allowing parser to end first
const waitP = utils.promise();
parser.onEnd = () => waitP.resolveP();
parser.end();
await waitP.p;
}
},
start: (controller) => {
parser.onValue = (value) => {
parser.onValue = async (value) => {
// Enqueue the regular JSON message
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
// Remove the header message from the accumulated data
const headerLength = Buffer.from(JSON.stringify(jsonMessage)).length;
const headerLength = Buffer.from(
JSON.stringify(jsonMessage),
).byteLength;
accumulator = accumulator.subarray(headerLength);
if (accumulator.length > 0) controller.enqueue(accumulator);
// End the parser as we no longer need it
const waitP = utils.promise();
parser.onEnd = () => waitP.resolveP();
parser.end();
await waitP.p;
// Set system state
bytesWritten = 0;
rawStream = true;
parserEnded = true;
};
},
transform: (chunk, controller) => {
Expand Down
75 changes: 39 additions & 36 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,44 +494,47 @@ function parseHeadStream<T extends JSONRPCMessage>(
const endP = promise();
parser.onEnd = () => endP.resolveP();

return new TransformStream<Uint8Array, T | Uint8Array>({
flush: async () => {
if (!parser.isEnded) parser.end();
await endP.p;
},
start: (controller) => {
parser.onValue = async (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
bytesWritten = 0;
parsing = false;
};
},
transform: async (chunk, controller) => {
if (parsing) {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
} catch (e) {
throw new errors.ErrorRPCParse(undefined, {
cause: e,
});
}
if (bytesWritten > bufferByteLimit) {
throw new errors.ErrorRPCMessageLength();
}
} else {
// Wait for parser to end
if (!ended) {
parser.end();
await endP.p;
ended = true;
return new TransformStream<Uint8Array, T | Uint8Array>(
{
flush: async () => {
if (!parser.isEnded) parser.end();
await endP.p;
},
start: (controller) => {
parser.onValue = async (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
bytesWritten = 0;
parsing = false;
};
},
transform: async (chunk, controller) => {
if (parsing) {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
} catch (e) {
throw new errors.ErrorRPCParse(undefined, {
cause: e,
});
}
if (bytesWritten > bufferByteLimit) {
throw new errors.ErrorRPCMessageLength();
}
} else {
// Wait for parser to end
if (!ended) {
parser.end();
await endP.p;
ended = true;
}
// Pass through normal chunks
controller.enqueue(chunk);
}
// Pass through normal chunks
controller.enqueue(chunk);
}
},
},
});
{ highWaterMark: 1 },
);
}

function never(): never {
Expand Down
67 changes: 32 additions & 35 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1152,39 +1152,38 @@ describe('RPCClient tests', () => {
await ctx.timer.catch(() => {});
},
);
// Disabled to get feedback.
// test.prop({ timeouts: rpcTestUtils.timeoutsArb })(
// 'calling with ctx should override lower timeout of RPCClient',
// async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => {
// const streamPair: RPCStream<Uint8Array, Uint8Array> = {
// cancel: () => {},
// meta: undefined,
// readable: new ReadableStream(),
// writable: new WritableStream(),
// };
// const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
// const rpcClient = new RPCClient({
// manifest: {},
// streamFactory: async (ctx) => {
// resolveCtxP(ctx);
// return streamPair;
// },
// logger,
// idGen,
// timeoutTime: lowerTimeoutTime,
// });
// await rpcClient.duplexStreamCaller<
// JSONRPCRequestParams,
// JSONRPCResponseResult
// >(methodName, {
// timer: higherTimeoutTime,
// });
// const ctx = await ctxP;
// expect(ctx.timer.delay).toBe(higherTimeoutTime);
// ctx.timer.cancel();
// await ctx.timer.catch(() => {});
// },
// );
test.prop({ timeouts: rpcTestUtils.timeoutsArb })(
'calling with ctx should override lower timeout of RPCClient',
async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
resolveCtxP(ctx);
return streamPair;
},
logger,
idGen,
timeoutTime: lowerTimeoutTime,
});
await rpcClient.duplexStreamCaller<
JSONRPCRequestParams,
JSONRPCResponseResult
>(methodName, {
timer: higherTimeoutTime,
});
const ctx = await ctxP;
expect(ctx.timer.delay).toBe(higherTimeoutTime);
ctx.timer.cancel();
await ctx.timer.catch(() => {});
},
);
test.prop({ timeoutTime: fc.integer({ min: 0 }) })(
'calling with ctx should override lower timeout of RPCClient with Infinity',
async ({ timeoutTime }) => {
Expand Down Expand Up @@ -1217,8 +1216,6 @@ describe('RPCClient tests', () => {
await ctx.timer.catch(() => {});
},
);
// Given the message, I feel that the test should be split into two separate
// tests.
test.prop({ messages: specificMessageArb }, { numRuns: 1 })(
'ctx should be provided to the middleware and the middleware should be able to reset the timer',
async ({ messages }) => {
Expand Down
89 changes: 44 additions & 45 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1074,51 +1074,50 @@ describe('RPCServer', () => {
await ctx.timer.catch(() => {});
},
);
// Get feedback on this.
// test.prop({
// messages: specificMessageArb,
// timeouts: rpcTestUtils.timeoutsArb,
// })(
// 'handlers should be able to override lower timeout of RPCServer',
// async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => {
// const stream = rpcTestUtils.messagesToReadableStream(messages);
// const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
// class TestMethod extends DuplexHandler {
// public timeout = higherTimeoutTime;
// public handle = async function* (
// input: AsyncGenerator<JSONRPCRequestParams>,
// _cancel: (reason?: any) => void,
// _meta: Record<string, JSONValue> | undefined,
// ctx: ContextTimed,
// ): AsyncGenerator<JSONRPCResponseResult> {
// resolveCtxP(ctx);
// yield* input;
// };
// }
// const rpcServer = new RPCServer({
// logger,
// timeoutTime: lowerTimeoutTime,
// idGen,
// });
// await rpcServer.start({
// manifest: {
// testMethod: new TestMethod({}),
// },
// });
// const [outputResult, outputStream] = rpcTestUtils.streamToArray();
// const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
// cancel: () => {},
// readable: stream,
// writable: outputStream,
// };
// rpcServer.handleStream(readWriteStream);
// await outputResult;
// const ctx = await ctxP;
// expect(ctx.timer.delay).toBe(higherTimeoutTime);
// ctx.timer.cancel();
// await ctx.timer.catch(() => {});
// },
// );
test.prop({
messages: specificMessageArb,
timeouts: rpcTestUtils.timeoutsArb,
})(
'handlers should be able to override lower timeout of RPCServer',
async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => {
const stream = rpcTestUtils.messagesToReadableStream(messages);
const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
class TestMethod extends DuplexHandler {
public timeout = higherTimeoutTime;
public handle = async function* (
input: AsyncGenerator<JSONRPCRequestParams>,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncGenerator<JSONRPCResponseResult> {
resolveCtxP(ctx);
yield* input;
};
}
const rpcServer = new RPCServer({
logger,
idGen,
timeoutTime: lowerTimeoutTime,
});
await rpcServer.start({
manifest: {
testMethod: new TestMethod({}),
},
});
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: stream,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream);
await outputResult;
const ctx = await ctxP;
expect(ctx.timer.delay).toBe(higherTimeoutTime);
ctx.timer.cancel();
await ctx.timer.catch(() => {});
},
);
});
test.prop({
messages: specificMessageArb,
Expand Down

0 comments on commit 34b8b86

Please sign in to comment.