Skip to content

Commit

Permalink
fix: always initialize iterator state in rpc output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jsaguet committed Nov 25, 2024
1 parent 8a508df commit 1c6a89e
Showing 1 changed file with 2 additions and 10 deletions.
12 changes: 2 additions & 10 deletions packages/runtime-rpc/src/rpc-output-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,14 @@ export class RpcOutputStreamController<T extends object = object> {

// iterator state.
// is undefined when no iterator has been acquired yet.
private _itState: undefined | {
private _itState: {
// a pending result. we yielded that because we were
// waiting for messages at the time.
p?: Deferred<IteratorResult<T, null>>,

// a queue of results that we produced faster that the iterator consumed
q: Array<IteratorResult<T, null> | Error>,
};
} = {q: []};


/**
Expand All @@ -205,12 +205,6 @@ export class RpcOutputStreamController<T extends object = object> {
* messages are queued.
*/
[Symbol.asyncIterator](): AsyncIterator<T> {

// init the iterator state, enabling pushIt()
if (!this._itState) {
this._itState = {q: []};
}

// if we are closed, we are definitely not receiving any more messages.
// but we can't let the iterator get stuck. we want to either:
// a) finish the new iterator immediately, because we are completed
Expand Down Expand Up @@ -249,8 +243,6 @@ export class RpcOutputStreamController<T extends object = object> {
// this either resolves a pending promise, or enqueues the result.
private pushIt(result: IteratorResult<T, null> | Error): void {
let state = this._itState;
if (!state)
return;

// is the consumer waiting for us?
if (state.p) {
Expand Down

0 comments on commit 1c6a89e

Please sign in to comment.