Skip to content

Commit

Permalink
Bug fixes (#2487)
Browse files Browse the repository at this point in the history
* Optimize the process

* fix cluster json bug

* fix cluster cancel bug

* add check on online

* fix offline bug

* fix job life bug

* limitTheParametersPassedToATask

* format code

* format code

* format code

* remove jar config

* Optimized the automatic process build format

* Added code comparison filter

* fix dinky-app env bug

* fix dinky-app env bug

* format code

* fix some code

* fix k8s client
  • Loading branch information
gaoyan1998 authored Nov 3, 2023
1 parent bc09f8f commit 1cdfd5f
Show file tree
Hide file tree
Showing 17 changed files with 255 additions and 89 deletions.
6 changes: 2 additions & 4 deletions dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep proce
contextHolder.finishedStep(MDC.get(PROCESS_NAME), step, ProcessStatus.FAILED, e);
throw e;
} finally {
// If a parent step exists, it is restored after the execution is complete
if (parentStep != null) {
MDC.put(PROCESS_STEP, parentStep);
}
// restored after the execution is complete
MDC.put(PROCESS_STEP, parentStep);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void finishedProcess(String processName, ProcessStatus status, Throwable
if (e != null) {
appendLog(processName, null, LogUtil.getError(e.getCause()), true);
}
String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), process.getTitle());
String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), processName);
if (FileUtil.exist(filePath)) {
Assert.isTrue(FileUtil.del(filePath));
}
Expand Down
21 changes: 12 additions & 9 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSaveDTO;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.ProcessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.exception.SqlExplainExcepition;
import org.dinky.data.model.Task;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
Expand Down Expand Up @@ -134,11 +136,12 @@ public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestP
Status.EXECUTE_SUCCESS);
}

