Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Dec 25, 2023
2 parents fd5cd9c + 5be6960 commit 69586a0
Show file tree
Hide file tree
Showing 47 changed files with 593 additions and 273 deletions.
15 changes: 5 additions & 10 deletions .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,15 @@ body:
Which version of Dinky are you running? We only accept bugs report from the LTS projects.
options:
- dev
- 1.0.0
- 0.7.5
- 0.7.4
- 0.7.3
- 0.7.2
- 0.7.1
- 0.7.0
- 0.6.7
- 0.6.6
- 0.6.5
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.1
- 0.5.0
- 0.6.*
- 0.5.*
validations:
required: true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws
@GetMapping("/cancel")
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
public Result<Boolean> cancel(@RequestParam Integer id) {
return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS);
public Result<Boolean> cancel(
@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
return Result.succeed(
taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint), Status.EXECUTE_SUCCESS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public class ClusterInstanceController {
mode = SaMode.OR)
public Result<Void> saveOrUpdateClusterInstance(@RequestBody ClusterInstanceDTO clusterInstanceDTO)
throws Exception {
clusterInstanceDTO.setAutoRegisters(false);
if (clusterInstanceDTO.getAutoRegisters() == null) {
clusterInstanceDTO.setAutoRegisters(false);
}
clusterInstanceService.registersCluster(clusterInstanceDTO);
return Result.succeed(Status.SAVE_SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.annotations.Log;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.ID;
import org.dinky.data.model.devops.TaskManagerConfiguration;
import org.dinky.data.model.ext.JobInfoDetail;
Expand All @@ -40,6 +41,7 @@

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand Down Expand Up @@ -69,6 +71,18 @@
public class JobInstanceController {
private final JobInstanceService jobInstanceService;

@PutMapping
@Log(title = "update JobInstance Job Id", businessType = BusinessType.INSERT_OR_UPDATE)
@ApiOperation("update JobInstance Job Id")
public Result<Void> updateJobInstanceJobId(@RequestBody JobInstance jobInstance) {
boolean updated = jobInstanceService.updateById(jobInstance);
if (updated) {
return Result.succeed(Status.SAVE_SUCCESS);
} else {
return Result.failed(Status.SAVE_FAILED);
}
}

/**
* 动态查询列表
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import cn.hutool.core.collection.CollUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -84,9 +83,6 @@ public Result<TaskDefinition> getTaskDefinition(@ApiParam(value = "dinky任务id
example = "1")
public Result<List<TaskMainInfo>> getTaskMainInfos(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) {
List<TaskMainInfo> taskMainInfos = schedulerService.getTaskMainInfos(dinkyTaskId);
if (CollUtil.isEmpty(taskMainInfos)) {
return Result.failed();
}
return Result.succeed(taskMainInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public Result<JobResult> debugTask(@RequestBody TaskDTO task) throws Exception {
@GetMapping("/cancel")
@Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
public Result<Void> cancel(@RequestParam Integer id) {
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id))) {
public Result<Void> cancel(@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint)) {
return Result.succeed(Status.EXECUTE_SUCCESS);
} else {
return Result.failed(Status.EXECUTE_FAILED);
Expand Down
3 changes: 1 addition & 2 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.dinky.daemon.pool.ScheduleThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.exception.BusException;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SystemConfiguration;
Expand Down Expand Up @@ -204,7 +203,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) {
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: ", e);
throw new BusException(
log.error(
"get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public interface TaskService extends ISuperService<Task> {
* @param task The {@link TaskDTO} object representing the task to cancel.
* @return true if the task job is successfully cancelled, false otherwise.
*/
boolean cancelTaskJob(TaskDTO task);
boolean cancelTaskJob(TaskDTO task, boolean withSavePoint);

/**
* Get the stream graph of the given task job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
import org.dinky.scheduler.model.DinkyTaskParams;
import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.ProcessDefinition;
import org.dinky.scheduler.model.ProcessTaskRelation;
import org.dinky.scheduler.model.Project;
import org.dinky.scheduler.model.TaskDefinition;
import org.dinky.scheduler.model.TaskMainInfo;
import org.dinky.scheduler.model.TaskRequest;
import org.dinky.service.CatalogueService;
import org.dinky.service.SchedulerService;
import org.dinky.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -76,6 +77,13 @@ public class SchedulerServiceImpl implements SchedulerService {
*/
@Override
public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
// Use root catalog as process (workflow) name.
Catalogue catalogue = catalogueService.getOne(
new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskRequest.getTaskId()));
if (catalogue == null) {
log.error(Status.DS_GET_NODE_LIST_ERROR.getMessage());
throw new BusException(Status.DS_GET_NODE_LIST_ERROR);
}

DinkyTaskParams dinkyTaskParams = new DinkyTaskParams();
dinkyTaskParams.setTaskId(dinkyTaskRequest.getTaskId());
Expand All @@ -84,77 +92,91 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
dinkyTaskRequest.setTaskParams(JSONUtil.parseObj(dinkyTaskParams).toString());
dinkyTaskRequest.setTaskType(TASK_TYPE);

Catalogue catalogue = catalogueService.getOne(
new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskRequest.getTaskId()));
if (catalogue == null) {
log.error(Status.DS_GET_NODE_LIST_ERROR.getMessage());
throw new BusException(Status.DS_GET_NODE_LIST_ERROR);
}

String processName = getDinkyNames(catalogue, 0);
long projectCode = SystemInit.getProject().getCode();
// Get process from dolphin scheduler
ProcessDefinition process = processClient.getProcessDefinitionInfo(projectCode, processName);

String taskName = catalogue.getName() + ":" + catalogue.getId();
dinkyTaskRequest.setName(taskName);

TaskRequest taskRequest = new TaskRequest();
JSONArray array = new JSONArray();
Long taskCode = taskClient.genTaskCode(projectCode);

// If the process does not exist, a process needs to be created.
if (process == null) {
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);
JSONArray taskArray = new JSONArray();
taskArray.set(jsonObject);
log.info(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS.getMessage());
// 随机出一个 x y 坐标

DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
log.info("DagNodeLocation Info: {}", dagNodeLocation);

ProcessTaskRelation processTaskRelation = ProcessTaskRelation.generateProcessTaskRelation(taskCode);
JSONObject processTaskRelationJSONObject = JSONUtil.parseObj(processTaskRelation);
JSONArray taskRelationArray = new JSONArray();
taskRelationArray.set(processTaskRelationJSONObject);

processClient.createOrUpdateProcessDefinition(
projectCode, null, processName, taskCode, array.toString(), Arrays.asList(dagNodeLocation), false);
projectCode,
null,
processName,
taskCode,
taskRelationArray.toString(),
taskArray.toString(),
Arrays.asList(dagNodeLocation),
false);
return true;
}

