Skip to content

Commit

Permalink
* WIP - Newline now works, refers issue #1
Browse files Browse the repository at this point in the history
  • Loading branch information
addievo committed Sep 12, 2023
1 parent 6841d70 commit 21243e9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
10 changes: 6 additions & 4 deletions src/utils/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ function binaryToJsonMessageStream<T extends JSONRPCMessage>(
let bytesWritten: number = 0;

return new TransformStream<Uint8Array, T>({
flush: async () => {
// Avoid potential race conditions by allowing parser to end first
flush: async (controller) => {
const waitP = promise();
parser.onEnd = () => waitP.resolveP();
parser.end();
Expand All @@ -42,11 +41,14 @@ function binaryToJsonMessageStream<T extends JSONRPCMessage>(
start: (controller) => {
parser.onValue = (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
const jsonMessageString = JSON.stringify(jsonMessage);
const jsonMessageWithNewline = jsonMessageString + '\n';
const jsonMessageObject: T = JSON.parse(jsonMessageWithNewline) as T;

This comment has been minimized.

Copy link
@amydevs

amydevs Sep 14, 2023

Contributor

i don't think this is a good idea, JSON.parse(JSON.stringify(...)) is slow

controller.enqueue(jsonMessageObject);
bytesWritten = 0;
};
},
transform: (chunk) => {
transform: (chunk, controller) => {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
Expand Down
23 changes: 21 additions & 2 deletions tests/rpc/utils/middleware.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { JSONRPCMessage, JSONValue } from '@/types';
import { TransformStream } from 'stream/web';
import { fc, testProp } from '@fast-check/jest';
import { JSONParser } from '@streamparser/json';
import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable';
Expand All @@ -16,19 +18,36 @@ describe('Middleware tests', () => {
.noShrink();

testProp(
'can parse json stream',
'can parse json stream and are human readable',
[rpcTestUtils.jsonMessagesArb],
async (messages) => {
let captured = '';

const captureStream = new TransformStream<
JSONRPCMessage<JSONValue>,
JSONRPCMessage<JSONValue>
>({
transform(chunk, controller) {
captured += JSON.stringify(chunk) + '\n'; // Assuming chunk can be serialized to JSON
controller.enqueue(chunk);
},
});

const parsedStream = rpcTestUtils
.messagesToReadableStream(messages)
.pipeThrough(
rpcUtilsMiddleware.binaryToJsonMessageStream(
rpcUtils.parseJSONRPCMessage,
),
); // Converting back.
)
.pipeThrough(captureStream); // Converting back.

const asd = await AsyncIterable.as(parsedStream).toArray();
expect(asd).toEqual(messages);

// Check readibility

expect(captured).toContain('\n');
},
{ numRuns: 1000 },
);
Expand Down

0 comments on commit 21243e9

Please sign in to comment.