Skip to content

Commit

Permalink
fix: change of head, tail destructuring for raw handlers, head and ta…
Browse files Browse the repository at this point in the history
…il reconstruction via transform stream, and reverse pair propagation of cancellation event
  • Loading branch information
CMCDragonkai authored and tegefaulkes committed Mar 24, 2023
1 parent a2ae510 commit 8e19b76
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 63 deletions.
94 changes: 60 additions & 34 deletions src/rpc/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue } from '../types';
import type { MiddlewareFactory } from './types';
import { TransformStream } from 'stream/web';
import { ReadableStream } from 'stream/web';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
Expand Down Expand Up @@ -191,14 +192,24 @@ class RPCServer extends EventTarget {
O extends JSONValue,
>(method: string, handler: DuplexHandlerImplementation<I, O>): void {
const rawSteamHandler: RawHandlerImplementation = (
[input, header],
[header, input],
connectionInfo,
ctx,
) => {
// Setting up middleware
const middleware = this.middlewareFactory(header);
const middleware = this.middlewareFactory();
// Forward from the client to the server
const forwardStream = input.pipeThrough(middleware.forward);
const headerStream = new TransformStream({
start(controller) {
controller.enqueue(Buffer.from(JSON.stringify(header)));
},
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const forwardStream = input
.pipeThrough(headerStream)
.pipeThrough(middleware.forward);
// Reverse from the server to the client
const reverseStream = middleware.reverse.writable;
// Generator derived from handler
Expand All @@ -210,17 +221,22 @@ class RPCServer extends EventTarget {
yield data.params as I;
}
};
for await (const response of handler(inputGen(), connectionInfo, ctx)) {
const handlerG = handler(inputGen(), connectionInfo, ctx);
for await (const response of handlerG) {
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
result: response,
id: null,
};
yield responseMessage;
try {
yield responseMessage;
} catch(e) {
// This catches any exceptions thrown into the reverse stream
await handlerG.throw(e);
}
}
};
const outputGenerator = outputGen();
let reason: any | undefined = undefined;
const reverseMiddlewareStream = new ReadableStream<JSONRPCResponse>({
pull: async (controller) => {
try {
Expand All @@ -231,39 +247,46 @@ class RPCServer extends EventTarget {
}
controller.enqueue(value);
} catch (e) {
if (reason == null) {
// We want to convert this error to an error message and pass it along
const rpcError: JSONRPCError = {
code: e.exitCode ?? sysexits.UNKNOWN,
message: e.description ?? '',
data: rpcUtils.fromError(e, this.sensitive),
};
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
controller.enqueue(rpcErrorMessage);
} else {
// These errors are emitted to the event system
// This contains the original error from enqueuing
this.dispatchEvent(
new rpcEvents.RPCErrorEvent({
detail: new rpcErrors.ErrorRPCSendErrorFailed(undefined, {
cause: [e, reason],
}),
}),
);
}
const rpcError: JSONRPCError = {
code: e.exitCode ?? sysexits.UNKNOWN,
message: e.description ?? '',
data: rpcUtils.fromError(e, this.sensitive),
};
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
controller.enqueue(rpcErrorMessage);
await forwardStream.cancel(
new rpcErrors.ErrorRPCHandlerFailed('Error clean up'),
);
controller.close();
}
},
cancel: async (_reason) => {
reason = _reason;
await outputGenerator.throw(_reason);
cancel: async (reason) => {
try {
// Throw the reason into the reverse stream
await outputGenerator.throw(reason);
} catch (e) {
// If the e is the same as the reason
// then the handler did not care about the reason
// and we just discard it
if (e !== reason) {
this.dispatchEvent(
new rpcEvents.RPCErrorEvent({
detail: new rpcErrors.ErrorRPCSendErrorFailed(
'Stream has been cancelled',
{
cause: e,
}
),
}),
);
}
}
// await outputGenerator.nexj
// handlerAbortController.abort(reason);
},
});
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
Expand All @@ -281,6 +304,9 @@ class RPCServer extends EventTarget {
connectionInfo,
ctx,
) {
// The `input` is expected to be an async iterable with only 1 value.
// Unlike generators, there is no `next()` method.
// So we use `break` after the first iteration.
for await (const inputVal of input) {
yield await handler(inputVal, connectionInfo, ctx);
break;
Expand Down Expand Up @@ -369,7 +395,7 @@ class RPCServer extends EventTarget {
return;
}
const outputStream = handler(
[inputStream, leadingMetadataMessage],
[leadingMetadataMessage, inputStream],
connectionInfo,
{ signal: abortController.signal },
);
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ abstract class RawHandler<
Container extends ContainerType = ContainerType,
> extends Handler<Container> {
abstract handle(
input: [ReadableStream<Uint8Array>, JSONRPCRequest],
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): ReadableStream<Uint8Array>;
Expand Down
15 changes: 13 additions & 2 deletions src/rpc/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ type HandlerImplementation<I, O> = (
) => O;

type RawHandlerImplementation = HandlerImplementation<
[ReadableStream<Uint8Array>, JSONRPCRequest],
[JSONRPCRequest, ReadableStream<Uint8Array>],
ReadableStream<Uint8Array>
>;

Expand Down Expand Up @@ -212,7 +212,18 @@ type StreamFactory = () => Promise<
ReadableWritablePair<Uint8Array, Uint8Array>
>;

type MiddlewareFactory<FR, FW, RR, RW> = (header?: JSONRPCRequest) => {
/**
* Middleware factory creates middlewares.
* Each middleware is a pair of forward and reverse.
* Each forward and reverse is a `ReadableWritablePair`.
* The forward pair is used transform input from client to server.
* The reverse pair is used to transform output from server to client.
* FR, FW is the readable and writable types of the forward pair.
* RR, RW is the readable and writable types of the reverse pair.
* FW -> FR is the direction of data flow from client to server.
* RW -> RR is the direction of data flow from server to client.
*/
type MiddlewareFactory<FR, FW, RR, RW> = () => {
forward: ReadableWritablePair<FR, FW>;
reverse: ReadableWritablePair<RR, RW>;
};
Expand Down
35 changes: 12 additions & 23 deletions src/rpc/utils/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ const jsonStreamParsers = require('@streamparser/json');
* specific type of message
* @param byteLimit - sets the number of bytes buffered before throwing an
* error. This is used to avoid infinitely buffering the input.
* @param firstMessage - This is a single message that is inserted into the
* front of the stream.
*/
function binaryToJsonMessageStream<T extends JSONRPCMessage>(
messageParser: (message: unknown) => T,
byteLimit: number = 1024 * 1024,
firstMessage?: T,
): TransformStream<Uint8Array, T> {
const parser = new jsonStreamParsers.JSONParser({
separator: '',
Expand All @@ -43,7 +40,6 @@ function binaryToJsonMessageStream<T extends JSONRPCMessage>(
await waitP.p;
},
start: (controller) => {
if (firstMessage != null) controller.enqueue(firstMessage);
parser.onValue = (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
Expand Down Expand Up @@ -84,17 +80,12 @@ function jsonMessageToBinaryStream(): TransformStream<
* This function is a factory for creating a pass-through streamPair. It is used
* as the default middleware for the middleware wrappers.
*/
const defaultMiddleware: MiddlewareFactory<
JSONRPCRequest,
JSONRPCRequest,
JSONRPCResponse,
JSONRPCResponse
> = () => {
function defaultMiddleware() {
return {
forward: new TransformStream(),
reverse: new TransformStream(),
forward: new TransformStream<JSONRPCRequest, JSONRPCRequest>(),
reverse: new TransformStream<JSONRPCResponse, JSONRPCResponse>(),
};
};
}

/**
* This convenience factory for creating wrapping middleware with the basic
Expand All @@ -103,28 +94,26 @@ const defaultMiddleware: MiddlewareFactory<
* JsonRPCMessages and pipe it through the provided middleware.
* The reverse path will pipe the output stream through the provided middleware
* and then transform it back to a binary stream.
* @param middleware - The provided middleware
* @param middlewareFactory - The provided middleware
*/
const defaultServerMiddlewareWrapper = (
middleware: MiddlewareFactory<
function defaultServerMiddlewareWrapper(
middlewareFactory: MiddlewareFactory<
JSONRPCRequest,
JSONRPCRequest,
JSONRPCResponse,
JSONRPCResponse
> = defaultMiddleware,
) => {
return (header: JSONRPCRequest) => {
): MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse> {
return () => {
const inputTransformStream = binaryToJsonMessageStream(
rpcUtils.parseJSONRPCRequest,
undefined,
header,
);
const outputTransformStream = new TransformStream<
JSONRPCResponseResult,
JSONRPCResponseResult
>();

const middleMiddleware = middleware(header);
const middleMiddleware = middlewareFactory();

const forwardReadable = inputTransformStream.readable.pipeThrough(
middleMiddleware.forward,
Expand All @@ -144,7 +133,7 @@ const defaultServerMiddlewareWrapper = (
},
};
};
};
}

/**
* This convenience factory for creating wrapping middleware with the basic
Expand All @@ -171,7 +160,7 @@ const defaultClientMiddlewareWrapper = (
return () => {
const outputTransformStream = binaryToJsonMessageStream(
rpcUtils.parseJSONRPCResponse,
undefined,
// Undefined,
);
const inputTransformStream = new TransformStream<
JSONRPCRequest,
Expand Down
4 changes: 2 additions & 2 deletions tests/rpc/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ describe('RPC', () => {
let header: JSONRPCRequest | undefined;
class TestMethod extends RawHandler {
public handle(
input: [ReadableStream<Uint8Array>, JSONRPCRequest],
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
): ReadableStream<Uint8Array> {
const [stream, header_] = input;
const [header_, stream] = input;
header = header_;
return stream;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/rpc/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe(`${RPCServer.name}`, () => {
rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6]),
);
class TestHandler extends RawHandler {
public handle([input, _header]): ReadableStream<Uint8Array> {
public handle([_header, input]): ReadableStream<Uint8Array> {
void (async () => {
for await (const _ of input) {
// No touch, only consume
Expand Down

0 comments on commit 8e19b76

Please sign in to comment.