Skip to content

Commit

Permalink
wip: working on stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanjassal committed Dec 3, 2024
1 parent 123e7aa commit 4419604
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
45 changes: 22 additions & 23 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,36 +510,18 @@ class RPCServer {
.pipeTo(passthroughTransform.writable)
// Ignore any errors here, we only care that it ended
.catch(() => {});
const headerStream = await (async () => {
const reader = inputStream.getReader();
const header = await reader.read();
reader.releaseLock(); // Release the lock to allow other reads
// Header must be defined
if (header.value == null) {
// TEMP: actually define the header error
throw new errors.ErrorRPC('header is undefined');
}
// Return a stream to be piped through the head transform
return new ReadableStream<Uint8Array>({
start: async (controller) => {
controller.enqueue(header.value);
controller.close();
},
});
})();
// Feed in the single header value into the header stream
void headerStream.pipeTo(headTransformStream.writable);
const cleanUp = async (reason: any) => {
await inputStream.cancel(reason);
await rpcStream.writable.abort(reason);
await inputStreamEndProm;
timer.cancel(cleanupReason);
await timer.catch(() => {});
};
const reader = headTransformStream.readable.getReader();
const reader = inputStream.getReader();
console.log('about to read header message');
// Allows timing out when waiting for the first message
let headerMessage:
| ReadableStreamDefaultReadResult<JSONRPCRequest>
| ReadableStreamDefaultReadResult<Uint8Array>
| undefined
| void;
try {
Expand Down Expand Up @@ -570,6 +552,7 @@ class RPCServer {
}
// Downgrade back to the raw stream
reader.releaseLock();
console.log('read header message');
// There are 2 conditions where we just end here
// 1. The timeout timer resolves before the first message
// 2. the stream ends before the first message
Expand Down Expand Up @@ -601,7 +584,23 @@ class RPCServer {
);
return;
}
const method = headerMessage.value.method;
console.log('resulting header message', headerMessage);
const headerStream = new ReadableStream<Uint8Array>({
start: async (controller) => {
controller.enqueue(headerMessage!.value);
controller.close();
},
});
console.log('piping readable stream to head transform writable');
await headerStream.pipeTo(headTransformStream.writable);
console.log('piping finished');
// Read the transformed header message
const transformedReader = headTransformStream.readable.getReader();
const transformedHeaderMessage = (await transformedReader.read()).value;
if (transformedHeaderMessage == null) utils.never();
console.log('got header properly', transformedHeaderMessage);
// Use the parsed header message
const method = transformedHeaderMessage.method;
const handler = this.handlerMap.get(method);
if (handler == null) {
await cleanUp(new errors.ErrorRPCHandlerFailed('Missing handler'));
Expand Down Expand Up @@ -631,7 +630,7 @@ class RPCServer {
const headerWriter = rpcStream.writable.getWriter();
try {
handlerResult = await handler(
[headerMessage.value, inputStream],
[transformedHeaderMessage, inputStream],
rpcStream.cancel,
rpcStream.meta,
{ signal: abortController.signal, timer },
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type {
PromiseDeconstructed,
ToError,
} from './types';
import { ReadableStream, TransformStream } from 'stream/web';
import { TransformStream } from 'stream/web';
import { JSONParser } from '@streamparser/json';
import { AbstractError } from '@matrixai/errors';
import * as errors from './errors';
Expand Down

0 comments on commit 4419604

Please sign in to comment.