From 862eace2de6356e9e35fd8f2fc769cff187f9af8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A1=8C=E6=A5=A0?= Date: Fri, 6 Dec 2024 18:25:04 +0800 Subject: [PATCH 01/14] =?UTF-8?q?feat:=20XStream=20=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=B5=81=E5=90=8C=E6=97=B6=E6=94=AF=E6=8C=81=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E8=BF=AD=E4=BB=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- components/x-stream/index.ts | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index 6868ae61..e5ba424f 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -137,11 +137,15 @@ export interface XStreamOptions { transformStream?: TransformStream; } +interface XReadableStream extends ReadableStream { + [Symbol.asyncIterator](): AsyncGenerator; +} + /** * @description Transform Uint8Array binary stream to {@link SSEOutput} by default * @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future. */ -async function* XStream(options: XStreamOptions) { +function XStream(options: XStreamOptions) { const { readableStream, transformStream } = options; if (!(readableStream instanceof ReadableStream)) { @@ -166,18 +170,23 @@ async function* XStream(options: XStreamOptions) { .pipeThrough(splitStream()) .pipeThrough(splitPart()); - const reader = stream.getReader() as ReadableStreamDefaultReader; - - while (reader instanceof ReadableStreamDefaultReader) { - const { value, done } = await reader.read(); - - if (done) break; - - if (!value) continue; + /** support async iterator */ + (stream as XReadableStream)[Symbol.asyncIterator] = async function* () { + const reader = this.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + return; + } + yield value; + } + } finally { + reader.cancel(); + } + }; - // Transformed data through all transform pipes - yield value; - } + return stream as XReadableStream; } export default XStream; From 75fda7035cd04c9538c23400e4e8bcb8ca818217 Mon Sep 17 00:00:00 2001 From: ppbl <820523733@qq.com> Date: Mon, 9 Dec 2024 18:17:03 +0800 Subject: [PATCH 02/14] feat: asyncGenerator -> asyncIterable --- components/x-stream/__tests__/index.test.tsx | 4 ++- components/x-stream/index.ts | 31 +++++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/components/x-stream/__tests__/index.test.tsx b/components/x-stream/__tests__/index.test.tsx index feee60f4..d5da3593 100644 --- a/components/x-stream/__tests__/index.test.tsx +++ b/components/x-stream/__tests__/index.test.tsx @@ -63,7 +63,9 @@ describe('XStream', () => { const result: any[] = []; for await (const value of XStream({ readableStream, transformStream: customTransform })) { - result.push(value); + if (value) { + result.push(value); + } } expect(result).toEqual(['custom-part1', 'custom-part2']); diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index e5ba424f..786f2af6 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -137,9 +137,7 @@ export interface XStreamOptions { transformStream?: TransformStream; } -interface XReadableStream extends ReadableStream { - [Symbol.asyncIterator](): AsyncGenerator; -} +type XReadableStream = ReadableStream & AsyncIterable; /** * @description Transform Uint8Array binary stream to {@link SSEOutput} by default @@ -171,19 +169,24 @@ function XStream(options: XStreamOptions) { .pipeThrough(splitPart()); /** support async iterator */ - (stream as XReadableStream)[Symbol.asyncIterator] = async function* () { + (stream as XReadableStream)[Symbol.asyncIterator] = function () { const reader = this.getReader(); - try { - while (true) { + return { + async next() { const { done, value } = await reader.read(); - if (done) { - return; - } - yield value; - } - } finally { - reader.cancel(); - } + return { + done, + value: value!, + }; + }, + async return() { + await reader.cancel(); + return { + done: true, + value: null, + }; + }, + }; }; return stream as XReadableStream; From bc83b3821b5baf83f973bb5a4fd4a2ed1759caba Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:51:56 +0800 Subject: [PATCH 03/14] Update components/x-stream/index.ts Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- components/x-stream/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index 786f2af6..9d3905b8 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -176,7 +176,7 @@ function XStream(options: XStreamOptions) { const { done, value } = await reader.read(); return { done, - value: value!, + value: value ?? null, }; }, async return() { From 70640d940667ae2a68197f3847616bcd16492ea7 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Wed, 11 Dec 2024 11:45:44 +0800 Subject: [PATCH 04/14] refactor: return to asyncGenerator --- components/x-stream/__tests__/index.test.tsx | 4 +- components/x-stream/index.ts | 44 +++++++++----------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/components/x-stream/__tests__/index.test.tsx b/components/x-stream/__tests__/index.test.tsx index d5da3593..feee60f4 100644 --- a/components/x-stream/__tests__/index.test.tsx +++ b/components/x-stream/__tests__/index.test.tsx @@ -63,9 +63,7 @@ describe('XStream', () => { const result: any[] = []; for await (const value of XStream({ readableStream, transformStream: customTransform })) { - if (value) { - result.push(value); - } + result.push(value); } expect(result).toEqual(['custom-part1', 'custom-part2']); diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index 9d3905b8..4a3a9da2 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -137,7 +137,7 @@ export interface XStreamOptions { transformStream?: TransformStream; } -type XReadableStream = ReadableStream & AsyncIterable; +type XReadableStream = ReadableStream & AsyncGenerator; /** * @description Transform Uint8Array binary stream to {@link SSEOutput} by default @@ -157,36 +157,30 @@ function XStream(options: XStreamOptions) { ? /** * Uint8Array binary -> string -> Output */ - readableStream - .pipeThrough(decoderStream) - .pipeThrough(transformStream) + readableStream.pipeThrough(decoderStream).pipeThrough(transformStream) : /** * Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput} */ - readableStream - .pipeThrough(decoderStream) - .pipeThrough(splitStream()) - .pipeThrough(splitPart()); + readableStream.pipeThrough(decoderStream).pipeThrough(splitStream()).pipeThrough(splitPart()); /** support async iterator */ - (stream as XReadableStream)[Symbol.asyncIterator] = function () { - const reader = this.getReader(); - return { - async next() { + (stream as XReadableStream)[Symbol.asyncIterator] = async function* () { + const reader = stream.getReader() as ReadableStreamDefaultReader; + + try { + while (reader instanceof ReadableStreamDefaultReader) { const { done, value } = await reader.read(); - return { - done, - value: value ?? null, - }; - }, - async return() { - await reader.cancel(); - return { - done: true, - value: null, - }; - }, - }; + + if (done) break; + + if (!value) continue; + + // Transformed data through all transform pipes + yield value; + } + } finally { + await reader.cancel(); + } }; return stream as XReadableStream; From 991802fae9d33b88279df2e4a1b6188de709bb90 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:20:03 +0800 Subject: [PATCH 05/14] refactor: delete try..finally --- components/x-stream/index.ts | 51 +++++++++++++++++------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index 4a3a9da2..fc0989f6 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -137,13 +137,11 @@ export interface XStreamOptions { transformStream?: TransformStream; } -type XReadableStream = ReadableStream & AsyncGenerator; - /** * @description Transform Uint8Array binary stream to {@link SSEOutput} by default * @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future. */ -function XStream(options: XStreamOptions) { +async function* XStream(options: XStreamOptions) { const { readableStream, transformStream } = options; if (!(readableStream instanceof ReadableStream)) { @@ -153,37 +151,36 @@ function XStream(options: XStreamOptions) { // Default encoding is `utf-8` const decoderStream = new TextDecoderStream(); - const stream = transformStream - ? /** - * Uint8Array binary -> string -> Output - */ - readableStream.pipeThrough(decoderStream).pipeThrough(transformStream) - : /** - * Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput} - */ - readableStream.pipeThrough(decoderStream).pipeThrough(splitStream()).pipeThrough(splitPart()); + const stream = ( + transformStream + ? /** + * Uint8Array binary -> string -> Output + */ + readableStream.pipeThrough(decoderStream).pipeThrough(transformStream) + : /** + * Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput} + */ + readableStream + .pipeThrough(decoderStream) + .pipeThrough(splitStream()) + .pipeThrough(splitPart()) + ) as XReadableStream; /** support async iterator */ - (stream as XReadableStream)[Symbol.asyncIterator] = async function* () { - const reader = stream.getReader() as ReadableStreamDefaultReader; + stream[Symbol.asyncIterator] = async function* () { + const reader = this.getReader(); - try { - while (reader instanceof ReadableStreamDefaultReader) { - const { done, value } = await reader.read(); + while (true) { + const { done, value } = await reader.read(); - if (done) break; + if (done) break; - if (!value) continue; + if (!value) continue; - // Transformed data through all transform pipes - yield value; - } - } finally { - await reader.cancel(); + // Transformed data through all transform pipes + yield value; } - }; - - return stream as XReadableStream; + } } export default XStream; From 98e44d811992cbb3891c3abd7c93e2ca8fdd051c Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:20:58 +0800 Subject: [PATCH 06/14] refactor: add type --- components/x-stream/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index fc0989f6..02f3a5f2 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -137,6 +137,8 @@ export interface XStreamOptions { transformStream?: TransformStream; } +type XReadableStream = ReadableStream & AsyncGenerator; + /** * @description Transform Uint8Array binary stream to {@link SSEOutput} by default * @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future. From 793b4eefaacffca34b518083a072dce5cfc9c573 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:23:04 +0800 Subject: [PATCH 07/14] feat: return stream --- components/x-stream/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index 02f3a5f2..cc1aa20e 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -143,7 +143,7 @@ type XReadableStream = ReadableStream & AsyncGenerator; * @description Transform Uint8Array binary stream to {@link SSEOutput} by default * @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future. */ -async function* XStream(options: XStreamOptions) { +function XStream(options: XStreamOptions) { const { readableStream, transformStream } = options; if (!(readableStream instanceof ReadableStream)) { @@ -183,6 +183,8 @@ async function* XStream(options: XStreamOptions) { yield value; } } + + return stream; } export default XStream; From 717b11d55b261f9edadd422bbca2e105db3a0512 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:28:45 +0800 Subject: [PATCH 08/14] feat: XReadableStream -> XReadableStream --- components/x-stream/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/x-stream/index.ts b/components/x-stream/index.ts index cc1aa20e..6ae18a98 100644 --- a/components/x-stream/index.ts +++ b/components/x-stream/index.ts @@ -166,7 +166,7 @@ function XStream(options: XStreamOptions) { .pipeThrough(decoderStream) .pipeThrough(splitStream()) .pipeThrough(splitPart()) - ) as XReadableStream; + ) as XReadableStream; /** support async iterator */ stream[Symbol.asyncIterator] = async function* () { From 56ed94ff511406f55dc206bfb51dfed109bfb7d7 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:46:01 +0800 Subject: [PATCH 09/14] feat: add test --- components/x-stream/__tests__/index.test.tsx | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/components/x-stream/__tests__/index.test.tsx b/components/x-stream/__tests__/index.test.tsx index feee60f4..8d394dd8 100644 --- a/components/x-stream/__tests__/index.test.tsx +++ b/components/x-stream/__tests__/index.test.tsx @@ -97,4 +97,17 @@ describe('XStream', () => { })(), ).rejects.toThrow('The key-value separator ":" is not found in the sse line chunk!'); }); + + it('should return an instance of ReadableStream', () => { + expect( + XStream({ + readableStream: new ReadableStream({ + async start(controller) { + controller.enqueue(new TextEncoder().encode('event: message\n\ndata: value\n\n')); + controller.close(); + }, + }), + }), + ).toBeInstanceOf(ReadableStream); + }); }); From 359987195e4540b986026c5c767c237f65cbe1d4 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:16:04 +0800 Subject: [PATCH 10/14] feat: add interrupt streaming output demo --- components/useXChat/demo/stream-cancel.md | 7 ++ components/useXChat/demo/stream-cancel.tsx | 113 +++++++++++++++++++++ components/useXChat/index.en-US.md | 1 + components/useXChat/index.zh-CN.md | 1 + 4 files changed, 122 insertions(+) create mode 100644 components/useXChat/demo/stream-cancel.md create mode 100644 components/useXChat/demo/stream-cancel.tsx diff --git a/components/useXChat/demo/stream-cancel.md b/components/useXChat/demo/stream-cancel.md new file mode 100644 index 00000000..f9f993b8 --- /dev/null +++ b/components/useXChat/demo/stream-cancel.md @@ -0,0 +1,7 @@ +## zh-CN + +打断正在流式输出的内容。 + +## en-US + +Interrupt the ongoing streaming output. diff --git a/components/useXChat/demo/stream-cancel.tsx b/components/useXChat/demo/stream-cancel.tsx new file mode 100644 index 00000000..ad8f8631 --- /dev/null +++ b/components/useXChat/demo/stream-cancel.tsx @@ -0,0 +1,113 @@ +import { UserOutlined } from '@ant-design/icons'; +import { Bubble, Sender, useXAgent, useXChat, XStream } from '@ant-design/x'; +import { Flex, type GetProp } from 'antd'; +import React, { useRef } from 'react'; + +const roles: GetProp = { + ai: { + placement: 'start', + avatar: { icon: , style: { background: '#fde3cf' } }, + }, + local: { + placement: 'end', + avatar: { icon: , style: { background: '#87d068' } }, + }, +}; + +const contentChunks = [ + 'He', + 'llo', + ', w', + 'or', + 'ld!', + ' Ant', + ' Design', + ' X', + ' is', + ' the', + ' best', + '!', +]; + +function mockReadableStream() { + const sseChunks: string[] = []; + + for (let i = 0; i < contentChunks.length; i++) { + const sseEventPart = `event: message\ndata: {"id":"${i}","content":"${contentChunks[i]}"}\n\n`; + sseChunks.push(sseEventPart); + } + + return new ReadableStream({ + async start(controller) { + for (const chunk of sseChunks) { + await new Promise((resolve) => setTimeout(resolve, 300)); + controller.enqueue(new TextEncoder().encode(chunk)); + } + controller.close(); + }, + }); +} + +const App = () => { + const [content, setContent] = React.useState(''); + + const abortRef = useRef(() => {}); + + // Agent for request + const [agent] = useXAgent({ + request: async ({}, { onSuccess, onUpdate }) => { + const stream = XStream({ + readableStream: mockReadableStream(), + }); + + const reader = stream.getReader(); + abortRef.current = () => { + reader?.cancel(); + }; + + let current = ''; + while (reader) { + const { value, done } = await reader.read(); + if (done) { + onSuccess(current); + break; + } + if (!value) continue; + const data = JSON.parse(value.data); + current += data.content || ''; + onUpdate(current); + } + }, + }); + + // Chat messages + const { onRequest, messages } = useXChat({ + agent, + }); + + return ( + + ({ + key: id, + role: status === 'local' ? 'local' : 'ai', + content: message, + }))} + /> + { + onRequest(nextContent); + setContent(''); + }} + onCancel={() => abortRef.current()} + /> + + ); +}; + +export default App; diff --git a/components/useXChat/index.en-US.md b/components/useXChat/index.en-US.md index 3b4cba62..416a7544 100644 --- a/components/useXChat/index.en-US.md +++ b/components/useXChat/index.en-US.md @@ -20,6 +20,7 @@ Use Agent to manage conversation data and produce data for page rendering. Basic Streaming +Interrupt the output Multiple Suggestion ## API diff --git a/components/useXChat/index.zh-CN.md b/components/useXChat/index.zh-CN.md index 5f5f10a3..4477cadc 100644 --- a/components/useXChat/index.zh-CN.md +++ b/components/useXChat/index.zh-CN.md @@ -21,6 +21,7 @@ demo: 基本 流式输出 +打断输出 多项建议 ## API From 6831da6a5486ce5f4190757c73466c70d87debd3 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:21:10 +0800 Subject: [PATCH 11/14] feat: add cleanup code --- components/useXChat/demo/stream-cancel.tsx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/components/useXChat/demo/stream-cancel.tsx b/components/useXChat/demo/stream-cancel.tsx index ad8f8631..038dfe76 100644 --- a/components/useXChat/demo/stream-cancel.tsx +++ b/components/useXChat/demo/stream-cancel.tsx @@ -53,6 +53,12 @@ const App = () => { const abortRef = useRef(() => {}); + useEffect(() => { + return () => { + abortRef.current(); + }; + }, []); + // Agent for request const [agent] = useXAgent({ request: async ({}, { onSuccess, onUpdate }) => { From 465167fa2364f65cb75900827891f155ca28abe7 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:24:48 +0800 Subject: [PATCH 12/14] fix: add useEffect import --- components/useXChat/demo/stream-cancel.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/useXChat/demo/stream-cancel.tsx b/components/useXChat/demo/stream-cancel.tsx index 038dfe76..cc5b7ff7 100644 --- a/components/useXChat/demo/stream-cancel.tsx +++ b/components/useXChat/demo/stream-cancel.tsx @@ -1,7 +1,7 @@ import { UserOutlined } from '@ant-design/icons'; import { Bubble, Sender, useXAgent, useXChat, XStream } from '@ant-design/x'; import { Flex, type GetProp } from 'antd'; -import React, { useRef } from 'react'; +import React, { useEffect, useRef } from 'react'; const roles: GetProp = { ai: { From bd38b5dd86457af5988d1c1e26493f46d0324f5c Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Fri, 20 Dec 2024 13:33:34 +0800 Subject: [PATCH 13/14] fix: lint --- components/useXChat/demo/stream-cancel.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/useXChat/demo/stream-cancel.tsx b/components/useXChat/demo/stream-cancel.tsx index cc5b7ff7..98639410 100644 --- a/components/useXChat/demo/stream-cancel.tsx +++ b/components/useXChat/demo/stream-cancel.tsx @@ -61,7 +61,7 @@ const App = () => { // Agent for request const [agent] = useXAgent({ - request: async ({}, { onSuccess, onUpdate }) => { + request: async (_, { onSuccess, onUpdate }) => { const stream = XStream({ readableStream: mockReadableStream(), }); From 075ff2a0dffeb62eaed5c11ef8ae418b00c81335 Mon Sep 17 00:00:00 2001 From: ppbl <33046279+ppbl@users.noreply.github.com> Date: Fri, 20 Dec 2024 14:53:34 +0800 Subject: [PATCH 14/14] feat: update snapshot --- .../__tests__/__snapshots__/demo.test.ts.snap | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/components/useXChat/__tests__/__snapshots__/demo.test.ts.snap b/components/useXChat/__tests__/__snapshots__/demo.test.ts.snap index bea7dad7..2699e1b6 100644 --- a/components/useXChat/__tests__/__snapshots__/demo.test.ts.snap +++ b/components/useXChat/__tests__/__snapshots__/demo.test.ts.snap @@ -116,6 +116,64 @@ exports[`renders components/useXChat/demo/stream.tsx correctly 1`] = ` `; +exports[`renders components/useXChat/demo/stream-cancel.tsx correctly 1`] = ` +
+
+
+
+