Skip to content

Commit

Permalink
feat(openai): SSE -> ReadableStream(chunk) 활용으로 변경
Browse files Browse the repository at this point in the history
- NestJS format SSE 활용하면 body data 받기 힘듦
- 특정 페이지에서만 요청한 값을 스트리밍으로 받는 것이기에 굳이 SSE 연결은 불필요하다 판단
  • Loading branch information
IsthisLee committed Aug 17, 2023
1 parent 944fa98 commit 8d6cf99
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 48 deletions.
18 changes: 9 additions & 9 deletions src/apis/laws/laws.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import {
ParseIntPipe,
UseGuards,
Delete,
Sse,
MessageEvent,
Res,
} from '@nestjs/common';
import { LawsService } from './laws.service';
import {
Expand Down Expand Up @@ -40,7 +39,7 @@ import { JwtUserPayload } from 'src/common/decorators/jwt-user.decorator';
import { JwtPayloadInfo } from 'src/common/types';
import { AuthGuard } from '@nestjs/passport';
import { GetBookmarkLawListDto } from './dtos/get-bookmark-laws.dto';
import { Observable, Subscriber } from 'rxjs';
import { Response } from 'express';

@Controller({ path: 'laws' })
@ApiTags('Laws')
Expand Down Expand Up @@ -259,12 +258,12 @@ export class LawsController {
return this.lawsService.createLawAdditionalSummary(type, id.toString());
}

@Sse(':type/:id/summary-stream')
@Post(':type/:id/summary-stream')
@ApiOperation({
summary: '판례/법령 본문 요약 요청 - SSE stream version',
summary: '판례/법령 본문 요약 요청 - stream version',
description: `
'더 쉽게 해석'을 위한 요청을 보내는 경우, 마지막에 제공받았던 요약문을 body의 recentAssistMsg에 담아서 요청합니다.\n\n
stream 버전 요약 API는 SSE로 응답이 제공됩니다. 요약 완료 시에는 이름이 'close'인 SSE event에 'true'(문자열) 데이터가 담겨서 넘어갑니다.`,
stream 버전 요약 API는 chunk data가 응답으로 제공됩니다. 클라이언트에서는 ReadableStream 형태로 받으시면 됩니다.`,
})
@ApiParam({
name: 'type',
Expand Down Expand Up @@ -303,16 +302,17 @@ export class LawsController {
description: 'recentSummaryMsg: 직전에 제공받은 요약문을 입력합니다.',
})
async createLawStreamSummary(
@Res() res: Response,
@Param('type', new ParseEnumPipe(SearchTabEnum)) type: SearchTabEnum,
@Param('id', new ParseIntPipe()) id: number,
@Body() requestSummaryDto?: RequestSummaryDto,
): Promise<Observable<MessageEvent>> {
const lawSummaryReadableStream: Stream<ChatCompletionChunk> = await this.lawsService.createLawStreamSummary(
): Promise<void> {
const lawSummaryStream: Stream<ChatCompletionChunk> = await this.lawsService.createLawStreamSummary(
type,
id.toString(),
requestSummaryDto?.recentSummaryMsg,
);
return this.openaiService.sendSSEWithOpenAIStream(lawSummaryReadableStream);
return this.openaiService.sendChunksWithOpenAIStream(res, lawSummaryStream);
}

@Post(':type/:id/bookmark')
Expand Down
4 changes: 2 additions & 2 deletions src/apis/laws/laws.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ export class LawsService {
const summaryReqMsgs = await this.generateSummaryReqMessasges(lawDetail, recentSummaryMsg, {
onlySummary: true,
});
const summaryReadableStream = await this.openAiService.createAIStramChatCompletion(summaryReqMsgs);
const summaryStream = await this.openAiService.createAIStramChatCompletion(summaryReqMsgs);

return summaryReadableStream;
return summaryStream;
}

async postLawBookmark(userId: number, lawId: string, lawType: SearchTabEnum) {
Expand Down
41 changes: 4 additions & 37 deletions src/shared/services/openai.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,48 +41,15 @@ export class OpenaiService {
return stream;
}

async sendSSEWithOpenAIStream(opanAIStream: Stream<ChatCompletionChunk>): Promise<Observable<MessageEvent>> {
return new Observable((subscriber: Subscriber<MessageEvent>) => {
this.sendStreamToObservableData(opanAIStream, subscriber);
});
}
async sendChunksWithOpenAIStream(res: Response, opanAIStream: Stream<ChatCompletionChunk>): Promise<void> {
const encoder = new TextEncoder();

private async sendStreamToObservableData(
opanAIStream: Stream<ChatCompletionChunk>,
subscriber: Subscriber<MessageEvent>,
) {
for await (const part of opanAIStream) {
const content: string = part.choices[0]?.delta?.content;
if (content) {
subscriber.next({ data: content, retry: 1000 });
}
}

subscriber.next({
type: 'close',
data: 'true',
});

subscriber.complete();
}

// unused(legacy) code
async sendSSEResponseWithOpenAIStream(res: Response, opanAIStream: Stream<ChatCompletionChunk>): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // SSE를 위해 지정된 헤더를 클라이언트에게 보냄

res.write('retry: 1000\n\n'); // 클라에서 연결이 끊기면 1초 간격으로 재연결을 시도하라는 의미

for await (const part of opanAIStream) {
const content: string = part.choices[0].delta.content;
content && res.write(`data: ${content}\n\n`);
const bytes = encoder.encode(content);
res.write(bytes);
}

res.write(`event: close\n`);
res.write(`data: true\n\n`);

res.end();
}

Expand Down

0 comments on commit 8d6cf99

Please sign in to comment.