Transferring data through the RPC stream skips the second message #70

Closed
aryanjassal opened this issue Nov 8, 2024 · 18 comments · Fixed by #74
Labels
development Standard development

Comments

@aryanjassal
Member

aryanjassal commented Nov 8, 2024

Description

RPCServer always skips the second value when it processes messages. The likely cause is a race condition that occurs when we disconnect the stream from one pipe and connect it to another: by the time we disconnect, the stream has already partially loaded the next chunk of the message, and that data is lost.

This is also reflected in the tests. Oddly, all the tests on the feature branch pass across multiple commits, but whenever we merge, the failure comes back. In CI, the macOS machines are the only ones that fail because of this. However, once I pulled the code down from staging and ran the tests myself, I consistently got the same error.

A bug like this should have made basically all RPC calls non-functional, but that is not the case. For Polykey and Polykey CLI, the RPC system has been working reliably, which makes it even stranger that this happens only in the tests and nowhere else.

I went back to the first commit in this repo, ran npm install, and ran the tests again. They still failed in the same way, so this issue may have been in the repo from the beginning without anyone running into it.

To Reproduce

  1. Run the CI on a separate branch
  2. The CI should pass
  3. Merge the branch into staging
  4. The CI should fail on the macOS machine
  5. Try running CI on another branch
  6. CI fails again

Additional context

Details of the commented out test
// Temporarily commenting out to allow the CI to make a release
test.prop({
  messages: specificMessageArb,
})('forward middlewares', async ({ messages }) => {
  const stream = rpcTestUtils.messagesToReadableStream(messages);
  class TestMethod extends DuplexHandler {
    public handle = async function* (
      input: AsyncGenerator<JSONRPCRequestParams>,
      _cancel: (reason?: any) => void,
      _meta: Record<string, JSONValue> | undefined,
      _ctx: ContextTimed,
    ): AsyncGenerator<JSONRPCResponseResult> {
      yield* input;
    };
  }
  const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
    () => {
      return {
        forward: new TransformStream({
          transform: (chunk, controller) => {
            chunk.params = { value: 1 };
            controller.enqueue(chunk);
          },
        }),
        reverse: new TransformStream(),
      };
    },
  );
  const rpcServer = new RPCServer({
    middlewareFactory: middlewareFactory,
    logger,
    idGen,
  });
  await rpcServer.start({
    manifest: {
      testMethod: new TestMethod({}),
    },
  });
  const [outputResult, outputStream] = rpcTestUtils.streamToArray();
  const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
    cancel: () => {},
    readable: stream,
    writable: outputStream,
  };
  rpcServer.handleStream(readWriteStream);
  const out = await outputResult;
  expect(out.map((v) => v!.toString())).toStrictEqual(
    messages.map(() =>
      JSON.stringify({
        jsonrpc: '2.0',
        result: { value: 1 },
        id: null,
      }),
    ),
  );
  await rpcServer.stop({ force: true });
});
test.prop(
  {
    messages: specificMessageArb,
  },
  { numRuns: 1 },
)('reverse middlewares', async ({ messages }) => {
  const stream = rpcTestUtils.messagesToReadableStream(messages);
  class TestMethod extends DuplexHandler {
    public handle = async function* (
      input: AsyncGenerator<JSONRPCRequestParams<{ value: number }>>,
      _cancel: (reason?: any) => void,
      _meta: Record<string, JSONValue> | undefined,
      _ctx: ContextTimed,
    ): AsyncGenerator<JSONRPCResponseResult<{ value: number }>> {
      yield* input;
    };
  }
  const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => {
    return {
      forward: new TransformStream(),
      reverse: new TransformStream({
        transform: (chunk, controller) => {
          if ('result' in chunk) chunk.result = { value: 1 };
          controller.enqueue(chunk);
        },
      }),
    };
  });
  const rpcServer = new RPCServer({
    middlewareFactory: middleware,
    logger,
    idGen,
  });
  await rpcServer.start({
    manifest: {
      testMethod: new TestMethod({}),
    },
  });
  const [outputResult, outputStream] = rpcTestUtils.streamToArray();
  const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
    cancel: () => {},
    readable: stream,
    writable: outputStream,
  };
  rpcServer.handleStream(readWriteStream);
  const out = await outputResult;
  expect(out.map((v) => v!.toString())).toStrictEqual(
    messages.map(() =>
      JSON.stringify({
        jsonrpc: '2.0',
        result: { value: 1 },
        id: null,
      }),
    ),
  );
  await rpcServer.stop({ force: true });
});

Platform

  • Device: CI / Dell Precision 3480
  • OS: macOS GitHub Runner / NixOS
  • Version: 6.0

Methods of Resolution

  1. Instead of relying on switching pipes, use something more reliable
  2. Update the tests to allow better identification. For example, give each input value a separate number, which can help identify exactly which items are being left out.
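
The second suggestion could be sketched roughly as follows. This is a minimal illustration, not code from the repository: findMissing and the sample arrays are hypothetical, and it assumes each input message carries a unique sequence number that the handler echoes back unchanged.

```typescript
// Hypothetical helper: report which sequence numbers never reached the output.
function findMissing(sent: number[], received: number[]): number[] {
  const seen = new Set(received);
  return sent.filter((n) => !seen.has(n));
}

// Tag each message with its position so a dropped message can be identified.
const sent = [0, 1, 2, 3, 4];
// Simulated output in which the second message (sequence number 1) was lost.
const received = [0, 2, 3, 4];

// Logs the sequence numbers that were sent but never received.
console.log(findMissing(sent, received));
```

With identical payloads (every expected result being { value: 1 }), a length mismatch is all the assertion can report. Distinct sequence numbers make the failure message say which message went missing.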

Notify Maintainers

@aryanjassal @tegefaulkes

@aryanjassal added the development (Standard development) label on Nov 8, 2024
@aryanjassal self-assigned this on Nov 8, 2024

@aryanjassal
Member Author

Temporarily, to allow deployment for continuing work on Polykey#838, I have commented out the tests. From what can be observed, nothing is failing outside the tests, so it seems safe to do.

@CMCDragonkai
Member

Can we fastcheck the sequential testing?

@aryanjassal
Member Author

Should be able to. Currently, the test takes a randomly generated value for params and consistently returns 1. By using the default middleware and setting the params to the value we want to test, I was able to simplify the test. This could be one way to use fastcheck here.

However, this is the result of only brief testing and investigation. I will go through it properly when I actually work on this issue.

@CMCDragonkai
Member

This deserves plenty of fuzz testing, use chatgpt to help.

@aryanjassal
Member Author

The tests already used fastcheck, so fuzz testing wasn't lacking before. The issue is that, for some reason, the second input is being skipped. Brian and I think this is caused by disconnecting and reconnecting the pipe: we extract only the header message and then cancel the stream. To fix this issue, I will have to make that implementation more robust.

The sequential testing is mostly for convenience, to be able to easily locate where things went wrong.

@CMCDragonkai
Member

This is a regression?

@CMCDragonkai
Member

This is a bug, the fastcheck has discovered a bug here.

@CMCDragonkai
Member

Why is this not a bug issue?

@CMCDragonkai
Member

This should be added into linear todos.

@CMCDragonkai
Member

The 2 test names should be more specific:

    ✕ forward middlewares (with seed=1292472631) (9 ms)
    ✕ reverse middlewares (with seed=1292472631) (8 ms)

@aryanjassal
Member Author

The reason this happens is most likely due to how the header messages are handled. Currently, a new transform stream is created to fetch the header, then the stream is cancelled to allow changing the consumer from the header consumer to another consumer which parses the actual content.

The odd decision to create a stream and cancel it right after getting the first message needs to be replaced with an approach that handles this more elegantly. Instead of cancelling the stream, a promise can be returned for the header, and something like an async iterable can be returned for the content data.

This would remove the janky solution and implement an elegant one, and should fix the CI issues.
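
The promise-plus-iterable idea could be sketched like this. It is only an illustration under assumptions: splitHeader, remaining and the messages generator are hypothetical names, not js-rpc APIs, and the real input would be the parsed RPC message stream rather than strings.

```typescript
// Hypothetical helper: yields every item left on an already-started iterator.
async function* remaining<T>(iterator: AsyncIterator<T>): AsyncGenerator<T> {
  try {
    while (true) {
      const result = await iterator.next();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Propagate early exit (for example a break in the consumer) to the source
    await iterator.return?.();
  }
}

// Hypothetical helper: resolves with the first message (the header) and an
// async iterable over the rest. The source iterator is acquired exactly once,
// so nothing is cancelled or re-piped between the header and the content.
async function splitHeader<T>(
  source: AsyncIterable<T>,
): Promise<{ header: T; rest: AsyncGenerator<T> }> {
  const iterator = source[Symbol.asyncIterator]();
  const first = await iterator.next();
  if (first.done) throw new Error('stream ended before a header arrived');
  return { header: first.value, rest: remaining(iterator) };
}

// Stand-in for the incoming message stream
async function* messages(): AsyncGenerator<string> {
  yield 'header';
  yield 'first';
  yield 'second';
}

async function main(): Promise<string[]> {
  const { header, rest } = await splitHeader(messages());
  const body: string[] = [];
  for await (const message of rest) body.push(message);
  return [header, ...body];
}

main().then((all) => console.log(all));
```

Because the header and the content are read from the same iterator, there is no hand-off point at which a partially loaded chunk could be dropped.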

> This is a bug, the fastcheck has discovered a bug here.

This is probably not the case, as fastcheck would not surface the same failure on every run across multiple tests. Moreover, it is always the second message that is skipped, which is not something fastcheck can induce, so there is most likely some other underlying cause.

> This deserves plenty of fuzz testing, use chatgpt to help.

The current fastcheck tests are already robust and extensive, so I don't believe a lack of fuzz testing is the issue here.

@aryanjassal
Member Author

I have done the following in Polykey#847 to extract a header message from an async iterable.

// Extracts the header message from the iterator
const headerMessage = await (async () => {
  const iterator = input[Symbol.asyncIterator]();
  const header = (await iterator.next()).value;
  if (header.type === 'VaultNamesHeaderMessage') {
    if (header == null) throw new clientErrors.ErrorClientInvalidHeader();
    return header;
  }
})();

// Do stuff on rest of the messages
for await (const message of input) {
  // The header has been consumed, so all other messages will be
  // returned from the loop.
}
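
One detail the snippet above relies on: input is an async generator object, whose Symbol.asyncIterator method returns the generator itself, so the later for await loop resumes after the header instead of starting over. A small self-contained check, using a hypothetical source generator in place of the real RPC input:

```typescript
// Hypothetical stand-in for the RPC input stream
async function* source(): AsyncGenerator<string> {
  yield 'header';
  yield 'first';
  yield 'second';
}

async function consume(
  input: AsyncGenerator<string>,
): Promise<{ same: boolean; header: string; rest: string[] }> {
  const iterator = input[Symbol.asyncIterator]();
  // An async generator hands out itself as its own iterator
  const same = iterator === input;
  const head = await iterator.next();
  if (head.done) throw new Error('stream ended before a header arrived');
  const rest: string[] = [];
  // Resumes from the second item because the header was already consumed
  for await (const message of input) rest.push(message);
  return { same, header: head.value, rest };
}

consume(source()).then((result) => console.log(result));
```

Other AsyncIterable implementations may hand out a fresh iterator on each call, in which case this pattern would not skip the header, so it is worth confirming what kind of object input is before depending on it.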

@CMCDragonkai
Member

Why use const headerMessage = await (async () => { ... })(); here? It seems redundant. Are you sure this is the right structure for the code?

@aryanjassal
Member Author

> Why use const headerMessage = await (async () => { ... })(); here? It seems redundant. Are you sure this is the right structure for the code?

There are easier ways to do this, but most of them involve something like let headerMessage: Type | undefined;, which I wanted to avoid, as it would lead to a lot of extra checks down the line (one per message during iteration, I believe). This can add up, and null checks scattered everywhere look odd, so I decided on this approach.

The code I've provided creates and calls an async function inline. It performs a series of steps inside a block and returns a single value, which lets us avoid type-checking everywhere. The syntax looks odd because we await an async function that we have just created. It is functionally similar to this, but more concise.

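// Named equivalent of the inline async function. An async function must
// declare a Promise return type, and this one resolves to undefined when the
// type check below does not match.
const extractHeader = async (): Promise<HeaderType | undefined> => {
  const iterator = input[Symbol.asyncIterator]();
  const header = (await iterator.next()).value;
  if (header.type === 'VaultNamesHeaderMessage') {
    if (header == null) throw new clientErrors.ErrorClientInvalidHeader();
    return header;
  }
};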

const header = await extractHeader();

This makes it easier to see what the logic looks like spread out, and what it would look like if extractHeader were replaced with the inline async block.

@tegefaulkes
Contributor

You shouldn't have to check its type everywhere, even if you unwrapped the logic out of that arrow function.

Also, does that code even work? Why check header == null after checking header.type === ''? If type is defined, then header can't be null.

Isn't this simpler?

    const head = await input.next();
    if (head.done) utils.never();
    // Reject anything that is not the expected header message
    if (head.value.type !== 'VaultNamesHeaderMessage') throw new clientErrors.ErrorClientInvalidHeader();
    const headerMessage = head.value;

@CMCDragonkai
Member

Yes that code structure still smells to me. If you need to do some asynchronous work it should always be doable in a linear way without having to create anonymous async IIFEs. It is just not needed ever. It's too clever. Make it dumber.

@aryanjassal
Member Author

I haven't merged the PR yet, so I will make this change in it before merging it.


3 participants