diff --git a/src/utils/middleware.ts b/src/utils/middleware.ts index 138e04d..df85cc6 100644 --- a/src/utils/middleware.ts +++ b/src/utils/middleware.ts @@ -32,8 +32,7 @@ function binaryToJsonMessageStream( let bytesWritten: number = 0; return new TransformStream({ - flush: async () => { - // Avoid potential race conditions by allowing parser to end first + flush: async (controller) => { const waitP = promise(); parser.onEnd = () => waitP.resolveP(); parser.end(); @@ -42,11 +41,14 @@ function binaryToJsonMessageStream( start: (controller) => { parser.onValue = (value) => { const jsonMessage = messageParser(value.value); - controller.enqueue(jsonMessage); + const jsonMessageString = JSON.stringify(jsonMessage); + const jsonMessageWithNewline = jsonMessageString + '\n'; + const jsonMessageObject: T = JSON.parse(jsonMessageWithNewline) as T; + controller.enqueue(jsonMessageObject); bytesWritten = 0; }; }, - transform: (chunk) => { + transform: (chunk, controller) => { try { bytesWritten += chunk.byteLength; parser.write(chunk); diff --git a/tests/rpc/utils/middleware.test.ts b/tests/rpc/utils/middleware.test.ts index a995b8c..fd56a05 100644 --- a/tests/rpc/utils/middleware.test.ts +++ b/tests/rpc/utils/middleware.test.ts @@ -1,3 +1,5 @@ +import type { JSONRPCMessage, JSONValue } from '@/types'; +import { TransformStream } from 'stream/web'; import { fc, testProp } from '@fast-check/jest'; import { JSONParser } from '@streamparser/json'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; @@ -16,19 +18,36 @@ describe('Middleware tests', () => { .noShrink(); testProp( - 'can parse json stream', + 'can parse json stream and are human readable', [rpcTestUtils.jsonMessagesArb], async (messages) => { + let captured = ''; + + const captureStream = new TransformStream< + JSONRPCMessage, + JSONRPCMessage + >({ + transform(chunk, controller) { + captured += JSON.stringify(chunk) + '\n'; // Assuming chunk can be serialized to JSON + controller.enqueue(chunk); + }, + }); + const parsedStream = rpcTestUtils .messagesToReadableStream(messages) .pipeThrough( rpcUtilsMiddleware.binaryToJsonMessageStream( rpcUtils.parseJSONRPCMessage, ), - ); // Converting back. + ) + .pipeThrough(captureStream); // Converting back. const asd = await AsyncIterable.as(parsedStream).toArray(); expect(asd).toEqual(messages); + + // Check readibility + + expect(captured).toContain('\n'); }, { numRuns: 1000 }, );