if (process != null && process.getReleaseState() == ReleaseState.ONLINE) {
// If the workflow is in an online state, it cannot be updated.
if (process.getReleaseState() == ReleaseState.ONLINE) {
log.error(Status.DS_WORK_FLOW_DEFINITION_ONLINE.getMessage(), processName);
}

TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE);
// If task name exist, update task definition.
if (taskMainInfo != null) {
// if task name exist, update task definition
log.warn(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST.getMessage(), processName, taskName);
return pushUpdateTask(
projectCode, taskMainInfo.getProcessDefinitionCode(), taskMainInfo.getTaskCode(), dinkyTaskRequest);
}

// If the task does not exist, a dinky task needs to be created.
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
if (process != null) {
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// 更新 process 的 location 信息
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// update the location of process
updateProcessDefinition(process, taskCode, taskRequest, projectCode);

log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}
return false;
log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}

private void updateProcessDefinition(
ProcessDefinition process, Long taskCode, TaskRequest taskRequest, JSONArray array, long projectCode) {
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);
private void updateProcessDefinition(ProcessDefinition process, Long taskCode, TaskRequest task, long projectCode) {

List<DagNodeLocation> locations = new ArrayList<>();
DagData dagData = processClient.getProcessDefinitionInfo(projectCode, process.getCode());
if (dagData == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage());
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST);
}
List<ProcessTaskRelation> processTaskRelationList = dagData.getProcessTaskRelationList();
List<TaskDefinition> taskDefinitionList = dagData.getTaskDefinitionList();
List<DagNodeLocation> locations = process.getLocations();

