Skip to content

Commit

Permalink
fix: server handlers are now able to persist operations and ignore ti…
Browse files Browse the repository at this point in the history
…meouts

fix: changes to `ObjectEmpty` type to be in line with polykey

[ci-skip]
  • Loading branch information
amydevs committed Oct 30, 2023
1 parent 228f824 commit caa9bda
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 11 deletions.
23 changes: 15 additions & 8 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ class RPCServer {
// Input generator derived from the forward stream
const inputGen = async function* (): AsyncIterable<I> {
for await (const data of forwardStream) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
yield data.params as I;
}
};
Expand All @@ -296,7 +298,9 @@ class RPCServer {
timer: ctx.timer,
});
for await (const response of handlerG) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
result: response,
Expand Down Expand Up @@ -570,13 +574,16 @@ class RPCServer {
}
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
if (timer.status !== 'settled') {
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
}
}

this.logger.info(`Handling stream with method (${method})`);
let handlerResult: [JSONValue | undefined, ReadableStream<Uint8Array>];
const headerWriter = rpcStream.writable.getWriter();
Expand Down
7 changes: 7 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ class ErrorRPCStreamEnded<T> extends ErrorRPCProtocol<T> {
class ErrorRPCTimedOut<T> extends ErrorRPCProtocol<T> {
static description = 'RPC handler has timed out';
code = JSONRPCErrorCode.RPCTimedOut;
public toJSON(): JSONRPCError {
const json = super.toJSON();
if (typeof json === 'object' && !Array.isArray(json)) {
(json as POJO).type = this.constructor.name;
}
return json;
}
}

class ErrorUtilsUndefinedBehaviour<T> extends ErrorRPCProtocol<T> {
Expand Down
4 changes: 2 additions & 2 deletions src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ const defaultClientMiddlewareWrapper = (
export {
binaryToJsonMessageStream,
jsonMessageToBinaryStream,
timeoutMiddlewareClient,
timeoutMiddlewareServer,
defaultMiddleware,
defaultServerMiddlewareWrapper,
defaultClientMiddlewareWrapper,
timeoutMiddlewareClient,
timeoutMiddlewareServer,
};
7 changes: 6 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ type JSONRPCResponseError = {
id: string | number | null;
};

type ObjectEmpty = NonNullable<unknown>;
/**
* Used when an empty object is needed.
* Defined here with a linter override to avoid a false positive.
*/
// eslint-disable-next-line
type ObjectEmpty = {};

// Prevent overwriting the metadata type with `Omit<>`
type JSONRPCRequestMetadata<T extends Record<string, JSONValue> = ObjectEmpty> =
Expand Down
2 changes: 2 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ const standardErrors: {
URIError,
AggregateError,
AbstractError,
ErrorRPCTimedOut: errors.ErrorRPCTimedOut,
};

/**
Expand Down Expand Up @@ -342,6 +343,7 @@ function toError(
let e: Error;
switch (eClass) {
case AbstractError:
case errors.ErrorRPCTimedOut:
e = eClass.fromJSON(errorData);
break;
case AggregateError:
Expand Down
64 changes: 64 additions & 0 deletions tests/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,70 @@ describe('RPC', () => {

await rpcServer.stop({ force: true });
});
testProp(
'RPC client times out and server is able to ignore exception',
[fc.string()],
async (message) => {
// Setup server and client communication pairs
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Uint8Array,
Uint8Array
>();
const { p: ctxP, resolveP: resolveCtxP } = utils.promise<ContextTimed>();
class TestMethod extends UnaryHandler {
public handle = async (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONValue> => {
const abortProm = utils.promise<never>();
ctx.signal.addEventListener('abort', () => {
resolveCtxP(ctx);
abortProm.resolveP(ctx.signal.reason);
});
await abortProm.p;
return input;
};
}
// Set up a client and server with matching timeout settings
const rpcServer = new RPCServer({
logger,
idGen,
handlerTimeoutTime: 150,
});
await rpcServer.start({
manifest: {
testMethod: new TestMethod({}),
},
});
rpcServer.handleStream({
...serverPair,
cancel: () => {},
});

const rpcClient = new RPCClient({
manifest: {
testMethod: new UnaryCaller(),
},
streamFactory: async () => {
return {
...clientPair,
cancel: () => {},
};
},
logger,
idGen,
});
await expect(
rpcClient.methods.testMethod(message, { timer: 100 }),
).resolves.toBe(message);
await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100);

await rpcServer.stop({ force: true });
},
{ numRuns: 1 },
);
testProp(
'RPC Serializes and Deserializes Error',
[rpcTestUtils.errorArb(rpcTestUtils.errorArb())],
Expand Down

0 comments on commit caa9bda

Please sign in to comment.