@GetMapping("/onLineTask")
@Log(title = "onLineTask", businessType = BusinessType.TRIGGER)
@ApiOperation("onLineTask")
public Result<Boolean> onLineTask(@RequestParam Integer taskId) {
if (taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.ONLINE)) {
@GetMapping("/changeTaskLife")
@Log(title = "changeTaskLife", businessType = BusinessType.TRIGGER)
@ApiOperation("changeTaskLife")
public Result<Boolean> changeTaskLife(@RequestParam Integer taskId, @RequestParam Integer lifeCycle)
throws SqlExplainExcepition {
if (taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.get(lifeCycle))) {
return Result.succeed(Status.PUBLISH_SUCCESS);
} else {
return Result.failed(Status.PUBLISH_FAILED);
Expand All @@ -165,11 +168,11 @@ public Result<ObjectNode> getJobPlan(@ProcessId @RequestBody TaskDTO taskDTO) {
name = "task",
value = "Task",
required = true,
dataType = "Task",
dataType = "TaskSaveDTO",
paramType = "body",
dataTypeClass = Task.class)
public Result<Void> saveOrUpdateTask(@RequestBody Task task) {
if (taskService.saveOrUpdateTask(task)) {
dataTypeClass = TaskSaveDTO.class)
public Result<Void> saveOrUpdateTask(@RequestBody TaskSaveDTO task) {
if (taskService.saveOrUpdateTask(task.toTaskEntity())) {
return Result.succeed(Status.SAVE_SUCCESS);
} else {
return Result.failed(Status.SAVE_FAILED);
Expand Down
6 changes: 0 additions & 6 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ public class TaskDTO extends AbstractStatementDTO {
notes = "The identifier of the database")
private Integer databaseId;

@ApiModelProperty(value = "JAR ID", dataType = "Integer", example = "4", notes = "The identifier of the JAR")
private Integer jarId;

@ApiModelProperty(
value = "Alert Group ID",
dataType = "Integer",
Expand Down Expand Up @@ -174,9 +171,6 @@ public class TaskDTO extends AbstractStatementDTO {
@ApiModelProperty(value = "Path", dataType = "String", notes = "Path associated with the task")
private String path;

@ApiModelProperty(value = "JAR Name", dataType = "String", notes = "Name of the associated JAR")
private String jarName;

@ApiModelProperty(
value = "Cluster Configuration Name",
dataType = "String",
Expand Down
151 changes: 151 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.dto;

import org.dinky.data.model.Task;
import org.dinky.data.model.TaskExtConfig;
import org.dinky.data.typehandler.JSONObjectHandler;
import org.dinky.mybatis.annotation.Save;

import org.apache.ibatis.type.JdbcType;

import javax.validation.constraints.NotNull;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

import cn.hutool.core.bean.BeanUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

/**
 * Request body for saving or updating a task ({@code POST /saveOrUpdateTask}).
 *
 * <p>Carries only the user-editable fields of a task; it is converted to the
 * persistence entity via {@link #toTaskEntity()}. Field names must match the
 * corresponding {@link Task} properties because the conversion relies on
 * name-based bean copying.
 */
@Data
public class TaskSaveDTO {

    /** Primary key ID. */
    // NOTE(review): @TableId/@TableField are MyBatis-Plus persistence annotations on a
    // request DTO — presumably copied from the Task entity; confirm they are needed here.
    @TableId(value = "id", type = IdType.AUTO)
    @ApiModelProperty(value = "ID", required = true, dataType = "Integer", example = "1", notes = "Primary Key")
    private Integer id;

    // Validated only for the Save group (see org.dinky.mybatis.annotation.Save).
    @NotNull(
            message = "Name cannot be null",
            groups = {Save.class})
    @ApiModelProperty(value = "Name", required = true, dataType = "String", example = "Name")
    private String name;

    @NotNull(
            message = "Enabled cannot be null",
            groups = {Save.class})
    @ApiModelProperty(value = "Enabled", required = true, dataType = "Boolean", example = "true")
    private Boolean enabled;

    // SQL dialect of the task (e.g. FlinkSql) — exact value set not visible here.
    @ApiModelProperty(value = "Dialect", dataType = "String", notes = "Dialect for the task")
    private String dialect;

    @ApiModelProperty(value = "Type", dataType = "String", notes = "Type of the task")
    private String type;

    @ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task")
    private Integer checkPoint;

    // Stored as an Integer code; mapped to a SavePointStrategy enum elsewhere.
    @ApiModelProperty(value = "Save point strategy", dataType = "SavePointStrategy", notes = "Save point strategy")
    private Integer savePointStrategy;

    @ApiModelProperty(value = "Save Point Path", dataType = "String", notes = "Save point path for the task")
    private String savePointPath;

    @ApiModelProperty(value = "Parallelism", dataType = "Integer", example = "4", notes = "Parallelism for the task")
    private Integer parallelism;

    @ApiModelProperty(
            value = "Fragment",
            dataType = "Boolean",
            example = "true",
            notes = "Fragment option for the task")
    private Boolean fragment;

    @ApiModelProperty(
            value = "Statement Set",
            dataType = "Boolean",
            example = "false",
            notes = "Statement set option for the task")
    private Boolean statementSet;

    @ApiModelProperty(
            value = "Batch Model",
            dataType = "Boolean",
            example = "true",
            notes = "Batch model option for the task")
    private Boolean batchModel;

    @ApiModelProperty(
            value = "ClusterInstance ID",
            dataType = "Integer",
            example = "2001",
            notes = "ID of the cluster associated with the task")
    private Integer clusterId;

    @ApiModelProperty(
            value = "Cluster Configuration ID",
            dataType = "Integer",
            example = "3001",
            notes = "ID of the cluster configuration associated with the task")
    private Integer clusterConfigurationId;

    @ApiModelProperty(
            value = "Database ID",
            dataType = "Integer",
            example = "4001",
            notes = "ID of the database associated with the task")
    private Integer databaseId;

    @ApiModelProperty(
            value = "Environment ID",
            dataType = "Integer",
            example = "6001",
            notes = "ID of the environment associated with the task")
    private Integer envId;

    @ApiModelProperty(
            value = "Alert Group ID",
            dataType = "Integer",
            example = "7001",
            notes = "ID of the alert group associated with the task")
    private Integer alertGroupId;

    // Extended configuration; serialized to a JSON VARCHAR column by JSONObjectHandler.
    @ApiModelProperty(
            value = "Configuration JSON",
            dataType = "TaskExtConfig",
            notes = "Extended configuration in JSON format for the task")
    @TableField(typeHandler = JSONObjectHandler.class, jdbcType = JdbcType.VARCHAR)
    private TaskExtConfig configJson;

    @ApiModelProperty(value = "Note", dataType = "String", notes = "Additional notes for the task")
    private String note;

    @ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task")
    private String statement;

    /**
     * Converts this DTO into a {@link Task} entity by copying all same-named
     * properties. Entity fields with no counterpart here keep their defaults.
     *
     * @return a new {@link Task} populated from this DTO
     */
    public Task toTaskEntity() {
        Task task = new Task();
        BeanUtil.copyProperties(this, task);
        return task;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.data.model.mapping;

import org.dinky.data.model.ClusterConfiguration;
import org.dinky.gateway.model.FlinkClusterConfig;

import java.time.LocalDateTime;

Expand Down Expand Up @@ -75,7 +76,7 @@ public class ClusterConfigurationMapping {
dataType = "String",
example = "test",
notes = "cluster config json")
private String configJson;
private FlinkClusterConfig configJson;

@ApiModelProperty(
value = "isAvailable",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.JsonNode;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -249,8 +248,8 @@ private static void handleJobDone(JobInfoDetail jobInfoDetail) {

if (GatewayType.isDeployCluster(clusterType)) {
JobConfig jobConfig = new JobConfig();
String configJson = jobDataDto.getClusterConfiguration().getConfigJson();
jobConfig.buildGatewayConfig(new JSONObject(configJson).toBean(FlinkClusterConfig.class));
FlinkClusterConfig configJson = jobDataDto.getClusterConfiguration().getConfigJson();
jobConfig.buildGatewayConfig(configJson);
jobConfig.getGatewayConfig().setType(GatewayType.get(clusterType));
jobConfig.getGatewayConfig().getFlinkConfig().setJobName(jobInstance.getName());
Gateway.build(jobConfig.getGatewayConfig()).onJobFinishCallback(jobInstance.getStatus());
Expand Down
3 changes: 2 additions & 1 deletion dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.exception.ExcuteException;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.exception.SqlExplainExcepition;
import org.dinky.data.model.JobModelOverview;
import org.dinky.data.model.JobTypeOverView;
import org.dinky.data.model.Task;
Expand Down Expand Up @@ -162,7 +163,7 @@ public interface TaskService extends ISuperService<Task> {
* @param lifeCycle The new life cycle of the task.
* @return true if the life cycle is successfully changed, false otherwise.
*/
boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle);
boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) throws SqlExplainExcepition;

/**
* Save or update the given task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private String[] buildParams(int id) {
}

@ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK)
public void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainExcepition {
public void preCheckTask(TaskDTO task) throws TaskNotDoneException {
log.info("Start check and config task, task:{}", task.getName());

Assert.notNull(task, Status.TASK_NOT_EXIST.getMessage());
Expand All @@ -180,21 +180,6 @@ public void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainEx
throw new BusException(Status.TASK_STATUS_IS_NOT_DONE.getMessage());
}
}

log.info("Start explain Sql,task: {},Dialect:{}", task.getName(), task.getDialect());

List<SqlExplainResult> sqlExplainResults = explainTask(task);
for (SqlExplainResult sqlExplainResult : sqlExplainResults) {
if (!sqlExplainResult.isParseTrue() || !sqlExplainResult.isExplainTrue()) {
throw new SqlExplainExcepition(StrFormatter.format(
"task [{}] sql explain failed, sql [{}], error: [{}]",
task.getName(),
sqlExplainResult.getSql(),
sqlExplainResult.getError()));
}
}

log.info("Explain Sql finish");
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
Expand All @@ -221,11 +206,17 @@ public JobConfig buildJobConfig(TaskDTO task) {
flinkClusterCfg.getAppConfig().setUserJarParas(buildParams(config.getTaskId()));
flinkClusterCfg.getAppConfig().setUserJarMainAppClass(CommonConstant.DINKY_APP_MAIN_CLASS);
config.buildGatewayConfig(flinkClusterCfg);
Optional.ofNullable(task.getJobInstanceId()).ifPresent(i -> {
JobInstance jobInstance = jobInstanceService.getById(i);
config.setClusterId(jobInstance.getClusterId());
});
} else {
log.info("Init remote cluster");
String address = clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId());
config.setAddress(address);
Optional.ofNullable(task.getClusterId()).ifPresent(config::setClusterId);
}
log.info("Init remote cluster");
Optional.ofNullable(config.getClusterId()).ifPresent(i -> {
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), i));
});
return config;
}

Expand Down Expand Up @@ -263,6 +254,10 @@ public JobResult submitTask(Integer id, String savePointPath) throws Exception {
}
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
taskServiceBean.preCheckTask(taskDTO);
// The job instance does not exist by default,
// so that it does not affect other operations, such as checking the jobmanager address
taskDTO.setJobInstanceId(null);
JobResult jobResult = taskServiceBean.executeJob(taskDTO);
log.info("Job Submit success");
Task task = new Task(id, jobResult.getJobInstanceId());
Expand Down Expand Up @@ -427,19 +422,29 @@ public void initTenantByTaskId(Integer id) {
}

@Override
public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) {
TaskDTO taskInfoById = getTaskInfoById(taskId);
taskInfoById.setStep(lifeCycle.getValue());
if (lifeCycle == JobLifeCycle.ONLINE) {
taskVersionService.createTaskVersionSnapshot(taskInfoById);
}
return saveOrUpdate(taskInfoById.buildTask());
/**
 * Changes the life cycle of the given task and persists the new step.
 *
 * <p>When the task is moved to {@code PUBLISH}, a version snapshot is created
 * first so the published state can be rolled back later.
 *
 * @param taskId the ID of the task whose life cycle is changed
 * @param lifeCycle the target life cycle state
 * @return {@code true} if the task row was saved or updated successfully
 * @throws SqlExplainExcepition declared for interface compatibility; the SQL-explain
 *     pre-check was disabled in this revision and this method no longer throws it itself
 */
public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) throws SqlExplainExcepition {
    TaskDTO task = getTaskInfoById(taskId);
    task.setStep(lifeCycle.getValue());
    // Removed the commented-out SQL-explain validation that previously lived here;
    // restore it from version control if publish-time validation is reintroduced.
    if (lifeCycle == JobLifeCycle.PUBLISH) {
        // Snapshot the task on publish so the published version can be restored.
        taskVersionService.createTaskVersionSnapshot(task);
    }
    return saveOrUpdate(task.buildTask());
}

@Override
public boolean saveOrUpdateTask(Task task) {

if (JobLifeCycle.ONLINE.equalsValue(task.getStep())) {
if (JobLifeCycle.PUBLISH.equalsValue(task.getStep())) {
throw new BusException(Status.TASK_IS_ONLINE.getMessage());
}

Expand Down
Loading

0 comments on commit 1cdfd5f

Please sign in to comment.