Skip to content

Commit

Permalink
feat: fixed random message skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanjassal committed Dec 2, 2024
1 parent 3c39ecd commit ddab967
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 157 deletions.
13 changes: 10 additions & 3 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,20 @@ class RPCServer {
meta,
ctx,
) {
// const iterator = input[Symbol.asyncIterator]();
// const inputVal = await iterator.next();
// yield await handler(inputVal.value, cancel, meta, ctx);

// The `input` is expected to be an async iterable with only 1 value.
// Unlike generators, there is no `next()` method.
// So we use `break` after the first iteration.
for await (const inputVal of input) {
yield await handler(inputVal, cancel, meta, ctx);
break;
}
for await (const _ of input) {
// Noop so that stream can close after flushing
}

// Noop so that stream can close after flushing
for await (const _ of input);
};
this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
}
Expand All @@ -452,6 +456,9 @@ class RPCServer {
meta,
ctx,
) {
// const iterator = input[Symbol.asyncIterator]();
// const inputVal = await iterator.next();
// yield* handler(inputVal.value, cancel, meta, ctx);
for await (const inputVal of input) {
yield* handler(inputVal, cancel, meta, ctx);
break;
Expand Down
77 changes: 37 additions & 40 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type {
PromiseDeconstructed,
ToError,
} from './types';
import { TransformStream } from 'stream/web';
import { ReadableStream, TransformStream } from 'stream/web';
import { JSONParser } from '@streamparser/json';
import { AbstractError } from '@matrixai/errors';
import * as errors from './errors';
Expand Down Expand Up @@ -494,47 +494,44 @@ function parseHeadStream<T extends JSONRPCMessage>(
const endP = promise();
parser.onEnd = () => endP.resolveP();

return new TransformStream<Uint8Array, T | Uint8Array>(
{
flush: async () => {
if (!parser.isEnded) parser.end();
await endP.p;
},
start: (controller) => {
parser.onValue = async (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
bytesWritten = 0;
parsing = false;
};
},
transform: async (chunk, controller) => {
if (parsing) {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
} catch (e) {
throw new errors.ErrorRPCParse(undefined, {
cause: e,
});
}
if (bytesWritten > bufferByteLimit) {
throw new errors.ErrorRPCMessageLength();
}
} else {
// Wait for parser to end
if (!ended) {
parser.end();
await endP.p;
ended = true;
}
// Pass through normal chunks
controller.enqueue(chunk);
return new TransformStream<Uint8Array, T | Uint8Array>({
flush: async () => {
if (!parser.isEnded) parser.end();
await endP.p;
},
start: (controller) => {
parser.onValue = async (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
bytesWritten = 0;
parsing = false;
};
},
transform: async (chunk, controller) => {
if (parsing) {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
} catch (e) {
throw new errors.ErrorRPCParse(undefined, {
cause: e,
});
}
},
if (bytesWritten > bufferByteLimit) {
throw new errors.ErrorRPCMessageLength();
}
} else {
// Wait for parser to end
if (!ended) {
parser.end();
await endP.p;
ended = true;
}
// Pass through normal chunks
controller.enqueue(chunk);
}
},
{ highWaterMark: 1 },
);
});
}

function never(): never {
Expand Down
228 changes: 114 additions & 114 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,120 +641,120 @@ describe(`${RPCServer.name}`, () => {
expect(ctx.signal.reason).toBe(readerReason);
await rpcServer.stop({ force: true });
});
// Temporarily commenting out to allow the CI to make a release
// test.prop({
// messages: specificMessageArb,
// })('forward middlewares', async ({ messages }) => {
// const stream = rpcTestUtils.messagesToReadableStream(messages);
// class TestMethod extends DuplexHandler {
// public handle = async function* (
// input: AsyncGenerator<JSONRPCRequestParams>,
// _cancel: (reason?: any) => void,
// _meta: Record<string, JSONValue> | undefined,
// _ctx: ContextTimed,
// ): AsyncGenerator<JSONRPCResponseResult> {
// yield* input;
// };
// }
// const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
// () => {
// return {
// forward: new TransformStream({
// transform: (chunk, controller) => {
// chunk.params = { value: 1 };
// controller.enqueue(chunk);
// },
// }),
// reverse: new TransformStream(),
// };
// },
// );
// const rpcServer = new RPCServer({
// middlewareFactory: middlewareFactory,
// logger,
// idGen,
// });
// await rpcServer.start({
// manifest: {
// testMethod: new TestMethod({}),
// },
// });
// const [outputResult, outputStream] = rpcTestUtils.streamToArray();
// const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
// cancel: () => {},
// readable: stream,
// writable: outputStream,
// };
// rpcServer.handleStream(readWriteStream);
// const out = await outputResult;
// expect(out.map((v) => v!.toString())).toStrictEqual(
// messages.map(() =>
// JSON.stringify({
// jsonrpc: '2.0',
// result: { value: 1 },
// id: null,
// }),
// ),
// );
// await rpcServer.stop({ force: true });
// });
// test.prop(
// {
// messages: specificMessageArb,
// },
// { numRuns: 1 },
// )('reverse middlewares', async ({ messages }) => {
// const stream = rpcTestUtils.messagesToReadableStream(messages);
// class TestMethod extends DuplexHandler {
// public handle = async function* (
// input: AsyncGenerator<JSONRPCRequestParams<{ value: number }>>,
// _cancel: (reason?: any) => void,
// _meta: Record<string, JSONValue> | undefined,
// _ctx: ContextTimed,
// ): AsyncGenerator<JSONRPCResponseResult<{ value: number }>> {
// yield* input;
// };
// }
// const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => {
// return {
// forward: new TransformStream(),
// reverse: new TransformStream({
// transform: (chunk, controller) => {
// if ('result' in chunk) chunk.result = { value: 1 };
// controller.enqueue(chunk);
// },
// }),
// };
// });
// const rpcServer = new RPCServer({
// middlewareFactory: middleware,
// logger,
// idGen,
// });
// await rpcServer.start({
// manifest: {
// testMethod: new TestMethod({}),
// },
// });
// const [outputResult, outputStream] = rpcTestUtils.streamToArray();
// const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
// cancel: () => {},
// readable: stream,
// writable: outputStream,
// };
// rpcServer.handleStream(readWriteStream);
// const out = await outputResult;
// expect(out.map((v) => v!.toString())).toStrictEqual(
// messages.map(() =>
// JSON.stringify({
// jsonrpc: '2.0',
// result: { value: 1 },
// id: null,
// }),
// ),
// );
// await rpcServer.stop({ force: true });
// });
test.prop({
messages: specificMessageArb,
})('forward middlewares', async ({ messages }) => {
const stream = rpcTestUtils.messagesToReadableStream(messages);
class TestMethod extends DuplexHandler {
public handle = async function* (
input: AsyncGenerator<JSONRPCRequestParams>,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
_ctx: ContextTimed,
): AsyncGenerator<JSONRPCResponseResult> {
yield* input;
};
}
const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
() => {
return {
forward: new TransformStream({
transform: (chunk, controller) => {
chunk.params = { value: 1 };
controller.enqueue(chunk);
},
}),
reverse: new TransformStream(),
};
},
);
const rpcServer = new RPCServer({
middlewareFactory: middlewareFactory,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testMethod: new TestMethod({}),
},
});
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: stream,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream);
const out = await outputResult;
expect(out.map((v) => v!.toString())).toStrictEqual(
messages.map(() =>
JSON.stringify({
jsonrpc: '2.0',
result: { value: 1 },
id: null,
}),
),
);
await rpcServer.stop({ force: true });
});
test.prop(
{
messages: specificMessageArb,
},
// { numRuns: 1 },
{ seed: 1292472631, path: "0", endOnFailure: true }
)('reverse middlewares', async ({ messages }) => {
const stream = rpcTestUtils.messagesToReadableStream(messages);
class TestMethod extends DuplexHandler {
public handle = async function* (
input: AsyncGenerator<JSONRPCRequestParams<{ value: number }>>,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
_ctx: ContextTimed,
): AsyncGenerator<JSONRPCResponseResult<{ value: number }>> {
yield* input;
};
}
const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => {
return {
forward: new TransformStream(),
reverse: new TransformStream({
transform: (chunk, controller) => {
if ('result' in chunk) chunk.result = { value: 1 };
controller.enqueue(chunk);
},
}),
};
});
const rpcServer = new RPCServer({
middlewareFactory: middleware,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testMethod: new TestMethod({}),
},
});
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: stream,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream);
const out = await outputResult;
expect(out.map((v) => v!.toString())).toStrictEqual(
messages.map(() =>
JSON.stringify({
jsonrpc: '2.0',
result: { value: 1 },
id: null,
}),
),
);
await rpcServer.stop({ force: true });
});
test.prop({
message: invalidTokenMessageArb,
})('forward middleware authentication', async ({ message }) => {
Expand Down

0 comments on commit ddab967

Please sign in to comment.