Skip to content

Commit

Permalink
fix sse bug
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 15, 2023
1 parent 45b0494 commit 54f342d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,21 @@ public ProcessEntity getProcess(String processName) {
*
* @param processName process name
* @param stepPid process step type
* @param log messages
* @param logLine messages
* @throws BusException Throws an exception if the process does not exist
*/
public void appendLog(String processName, String stepPid, String log, boolean recordGlobal) {
public void appendLog(String processName, String stepPid, String logLine, boolean recordGlobal) {
if (!logPross.containsKey(processName)) {
throw new BusException(StrFormatter.format("process {} does not exist", processName));
log.debug("Process {} does not exist, This log was abandoned", processName);
return;
}
ProcessEntity process = logPross.get(processName);
if (recordGlobal) {
process.appendLog(log);
process.appendLog(logLine);
}
if (stepPid != null) {
ProcessStepEntity stepNode = getStepNode(stepPid, getStepsMap(processName));
stepNode.appendLog(log);
stepNode.appendLog(logLine);
process.setLastUpdateStep(stepNode);
}
// /TOPIC/PROCESS_CONSOLE/FlinkSubmit/12
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static void sendTopic(String topic, Object content) {
* @param content The SSE data to send.
* @throws IOException If an I/O error occurs while sending the data.
*/
public static void sendSse(String sessionKey, SseDataVo content) throws IOException {
public static void sendSse(String sessionKey, SseDataVo content) throws Exception {
if (exists(sessionKey)) {
sessionMap.get(sessionKey).getEmitter().send(content);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public Set<SuggestionVO> getSuggestions(boolean enableSchemaSuggestion) {
if (enableSchemaSuggestion) {
buildSchemaSuggestions(new HashSet<>(), suggestionVOS);
}
// 4. 自定义关键词提示
buildCustomSuggestions(new HashSet<>(), suggestionVOS);
// flink config提示
buildFlinkConfSuggestions(suggestionVOS);
return suggestionVOS;
Expand Down Expand Up @@ -110,18 +108,6 @@ private static void buildSchemaSuggestions(Set<Object> buildingSchemaList, Set<S
// todo: 构建schema的建议列表 , 包含 库名 、表名、字段名、.... , 能做到根据库名点出表名,根据表名点出字段名
}

/**
* build custom suggestions
*
* @param customKeyWordList custom keyword list
* @param suggestionVOS suggestion list
*/
private static void buildCustomSuggestions(Set<Object> customKeyWordList, Set<SuggestionVO> suggestionVOS) {
// todo: 自定义关键词提示,
// 1. 此处自定义是属于 dinky 内部自定义语法关键词提示, 如果有片段, 将片段的建议列表加入到文档中进行提示
// 2. 可以加入 yml 语法的关键词提示 , 因为在集群配置中会有 yml 的配置文件写法 , 获取方式待定
}

/**
* build document suggestions
*
Expand Down

0 comments on commit 54f342d

Please sign in to comment.