Skip to content

Commit

Permalink
[feature][process] Fuck process (DataLinkDC#2432)
Browse files Browse the repository at this point in the history
* Optimize the process

* Fixed SSE registration failure under certain circumstances

* delete process model

* change sse log level

* add todo

* formate code
  • Loading branch information
gaoyan1998 authored Oct 25, 2023
1 parent 35e751e commit 4256a3b
Show file tree
Hide file tree
Showing 42 changed files with 76 additions and 175 deletions.
14 changes: 7 additions & 7 deletions dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.dinky.aop;

import org.dinky.context.ConsoleContextHolder;
import org.dinky.process.annotations.ExecuteProcess;
import org.dinky.process.annotations.ProcessId;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStatus;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.process.enums.ProcessType;
import org.dinky.process.model.ProcessStepEntity;
import org.dinky.data.annotations.ExecuteProcess;
import org.dinky.data.annotations.ProcessId;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.enums.ProcessStatus;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.ProcessType;
import org.dinky.data.model.ProcessStepEntity;

import org.apache.http.util.TextUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import org.dinky.classloader.DinkyClassLoader;
import org.dinky.context.DinkyClassLoaderContextHolder;
import org.dinky.data.exception.DinkyException;
import org.dinky.job.JobResult;
import org.dinky.process.exception.DinkyException;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package org.dinky.context;

import org.dinky.aop.ProcessAspect;
import org.dinky.data.enums.ProcessStatus;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.ProcessType;
import org.dinky.data.enums.SseTopic;
import org.dinky.data.exception.BusException;
import org.dinky.process.enums.ProcessStatus;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.process.enums.ProcessType;
import org.dinky.process.exception.DinkyException;
import org.dinky.process.model.ProcessEntity;
import org.dinky.process.model.ProcessStepEntity;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.ProcessEntity;
import org.dinky.data.model.ProcessStepEntity;
import org.dinky.utils.LogUtil;

import org.apache.http.util.TextUtils;
Expand All @@ -36,11 +36,11 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.MDC;

Expand Down Expand Up @@ -129,7 +129,7 @@ public void registerProcess(ProcessType type, String processName) throws Runtime
.type(type)
.title(processName)
.startTime(LocalDateTime.now())
.children(new LinkedList<>())
.children(new CopyOnWriteArrayList<>())
.build();
logPross.put(processName, entity);
appendLog(processName, null, "Start Process:" + processName);
Expand Down Expand Up @@ -157,7 +157,7 @@ public ProcessStepEntity registerProcessStep(ProcessStepType type, String proces
.type(type)
.title(type.getDesc().getMessage())
.log(new StringBuilder())
.children(new LinkedList<>())
.children(new CopyOnWriteArrayList<>())
.build();

if (TextUtils.isEmpty(parentStepPid)) {
Expand Down Expand Up @@ -223,7 +223,7 @@ public void finishedStep(String processName, ProcessStepEntity step, ProcessStat
StrFormatter.format("Process Step {} exit with status:{}", step.getType(), status));
}

private ProcessStepEntity getStepNode(String stepPid, LinkedList<ProcessStepEntity> stepsMap) {
private ProcessStepEntity getStepNode(String stepPid, CopyOnWriteArrayList<ProcessStepEntity> stepsMap) {
ProcessStepEntity stepNode = findStepNode(stepPid, stepsMap);
if (stepNode != null) {
return stepNode;
Expand All @@ -240,7 +240,7 @@ private ProcessStepEntity getStepNode(String stepPid, LinkedList<ProcessStepEnti
/**
* 递归查找节点
* */
private ProcessStepEntity findStepNode(String stepPid, LinkedList<ProcessStepEntity> stepsMap) {
private ProcessStepEntity findStepNode(String stepPid, CopyOnWriteArrayList<ProcessStepEntity> stepsMap) {
for (ProcessStepEntity processStepEntity : stepsMap) {
if (processStepEntity.getKey().equals(stepPid)) {
return processStepEntity;
Expand All @@ -254,7 +254,7 @@ private ProcessStepEntity findStepNode(String stepPid, LinkedList<ProcessStepEnt
return null;
}

private LinkedList<ProcessStepEntity> getStepsMap(String processName) {
private CopyOnWriteArrayList<ProcessStepEntity> getStepsMap(String processName) {
return logPross.get(processName).getChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static Set<String> subscribeTopic(String sessionId, List<String> topics)
* @return The SseEmitter for the session.
*/
public static SseEmitter connectSession(String sessionKey) {
log.info("New session wants to connect: {}", sessionKey);
log.debug("New session wants to connect: {}", sessionKey);
if (exists(sessionKey)) {
log.warn("Session key already exists: {}", sessionKey);
closeSse(sessionKey);
Expand Down Expand Up @@ -97,7 +97,7 @@ public static boolean exists(String sessionKey) {
* @param sessionKey The session key of the timed-out session.
*/
public static void onTimeout(String sessionKey) {
log.info("Type: SseSession Timeout, Session ID: {}", sessionKey);
log.debug("Type: SseSession Timeout, Session ID: {}", sessionKey);
closeSse(sessionKey);
}

Expand Down Expand Up @@ -142,7 +142,7 @@ public static void onError(String sessionKey, Throwable throwable) {
* @param sessionKey The session key of the completed session.
*/
public static void onCompletion(String sessionKey) {
log.info("Type: SseSession Completion, Session ID: {}", sessionKey);
log.debug("Type: SseSession Completion, Session ID: {}", sessionKey);
closeSse(sessionKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.dinky.controller;

import org.dinky.context.ConsoleContextHolder;
import org.dinky.data.model.ProcessEntity;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.process.model.ProcessEntity;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
package org.dinky.controller;

import org.dinky.data.annotation.Log;
import org.dinky.data.annotations.ExecuteProcess;
import org.dinky.data.annotations.ProcessId;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.ProcessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.Task;
Expand All @@ -33,9 +36,6 @@
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.process.annotations.ExecuteProcess;
import org.dinky.process.annotations.ProcessId;
import org.dinky.process.enums.ProcessType;
import org.dinky.service.TaskService;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.dinky.data.exception;

import org.dinky.process.exception.ExcuteException;

public class SqlExplainExcepition extends ExcuteException {
public SqlExplainExcepition(String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.dinky.data.exception;

import org.dinky.process.exception.ExcuteException;

public class TaskNotDoneException extends ExcuteException {
public TaskNotDoneException(String message) {
super(message);
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.context.TenantContextHolder;
import org.dinky.daemon.task.DaemonFactory;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.JobInstance;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.model.Task;
Expand All @@ -33,7 +34,6 @@
import org.dinky.function.constant.PathConstant;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.FlinkJobTask;
import org.dinky.process.exception.DinkyException;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.Project;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.exception.ExcuteException;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.JobModelOverview;
import org.dinky.data.model.JobTypeOverView;
Expand All @@ -33,7 +34,6 @@
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.mybatis.service.ISuperService;
import org.dinky.process.exception.ExcuteException;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public FlinkClusterInfo checkHeartBeat(String hosts, String host) {

@Override
public String getJobManagerAddress(ClusterInstance clusterInstance) {
// TODO 这里判空逻辑有问题,clusterInstance有可能为null
Assert.check(clusterInstance);
FlinkClusterInfo info =
FlinkCluster.testFlinkJobManagerIP(clusterInstance.getHosts(), clusterInstance.getJobManagerHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.dinky.service.impl;

import org.dinky.assertion.Asserts;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.SqlDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.Column;
import org.dinky.data.model.DataBase;
Expand All @@ -36,8 +38,6 @@
import org.dinky.metadata.driver.Driver;
import org.dinky.metadata.result.JdbcSelectResult;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.service.DataBaseService;

import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.dinky.data.dto.GitAnalysisJarDTO;
import org.dinky.data.dto.GitProjectDTO;
import org.dinky.data.dto.TreeNodeDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.GitProject;
import org.dinky.data.params.GitProjectSortJarParams;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.mapper.GitProjectMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.process.exception.DinkyException;
import org.dinky.service.GitProjectService;
import org.dinky.utils.GitRepository;
import org.dinky.utils.TreeUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.dinky.service.impl;

import org.dinky.data.dto.MetricsLayoutDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.metrics.Jvm;
import org.dinky.data.model.Metrics;
import org.dinky.data.vo.MetricsVO;
import org.dinky.mapper.MetricsMapper;
import org.dinky.process.exception.DinkyException;
import org.dinky.service.MonitorService;
import org.dinky.utils.PaimonUtil;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.context.TenantContextHolder;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.BusException;
import org.dinky.data.exception.NotSupportExplainExcepition;
Expand Down Expand Up @@ -65,8 +67,6 @@
import org.dinky.job.JobResult;
import org.dinky.mapper.TaskMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.service.AlertGroupService;
import org.dinky.service.CatalogueService;
import org.dinky.service.ClusterConfigurationService;
Expand Down Expand Up @@ -238,7 +238,7 @@ public String buildEnvSql(AbstractStatementDTO task) {
task.setVariables(fragmentVariableService.listEnabledVariables());
}
int envId = Optional.ofNullable(task.getEnvId()).orElse(-1);
if (envId != -1) {
if (envId >= 0) {
TaskDTO envTask = this.getTaskInfoById(task.getEnvId());
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
sql += envTask.getStatement() + CommonConstant.LineSep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.dinky.service.resource.impl;

import org.dinky.data.exception.BusException;
import org.dinky.process.exception.DinkyException;
import org.dinky.data.exception.DinkyException;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.utils.OssTemplate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.dinky.sse.git;

import org.dinky.data.dto.GitAnalysisJarDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.GitProject;
import org.dinky.function.util.UDFUtil;
import org.dinky.process.exception.DinkyException;
import org.dinky.sse.StepSse;

import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package org.dinky.sse.git;

import org.dinky.data.dto.GitAnalysisJarDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.GitProject;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.function.util.UDFUtil;
import org.dinky.process.exception.DinkyException;
import org.dinky.sse.StepSse;

import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.dinky.utils;

import org.dinky.context.GitBuildContextHolder;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.GitProject;
import org.dinky.data.result.StepResult;
import org.dinky.process.exception.DinkyException;
import org.dinky.sse.DoneStepSse;
import org.dinky.sse.StepSse;
import org.dinky.sse.git.AnalysisUdfClassStepSse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.dinky.utils;

import org.dinky.data.dto.GitProjectDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.function.constant.PathConstant;
import org.dinky.process.exception.DinkyException;

import java.io.File;
import java.io.StringWriter;
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/java/org/dinky/utils/MavenUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.dinky.utils;

import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.function.constant.PathConstant;
import org.dinky.process.exception.DinkyException;

import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*
*/

package org.dinky.process.annotations;
package org.dinky.data.annotations;

import org.dinky.process.enums.ProcessType;
import org.dinky.data.enums.ProcessType;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.process.annotations;
package org.dinky.data.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
Expand Down
Loading

0 comments on commit 4256a3b

Please sign in to comment.