Ungraceful closures of streams used cancel or abort with no reason - remove all of these, and we can rely on timeouts instead #27

Closed
addievo opened this issue Sep 30, 2023 · 10 comments
Assignees
Labels
development (Standard development), r&d:polykey:supporting activity (Supporting core activity)

Comments

@addievo
Contributor

addievo commented Sep 30, 2023

Specification

We must not call cancel or abort without a reason. Doing so can surface as undefined errors, which are very difficult to debug.

These are being used to ungracefully close streams, possibly in RPCClient and RPCServer.

These calls should no longer be needed anyway, because we can rely on timeouts rather than ungracefully closing streams.
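
As a minimal illustration of the problem, using only the standard WHATWG streams API: whatever is passed to cancel or abort is exactly what the other end receives, so omitting it hands over undefined. The helper names captureCancelReason and captureAbortReason are hypothetical and not part of js-rpc.

```typescript
// Hypothetical helpers for illustration only; not part of js-rpc.

// Returns the reason the underlying source sees when the stream is cancelled.
async function captureCancelReason(reason?: unknown): Promise<unknown> {
  let seen: unknown = 'not cancelled';
  const stream = new ReadableStream<string>({
    cancel(r) {
      seen = r;
    },
  });
  await stream.cancel(reason);
  return seen;
}

// Returns the reason the underlying sink sees when the stream is aborted.
async function captureAbortReason(reason?: unknown): Promise<unknown> {
  let seen: unknown = 'not aborted';
  const stream = new WritableStream<string>({
    abort(r) {
      seen = r;
    },
  });
  await stream.abort(reason);
  return seen;
}

async function showReasons(): Promise<void> {
  console.log(await captureCancelReason()); // undefined
  console.log(await captureAbortReason()); // undefined
  const withReason = await captureCancelReason(new Error('RPC has timed out'));
  console.log(withReason instanceof Error); // true
}
void showReasons();
```

With no reason, anything downstream that logs or rethrows the value has nothing to report, which matches the "undefined errors" described above.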

Additional Context

@addievo addievo added the bug Something isn't working label Sep 30, 2023
@addievo
Contributor Author

addievo commented Sep 30, 2023

All instances of cancel or abort without a reason

RPCClient.ts

  @ready(new rpcErrors.ErrorMissingCaller())
  public async unaryCaller<I extends JSONValue, O extends JSONValue>(
    method: string,
    parameters: I,
    ctx: Partial<ContextTimedInput> = {},
  ): Promise<O> {
    const callerInterface = await this.duplexStreamCaller<I, O>(method, ctx);
    const reader = callerInterface.readable.getReader();
    const writer = callerInterface.writable.getWriter();
    try {
      await writer.write(parameters);
      const output = await reader.read();
      if (output.done) {
        throw new rpcErrors.ErrorMissingCaller('Missing response', {
          cause: ctx.signal?.reason,
        });
      }
      await reader.cancel();
      await writer.close();
      return output.value;
    } finally {
      // Attempt clean up, ignore errors if already cleaned up
      await reader.cancel().catch(() => {});
      await writer.close().catch(() => {});
    }
  }

Instance 2

    // Hooking up agnostic stream side
    let rpcStream: RPCStream<Uint8Array, Uint8Array>;
    const streamFactoryProm = this.streamFactory({ signal, timer });
    try {
      rpcStream = await Promise.race([streamFactoryProm, abortRaceProm.p]);
    } catch (e) {
      cleanUp();
      void streamFactoryProm.then((stream) =>
        stream.cancel(ErrorRPCStreamEnded),
      );
      throw e;
    }
    void timer.then(
      () => {
        rpcStream.cancel(
          new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
            cause: ctx.signal?.reason,
          }),
        );
      },
      () => {}, // Ignore cancellation error
    );
    // Deciding if we want to allow refreshing
    // We want to refresh timer if none was provided
    const refreshingTimer: Timer | undefined =
      ctx.timer == null ? timer : undefined;
    // Composing stream transforms and middleware
    const metadata = {
      ...(rpcStream.meta ?? {}),
      command: method,
    };
    const outputMessageTransformStream =
      rpcUtils.clientOutputTransformStream<O>(metadata, refreshingTimer);
    const inputMessageTransformStream = rpcUtils.clientInputTransformStream<I>(
      method,
      refreshingTimer,
    );
    const middleware = this.middlewareFactory(
      { signal, timer },
      rpcStream.cancel,
      metadata,
    );
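
The timer branch in Instance 2 is the pattern the issue wants to rely on: the stream is only cancelled when a timeout fires, and always with an explicit reason. A stand-alone sketch of that idea with standard APIs follows. TimedOutError and cancelOnTimeout are hypothetical names, with TimedOutError standing in for rpcErrors.ErrorRPCTimedOut.

```typescript
// Hypothetical stand-in for rpcErrors.ErrorRPCTimedOut.
class TimedOutError extends Error {}

// Cancels `stream` with an explicit reason once `delayMs` has elapsed.
// Returns a function that disarms the timeout.
function cancelOnTimeout(
  stream: ReadableStream<unknown>,
  delayMs: number,
): () => void {
  const handle = setTimeout(() => {
    // cancel() rejects if the stream is locked; this sketch ignores that case
    stream.cancel(new TimedOutError('RPC has timed out')).catch(() => {});
  }, delayMs);
  return () => clearTimeout(handle);
}

// Usage: resolves with whatever reason reached the underlying source.
function demoTimeout(): Promise<unknown> {
  let seen: unknown = 'not cancelled';
  const stream = new ReadableStream<Uint8Array>({
    cancel(reason) {
      seen = reason;
    },
  });
  cancelOnTimeout(stream, 5);
  return new Promise((resolve) => setTimeout(() => resolve(seen), 50));
}
```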

RPCServer.ts

  @ready(new rpcErrors.ErrorRPCHandlerFailed())
  public handleStream(rpcStream: RPCStream<Uint8Array, Uint8Array>) {
    // This will take a buffer stream of json messages and set up service
    //  handling for it.
    // Constructing the PromiseCancellable for tracking the active stream
    const abortController = new AbortController();
    // Setting up timeout timer logic
    const timer = new Timer({
      delay: this.handlerTimeoutTime,
      handler: () => {
        abortController.abort(new rpcErrors.ErrorRPCTimedOut());
        if (this.onTimeoutCallback) {
          this.onTimeoutCallback();
        }
      },
    });

    const prom = (async () => {
      const id = await this.idGen();
      const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
        rpcUtils.parseJSONRPCRequest,
      );
      // Transparent transform used as a point to cancel the input stream from
      const passthroughTransform = new TransformStream<
        Uint8Array,
        Uint8Array
      >();
      const inputStream = passthroughTransform.readable;
      const inputStreamEndProm = rpcStream.readable
        .pipeTo(passthroughTransform.writable)
        // Ignore any errors here, we only care that it ended
        .catch(() => {});
      void inputStream
        // Allow us to re-use the readable after reading the first message
        .pipeTo(headTransformStream.writable, {
          preventClose: true,
          preventCancel: true,
        })
        // Ignore any errors here, we only care that it ended
        .catch(() => {});
      const cleanUp = async (reason: any) => {
        await inputStream.cancel(reason);
        await rpcStream.writable.abort(reason);
        await inputStreamEndProm;
        timer.cancel(cleanupReason);
        await timer.catch(() => {});
      };
      // Read a single empty value to consume the first message
      const reader = headTransformStream.readable.getReader();
      // Allows timing out when waiting for the first message
      let headerMessage:
        | ReadableStreamDefaultReadResult<JSONRPCRequest>
        | undefined
        | void;
      try {
        headerMessage = await Promise.race([
          reader.read(),
          timer.then(
            () => undefined,
            () => {},
          ),
        ]);
      } catch (e) {
        const newErr = new rpcErrors.ErrorRPCHandlerFailed(
          'Stream failed waiting for header',
          { cause: e },
        );
        await inputStreamEndProm;
        timer.cancel(cleanupReason);
        await timer.catch(() => {});
        this.dispatchEvent(
          new rpcEvents.RPCErrorEvent({
            detail: new rpcErrors.ErrorRPCOutputStreamError(
              'Stream failed waiting for header',
              {
                cause: newErr,
              },
            ),
          }),
        );
        return;
      }
      // Downgrade back to the raw stream
      await reader.cancel();
      // There are 2 conditions where we just end here
      //  1. The timeout timer resolves before the first message
      //  2. the stream ends before the first message

@CMCDragonkai
Member

CMCDragonkai commented Oct 4, 2023

Discussion about RPCServer and RPCClient lifecycles:

MatrixAI/Polykey#552 (comment)

Brian — Yesterday at 12:15 PM
Moving the RPCServer into NCM is what we discussed before. I think it's fine so long as the server manifest is generic and passed in just to simplify testing.

Seems like the RPCServer could use the StartStop pattern. It could just be a normal class, but the handleStream is ready decorated. While the RPCServer doesn't really manage the streams it's handling, it is managing the handlers and is fully allowed to cancel any active streams directly.

I think the main reason the RPCServer has a life cycle is to clearly track the existence of active streams. If we're ending the RPCServer, for simplicity we want any running handlers to end before the RPCServer has ended. This makes it clear that it's safe to stop any domains because we can be sure no handlers are actively using them.

That said, stopping the transport layer stuff serves the same function, but since the RPC is agnostic to transport we don't want to depend on that. Then again, the RPC can only advise abort unless we want to keep a force destroy functionality around.
CMCDragonkai — Yesterday at 12:17 PM
Yes, that's why I was thinking that stopping the RPCServer is more appropriate, and stopping should only result in a signal abortion, not cancellation of streams
Because RPCServer is agnostic to the stream, the lifecycle of the handlers is controlled by an advisory signal abortion instead of cancelling streams
Brian — Yesterday at 12:17 PM
Fair. In any case, the RPCServer should only be fully stopped once all handlers have completed.
CMCDragonkai — Yesterday at 12:18 PM
Yes after sending the signal abortion it can wait for all handlers to have completely resolved
Brian — Yesterday at 12:24 PM
Mmmk. So as far as changes go, based on that we're just removing the force: boolean option from rpcServer.destroy(), making sure it awaits all active streams, and possibly converting it to StartStop.
This will need to be changed on js-rpc.
CMCDragonkai — Yesterday at 12:33 PM
Including unary, client streaming, server streaming, and duplex
Brian — Yesterday at 12:33 PM
I'm not sure what you mean?
CMCDragonkai — Yesterday at 12:58 PM
I'm not sure we have force at all
It's just signal abortion
For all 4 different kinds of handlers
And signal abortion is always advisory
So you just wait until handlers have finished
Brian — Yesterday at 12:59 PM
Logic wise, setting force:true for RPCServer.destroy() just triggers abortion for the handlers before waiting for them to complete. force:false just waits for them to complete.

Conclusion is:

  1. RPCServer can be Start/Stop, where Stop has a force option, and force true means sending a special abortion signal to all the handler types (not cancelling streams).
  2. RPCClient can just be synchronous - there is no lifecycle here
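
A minimal sketch of the stop behaviour in point 1, assuming handlers receive an AbortSignal and the server tracks one promise per running handler. SketchServer, Handler, and reasonStopping are hypothetical names; this is not the js-rpc implementation.

```typescript
type Handler = (signal: AbortSignal) => Promise<void>;

// Hypothetical reason value; the discussion only asks for a "special" one.
const reasonStopping = Symbol('RPCServer stopping');

class SketchServer {
  // Promises of handlers that have not settled yet
  protected handlerPs = new Set<Promise<void>>();
  protected abortController = new AbortController();

  public handle(handler: Handler): void {
    const forget = () => {
      this.handlerPs.delete(handlerP);
    };
    // Both outcomes are absorbed so that stop() itself never throws
    const handlerP: Promise<void> = handler(this.abortController.signal).then(
      forget,
      forget,
    );
    this.handlerPs.add(handlerP);
  }

  public async stop({ force = false }: { force?: boolean } = {}): Promise<void> {
    // Advisory only: handlers decide how to wind down, streams are not cancelled
    if (force) this.abortController.abort(reasonStopping);
    await Promise.all(Array.from(this.handlerPs));
  }
}

// Usage: a handler that finishes only when it is asked to stop.
async function demoStop(): Promise<string[]> {
  const log: string[] = [];
  const server = new SketchServer();
  server.handle(
    (signal) =>
      new Promise<void>((resolve) => {
        signal.addEventListener('abort', () => {
          log.push(`handler saw ${String(signal.reason)}`);
          resolve();
        });
      }),
  );
  await server.stop({ force: true });
  log.push('stopped');
  return log;
}
```

With force false, stop simply waits, so a handler that never finishes on its own would keep stop pending until its own timeout ends it.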

@CMCDragonkai
Member

js-rpc/src/RPCServer.ts

Lines 217 to 236 in c885dc1

  public async destroy(force: boolean = true): Promise<void> {
    // Log and dispatch an event before starting the destruction
    this.logger.info(`Destroying ${this.constructor.name}`);
    this.dispatchEvent(new events.EventRPCServerDestroy());
    // Your existing logic for stopping active streams and other cleanup
    if (force) {
      for await (const [activeStream] of this.activeStreams.entries()) {
        activeStream.cancel(new rpcErrors.ErrorRPCStopping());
      }
    }
    for await (const [activeStream] of this.activeStreams.entries()) {
      await activeStream;
    }
    // Log and dispatch an event after the destruction has been completed
    this.dispatchEvent(new events.EventRPCServerDestroyed());
    this.logger.info(`Destroyed ${this.constructor.name}`);
  }

Right now this says this.activeStreams. If these are the handler promises, this is a very bad name for them. It should be called this.handlerPs, which tells us that these are the active handler promises that are still resolving.

And it needs to send a special reason when force is true.

@CMCDragonkai
Member

If the RPC handlers reject upon being aborted by stop, and they use throwIfAborted, that will result in RPCServer.stop throwing.

@CMCDragonkai
Member

Internal signal reasons are usually symbols, not exception objects.
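
A small sketch of the two points above, using only the standard AbortController API: a handler that calls throwIfAborted rejects with the signal's reason itself, so whoever awaits it during stop has to catch that rejection, and a symbol reason is easy to recognise by identity. abortReasonStop, slowHandler, and stopAndObserve are hypothetical names.

```typescript
// Hypothetical internal reason: a symbol rather than an exception object.
const abortReasonStop = Symbol('stop');

async function slowHandler(signal: AbortSignal): Promise<string> {
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  // Throws signal.reason as-is if the signal was aborted in the meantime
  signal.throwIfAborted();
  return 'done';
}

// Awaits the handler the way a stop() would, but turns the rejection into a
// value instead of letting it propagate to the caller.
async function stopAndObserve(): Promise<unknown> {
  const controller = new AbortController();
  const handlerP = slowHandler(controller.signal);
  controller.abort(abortReasonStop);
  try {
    return await handlerP;
  } catch (reason) {
    return reason;
  }
}
```

Here stopAndObserve resolves with abortReasonStop. Without the try/catch, the same await would reject, which is the RPCServer.stop throwing case described above.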

@CMCDragonkai CMCDragonkai changed the title from "Investigation and Resolution of Unexplained .cancel() and .abort() Operations and Asynchronous WebSocket Behavior" to "Remove or replace all calls to cancel or abort that have no reason passed in" Oct 27, 2023
@CMCDragonkai CMCDragonkai changed the title from "Remove or replace all calls to cancel or abort that have no reason passed in" to "Remove all calls cancel or abort that have no reason passed in - because this is bad practice and also we can rely on timeouts now" Oct 27, 2023
@CMCDragonkai
Member

Ignore everything except the OP. The OP has been rewritten to focus on what we need.

@CMCDragonkai CMCDragonkai assigned amydevs and unassigned tegefaulkes and addievo Oct 27, 2023
@CMCDragonkai CMCDragonkai added development Standard development and removed bug Something isn't working labels Oct 27, 2023
@CMCDragonkai CMCDragonkai changed the title from "Remove all calls cancel or abort that have no reason passed in - because this is bad practice and also we can rely on timeouts now" to "Ungraceful closures of streams used cancel or abort with no reason - remove all of these, and we can rely on timeouts instead" Oct 27, 2023
@CMCDragonkai
Member

@amydevs is this also being done?

@CMCDragonkai
Member

This is still pending as there are still cancellations occurring without reasons in RPCServer.ts.

@amydevs
Contributor

amydevs commented Nov 14, 2023

I don't think there are any places where cancel or abort is used without a reason, or where the stream is closed ungracefully when it is unneeded.

There is one place where it is done:

      const reader = headTransformStream.readable.getReader();
      // Allows timing out when waiting for the first message
      let headerMessage:
        | ReadableStreamDefaultReadResult<JSONRPCRequest>
        | undefined
        | void;
      try {
        headerMessage = await Promise.race([
          reader.read(),
          timer.then(
            () => undefined,
            () => {},
          ),
        ]);
      } catch (e) {
        const newErr = new errors.ErrorRPCHandlerFailed(
          'Stream failed waiting for header',
          { cause: e },
        );
        await inputStreamEndProm;
        timer.cancel(cleanupReason);
        await timer.catch(() => {});
        this.dispatchEvent(
          new events.RPCErrorEvent({
            detail: new errors.ErrorRPCOutputStreamError(
              'Stream failed waiting for header',
              { cause: newErr },
            ),
          }),
        );
        return;
      }
      // Downgrade back to the raw stream
      await reader.cancel();

But this does not matter, as the headTransformStream is a tee of the actual underlying stream and is meant only to parse the header of each RPC call. Hence, the cancel does not propagate to the parent stream.

Therefore, this issue is considered no longer applicable.
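
In the RPCServer.ts excerpt quoted earlier, the input is piped into headTransformStream with preventClose: true and preventCancel: true. Assuming standard WHATWG stream semantics, the sketch below shows why cancelling the downstream reader then never reaches the upstream source, and what happens when preventCancel is left off. cancelDownstream is a hypothetical helper, and plain strings stand in for the binary messages.

```typescript
// Returns what the upstream source's cancel hook saw, or 'not cancelled'.
async function cancelDownstream(preventCancel: boolean): Promise<unknown> {
  let seen: unknown = 'not cancelled';
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('header');
    },
    cancel(reason) {
      seen = reason;
    },
  });
  // Identity transform standing in for headTransformStream
  const head = new TransformStream<string, string>();
  const pipeP = source
    .pipeTo(head.writable, { preventClose: true, preventCancel })
    // The pipe rejects once the downstream side is cancelled; only its end matters here
    .catch(() => {});
  const reader = head.readable.getReader();
  // Consume the first message, then drop the reader with an explicit reason
  await reader.read();
  await reader.cancel(new Error('header parsed'));
  await pipeP;
  return seen;
}
```

Under these assumptions cancelDownstream(true) resolves with 'not cancelled', while cancelDownstream(false) resolves with the Error passed to reader.cancel.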

@amydevs amydevs closed this as completed Nov 14, 2023
@CMCDragonkai
Member

When you say it doesn't matter, should we even keep it?

@CMCDragonkai CMCDragonkai added the r&d:polykey:supporting activity Supporting core activity label Aug 13, 2024
Development

No branches or pull requests

4 participants