Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: XStream 返回流同时支持异步迭代 #319

Merged
merged 22 commits into from
Dec 23, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions components/x-stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,15 @@ export interface XStreamOptions<Output> {
transformStream?: TransformStream<string, Output>;
}

interface XReadableStream<R = any> extends ReadableStream<R> {
[Symbol.asyncIterator](): AsyncGenerator<R, void, unknown>;
}

/**
* @description Transform Uint8Array binary stream to {@link SSEOutput} by default
* @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future.
*/
async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
function XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
const { readableStream, transformStream } = options;

if (!(readableStream instanceof ReadableStream)) {
Expand All @@ -166,18 +170,23 @@ async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
.pipeThrough(splitStream())
.pipeThrough(splitPart());

const reader = stream.getReader() as ReadableStreamDefaultReader<Output>;

while (reader instanceof ReadableStreamDefaultReader) {
const { value, done } = await reader.read();

if (done) break;

if (!value) continue;
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.cancel();
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

在 finally 块中应使用 reader.releaseLock() 而非 reader.cancel()

在异步迭代器的 finally 块中,调用 reader.cancel() 会取消流的读取,并可能导致未读取的数据被丢弃。如果只是想释放读取器的锁,应使用 reader.releaseLock()

请应用以下修改来修正此问题:

  } finally {
-   reader.cancel();
+   reader.releaseLock();
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.cancel();
}
};
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.releaseLock();
}
};


// Transformed data through all transform pipes
yield value;
}
return stream as XReadableStream<Output>;
ppbl marked this conversation as resolved.
Show resolved Hide resolved
}

export default XStream;
Loading