Skip to content

Commit

Permalink
Merge pull request #42 from MatrixAI/feature-middlware-composition
Browse files Browse the repository at this point in the history
feat: incorporate timeoutMiddleware to allow for server to utilise client's caller timeout
  • Loading branch information
amydevs authored Oct 30, 2023
2 parents 8434c70 + 8d38826 commit 31c7e0d
Show file tree
Hide file tree
Showing 12 changed files with 565 additions and 88 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,45 @@ main();

```
![img.png](images/unaryTest.png)

## Specifications

### Throwing Timeouts

By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so:

```ts
class TestMethod extends UnaryHandler {
public handle = async (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONValue> => {
const abortProm = utils.promise<never>();
ctx.signal.addEventListener('abort', () => {
resolveCtxP(ctx);
abortProm.resolveP(ctx.signal.reason);
});
throw await abortProm.p;
};
}
```

### Timeout Middleware

The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out.

This case can be seen in the first diagram, where the server is able to stop the processing of the handler, and close the associated stream of the RPC call based on the shorter timeout sent by the client:

![RPCServer sets timeout based on RPCClient](images/timeoutMiddlewareClientTimeout.svg)

Where the `RPCClient` sends a timeout that is longer than that set on the `RPCServer`, it will be rejected. This is as the timeout of the client should never be expected to exceed that of the server, so that the server's timeout is an absolute limit.

![RPCServer rejects longer timeout sent by RPCClient](images/timeoutMiddlewareServerTimeout.svg)

The `timeoutMiddleware` is enabled by default, and uses the `.metadata.timeout` property on a JSON-RPC request object for the client to send it's timeout.

## Development

Run `nix-shell`, and once you're inside, you can use:
Expand Down
17 changes: 17 additions & 0 deletions images/timeoutMiddlewareClientTimeout.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 17 additions & 0 deletions images/timeoutMiddlewareServerTimeout.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 15 additions & 8 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ class RPCServer {
// Input generator derived from the forward stream
const inputGen = async function* (): AsyncIterable<I> {
for await (const data of forwardStream) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
yield data.params as I;
}
};
Expand All @@ -296,7 +298,9 @@ class RPCServer {
timer: ctx.timer,
});
for await (const response of handlerG) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
result: response,
Expand Down Expand Up @@ -570,13 +574,16 @@ class RPCServer {
}
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
if (timer.status !== 'settled') {
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
}
}

this.logger.info(`Handling stream with method (${method})`);
let handlerResult: [JSONValue | undefined, ReadableStream<Uint8Array>];
const headerWriter = rpcStream.writable.getWriter();
Expand Down
7 changes: 7 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ class ErrorRPCStreamEnded<T> extends ErrorRPCProtocol<T> {
class ErrorRPCTimedOut<T> extends ErrorRPCProtocol<T> {
static description = 'RPC handler has timed out';
code = JSONRPCErrorCode.RPCTimedOut;
public toJSON(): JSONRPCError {
const json = super.toJSON();
if (typeof json === 'object' && !Array.isArray(json)) {
(json as POJO).type = this.constructor.name;
}
return json;
}
}

class ErrorUtilsUndefinedBehaviour<T> extends ErrorRPCProtocol<T> {
Expand Down
96 changes: 90 additions & 6 deletions src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import type {
JSONRPCResponse,
JSONRPCResponseResult,
MiddlewareFactory,
JSONValue,
JSONRPCRequestMetadata,
JSONRPCResponseMetadata,
} from './types';
import type { ContextTimed } from '@matrixai/contexts';
import { TransformStream } from 'stream/web';
import { JSONParser } from '@streamparser/json';
import * as utils from './utils';
Expand Down Expand Up @@ -75,6 +79,80 @@ function jsonMessageToBinaryStream(): TransformStream<
});
}

function timeoutMiddlewareServer(
ctx: ContextTimed,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
) {
const currentTimeout = ctx.timer.delay;
// Flags for tracking if the first message has been processed
let forwardFirst = true;
return {
forward: new TransformStream<
JSONRPCRequest<JSONRPCRequestMetadata>,
JSONRPCRequest<JSONRPCRequestMetadata>
>({
transform: (chunk, controller) => {
controller.enqueue(chunk);
if (forwardFirst) {
forwardFirst = false;
let clientTimeout = chunk.metadata?.timeout;
if (clientTimeout === undefined) return;
if (clientTimeout === null) clientTimeout = Infinity;
if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout);
}
},
}),
reverse: new TransformStream<
JSONRPCResponse<JSONRPCResponseMetadata>,
JSONRPCResponse<JSONRPCResponseMetadata>
>({
transform: (chunk, controller) => {
// Passthrough chunk, no need for server to send ctx.timeout
controller.enqueue(chunk);
},
}),
};
}

/**
* This adds its own timeout to the forward metadata and updates it's timeout
* based on the reverse metadata.
* @param ctx
* @param _cancel
* @param _meta
*/
function timeoutMiddlewareClient(
ctx: ContextTimed,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
) {
const currentTimeout = ctx.timer.delay;
// Flags for tracking if the first message has been processed
let forwardFirst = true;
return {
forward: new TransformStream<JSONRPCRequest, JSONRPCRequest>({
transform: (chunk, controller) => {
if (forwardFirst) {
forwardFirst = false;
if (chunk == null) chunk = { jsonrpc: '2.0', method: '' };
if (chunk.metadata == null) chunk.metadata = {};
(chunk.metadata as any).timeout = currentTimeout;
}
controller.enqueue(chunk);
},
}),
reverse: new TransformStream<
JSONRPCResponse<JSONRPCResponseMetadata>,
JSONRPCResponse<JSONRPCResponseMetadata>
>({
transform: (chunk, controller) => {
controller.enqueue(chunk); // Passthrough chunk, no need for client to set ctx.timeout
},
}),
};
}

/**
* This function is a factory for creating a pass-through streamPair. It is used
* as the default middleware for the middleware wrappers.
Expand Down Expand Up @@ -116,12 +194,14 @@ function defaultServerMiddlewareWrapper(
>();

const middleMiddleware = middlewareFactory(ctx, cancel, meta);
const timeoutMiddleware = timeoutMiddlewareServer(ctx, cancel, meta);

const forwardReadable = inputTransformStream.readable.pipeThrough(
middleMiddleware.forward,
); // Usual middleware here
const forwardReadable = inputTransformStream.readable
.pipeThrough(timeoutMiddleware.forward) // Timeout middleware here
.pipeThrough(middleMiddleware.forward); // Usual middleware here
const reverseReadable = outputTransformStream.readable
.pipeThrough(middleMiddleware.reverse) // Usual middleware here
.pipeThrough(timeoutMiddleware.reverse) // Timeout middleware here
.pipeThrough(jsonMessageToBinaryStream());

return {
Expand Down Expand Up @@ -172,13 +252,15 @@ const defaultClientMiddlewareWrapper = (
JSONRPCRequest
>();

const timeoutMiddleware = timeoutMiddlewareClient(ctx, cancel, meta);
const middleMiddleware = middleware(ctx, cancel, meta);
const forwardReadable = inputTransformStream.readable
.pipeThrough(timeoutMiddleware.forward)
.pipeThrough(middleMiddleware.forward) // Usual middleware here
.pipeThrough(jsonMessageToBinaryStream());
const reverseReadable = outputTransformStream.readable.pipeThrough(
middleMiddleware.reverse,
); // Usual middleware here
const reverseReadable = outputTransformStream.readable
.pipeThrough(middleMiddleware.reverse)
.pipeThrough(timeoutMiddleware.reverse); // Usual middleware here

return {
forward: {
Expand All @@ -196,6 +278,8 @@ const defaultClientMiddlewareWrapper = (
export {
binaryToJsonMessageStream,
jsonMessageToBinaryStream,
timeoutMiddlewareClient,
timeoutMiddlewareServer,
defaultMiddleware,
defaultServerMiddlewareWrapper,
defaultClientMiddlewareWrapper,
Expand Down
36 changes: 33 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type JSONRPCRequestMessage<T extends JSONValue = JSONValue> = {
* SHOULD NOT contain fractional parts [2]
*/
id: string | number | null;
};
} & JSONRPCRequestMetadata;

/**
* This is the JSON RPC notification object. this is used for a request that
Expand All @@ -60,7 +60,7 @@ type JSONRPCRequestNotification<T extends JSONValue = JSONValue> = {
* This member MAY be omitted.
*/
params?: T;
};
} & JSONRPCRequestMetadata;

/**
* This is the JSON RPC response result object. It contains the response data for a
Expand All @@ -84,7 +84,7 @@ type JSONRPCResponseResult<T extends JSONValue = JSONValue> = {
* it MUST be Null.
*/
id: string | number | null;
};
} & JSONRPCResponseMetadata;

/**
* This is the JSON RPC response Error object. It contains any errors that have
Expand All @@ -110,6 +110,34 @@ type JSONRPCResponseError = {
id: string | number | null;
};

/**
* Used when an empty object is needed.
* Defined here with a linter override to avoid a false positive.
*/
// eslint-disable-next-line
type ObjectEmpty = {};

// Prevent overwriting the metadata type with `Omit<>`
type JSONRPCRequestMetadata<T extends Record<string, JSONValue> = ObjectEmpty> =
{
metadata?: {
[Key: string]: JSONValue;
} & Partial<{
timeout: number | null;
}>;
} & Omit<T, 'metadata'>;

// Prevent overwriting the metadata type with `Omit<>`
type JSONRPCResponseMetadata<
T extends Record<string, JSONValue> = ObjectEmpty,
> = {
metadata?: {
[Key: string]: JSONValue;
} & Partial<{
timeout: number | null;
}>;
} & Omit<T, 'metadata'>;

/**
* This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object.
*/
Expand Down Expand Up @@ -357,6 +385,8 @@ export type {
JSONRPCRequestNotification,
JSONRPCResponseResult,
JSONRPCResponseError,
JSONRPCRequestMetadata,
JSONRPCResponseMetadata,
JSONRPCError,
JSONRPCRequest,
JSONRPCResponse,
Expand Down
2 changes: 2 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ const standardErrors: {
URIError,
AggregateError,
AbstractError,
ErrorRPCTimedOut: errors.ErrorRPCTimedOut,
};

/**
Expand Down Expand Up @@ -342,6 +343,7 @@ function toError(
let e: Error;
switch (eClass) {
case AbstractError:
case errors.ErrorRPCTimedOut:
e = eClass.fromJSON(errorData);
break;
case AggregateError:
Expand Down
Loading

0 comments on commit 31c7e0d

Please sign in to comment.