if (CollUtil.isNotEmpty(process.getLocations())) {
boolean matched = process.getLocations().stream().anyMatch(location -> location.getTaskCode() == taskCode);
Expand All @@ -175,13 +197,13 @@ private void updateProcessDefinition(
.getAsLong();
long yMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.min()
.getAsLong();
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(xMax, xMin));
dagNodeLocation.setY(RandomUtil.randomLong(yMax, yMin));
dagNodeLocation.setX(RandomUtil.randomLong(xMin == xMax ? 0 : xMin, xMax));
dagNodeLocation.setY(RandomUtil.randomLong(yMin == yMax ? 0 : yMin, yMax));
locations = process.getLocations();
locations.add(dagNodeLocation);
}
Expand All @@ -194,13 +216,27 @@ private void updateProcessDefinition(
locations.add(dagNodeLocation);
}

JSONArray taskArray = new JSONArray();
taskDefinitionList.removeIf(taskDefinition -> (task.getName()).equalsIgnoreCase(taskDefinition.getName()));

taskArray.addAll(taskDefinitionList);
taskArray.add(task);
String processTaskRelationListJson = JsonUtils.toJsonString(processTaskRelationList);

processClient.createOrUpdateProcessDefinition(
projectCode, process.getCode(), process.getName(), taskCode, array.toString(), locations, true);
projectCode,
process.getCode(),
process.getName(),
taskCode,
processTaskRelationListJson,
taskArray.toString(),
locations,
true);
log.info(
Status.DS_PROCESS_DEFINITION_UPDATE.getMessage(),
process.getName(),
taskCode,
array.toString(),
taskArray.toString(),
locations);
}

Expand Down Expand Up @@ -251,14 +287,13 @@ public boolean pushUpdateTask(
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
Long updatedTaskDefinition = taskClient.updateTaskDefinition(
projectCode, taskCode, dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);

updateProcessDefinition(process, taskCode, taskRequest, projectCode);
if (updatedTaskDefinition != null && updatedTaskDefinition > 0) {
log.info(Status.MODIFY_SUCCESS.getMessage());
return true;
Expand Down Expand Up @@ -309,7 +344,7 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) {

String processName = getDinkyNames(catalogue, 0);
String taskName = catalogue.getName() + ":" + catalogue.getId();
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE);
TaskDefinition taskDefinition = null;
if (taskMainInfo == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,15 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception
if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId())) {
String status = jobInstanceService.getById(task.getJobInstanceId()).getStatus();
if (!JobStatus.isDone(status)) {
cancelTaskJob(task);
cancelTaskJob(task, true);
}
}
return submitTask(
TaskSubmitDto.builder().id(id).savePointPath(savePointPath).build());
}

@Override
public boolean cancelTaskJob(TaskDTO task) {
public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint) {
if (Dialect.isCommonSql(task.getDialect())) {
return true;
}
Expand All @@ -385,7 +385,7 @@ public boolean cancelTaskJob(TaskDTO task) {
Assert.notNull(clusterInstance, Status.CLUSTER_NOT_EXIST.getMessage());

JobManager jobManager = JobManager.build(buildJobConfig(task));
return jobManager.cancel(jobInstance.getJid());
return jobManager.cancel(jobInstance.getJid(), withSavePoint);
}

@Override
Expand Down
Loading

0 comments on commit 69586a0

Please sign in to comment.