Skip to content

Commit

Permalink
refactor-push-dolphinscheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 committed Dec 16, 2023
1 parent e4a0efe commit bf8d976
Show file tree
Hide file tree
Showing 17 changed files with 1,150 additions and 692 deletions.
206 changes: 27 additions & 179 deletions dinky-admin/src/main/java/org/dinky/controller/SchedulerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,22 @@
package org.dinky.controller;

import org.dinky.data.enums.Status;
import org.dinky.data.model.Catalogue;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.result.Result;
import org.dinky.init.SystemInit;
import org.dinky.scheduler.client.ProcessClient;
import org.dinky.scheduler.client.TaskClient;
import org.dinky.scheduler.enums.ReleaseState;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.DagData;
import org.dinky.scheduler.model.DinkyTaskParams;
import org.dinky.scheduler.model.ProcessDefinition;
import org.dinky.scheduler.model.Project;
import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.TaskDefinition;
import org.dinky.scheduler.model.TaskMainInfo;
import org.dinky.scheduler.model.TaskRequest;
import org.dinky.service.CatalogueService;
import org.dinky.service.SchedulerService;

import java.util.List;

import javax.validation.Valid;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.base.Strings;

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.collection.CollUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand All @@ -67,13 +48,12 @@
@RequiredArgsConstructor
public class SchedulerController {

public static final String TASK_TYPE = "DINKY";
private final ProcessClient processClient;
private final TaskClient taskClient;
private final CatalogueService catalogueService;
private final SchedulerService schedulerService;

/** 获取任务定义 */
@GetMapping("/task")
/**
* get task definition
*/
@GetMapping("/queryTaskDefinition")
@ApiOperation("Get Task Definition")
@ApiImplicitParam(
name = "dinkyTaskId",
Expand All @@ -83,37 +63,17 @@ public class SchedulerController {
paramType = "query",
example = "1")
public Result<TaskDefinition> getTaskDefinition(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) {
Catalogue catalogue =
catalogueService.getOne(new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskId));
if (catalogue == null) {
return Result.failed(Status.DS_GET_NODE_LIST_ERROR);
}

Project dinkyProject = SystemInit.getProject();
long projectCode = dinkyProject.getCode();

String processName = getDinkyNames(catalogue, 0);
String taskName = catalogue.getName() + ":" + catalogue.getId();
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName);
TaskDefinition taskDefinition = null;
if (taskMainInfo == null) {
return Result.succeed(taskDefinition);
}

taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode());
if (taskDefinition == null) {
return Result.failed(Status.DS_WORK_FLOW_NOT_SAVE);
TaskDefinition taskDefinitionInfo = schedulerService.getTaskDefinitionInfo(dinkyTaskId);
if (taskDefinitionInfo == null) {
return Result.failed(Status.DS_TASK_NOT_EXIST);
}

taskDefinition.setProcessDefinitionCode(taskMainInfo.getProcessDefinitionCode());
taskDefinition.setProcessDefinitionName(taskMainInfo.getProcessDefinitionName());
taskDefinition.setProcessDefinitionVersion(taskMainInfo.getProcessDefinitionVersion());
taskDefinition.setUpstreamTaskMap(taskMainInfo.getUpstreamTaskMap());
return Result.succeed(taskDefinition);
return Result.succeed(taskDefinitionInfo);
}

/** 获取前置任务定义集合 */
@GetMapping("/upstream/tasks")
/**
* query upstream task
*/
@GetMapping("/queryUpstreamTasks")
@ApiOperation("Get Upstream Task Definition")
@ApiImplicitParam(
name = "dinkyTaskId",
Expand All @@ -123,134 +83,22 @@ public Result<TaskDefinition> getTaskDefinition(@ApiParam(value = "dinky任务id
paramType = "query",
example = "1")
public Result<List<TaskMainInfo>> getTaskMainInfos(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) {
Catalogue catalogue =
catalogueService.getOne(new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskId));
if (catalogue == null) {
return Result.failed(Status.DS_GET_NODE_LIST_ERROR);
List<TaskMainInfo> taskMainInfos = schedulerService.getTaskMainInfos(dinkyTaskId);
if (CollUtil.isEmpty(taskMainInfos)) {
return Result.failed();
}

long projectCode = SystemInit.getProject().getCode();
String processName = getDinkyNames(catalogue, 0);
List<TaskMainInfo> taskMainInfos = taskClient.getTaskMainInfos(projectCode, processName, "");
// 去掉本身
taskMainInfos.removeIf(taskMainInfo ->
(catalogue.getName() + ":" + catalogue.getId()).equalsIgnoreCase(taskMainInfo.getTaskName()));

return Result.succeed(taskMainInfos);
}

/** 创建任务定义 */
@PostMapping("/task")
@ApiOperation("Create Task Definition")
public Result<String> createTaskDefinition(
@ApiParam(value = "前置任务编号 列表") @RequestParam(required = false) List<String> upstreamCodes,
@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId,
@Valid @RequestBody TaskRequest taskRequest) {
DinkyTaskParams dinkyTaskParams = new DinkyTaskParams();
dinkyTaskParams.setTaskId(dinkyTaskId.toString());
dinkyTaskParams.setAddress(
SystemConfiguration.getInstances().getDinkyAddr().getValue());
taskRequest.setTaskParams(JSONUtil.parseObj(dinkyTaskParams).toString());
taskRequest.setTaskType(TASK_TYPE);

Catalogue catalogue =
catalogueService.getOne(new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskId));
if (catalogue == null) {
return Result.failed(Status.DS_GET_NODE_LIST_ERROR);
/**
* create or update
*/
@PostMapping("/createOrUpdateTaskDefinition")
@ApiOperation("Create or Update Task Definition")
public Result<String> createOrUpdateTaskDefinition(@RequestBody DinkyTaskRequest dinkyTaskRequest) {
if (schedulerService.pushAddTask(dinkyTaskRequest)) {
return Result.succeed(Status.DS_ADD_TASK_DEFINITION_SUCCESS);
}

String processName = getDinkyNames(catalogue, 0);
long projectCode = SystemInit.getProject().getCode();
ProcessDefinition process = processClient.getProcessDefinitionInfo(projectCode, processName);

String taskName = catalogue.getName() + ":" + catalogue.getId();
taskRequest.setName(taskName);
if (process == null) {
Long taskCode = taskClient.genTaskCode(projectCode);
taskRequest.setCode(taskCode);
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
processClient.createProcessDefinition(projectCode, processName, taskCode, array.toString());
return Result.succeed(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS);
}

if (process.getReleaseState() == ReleaseState.ONLINE) {
return Result.failed(Status.DS_WORK_FLOW_DEFINITION_ONLINE, (Object) processName);
}

TaskMainInfo taskDefinitionInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName);
if (taskDefinitionInfo != null) {
return Result.failed(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST, processName, taskName);
}

Long taskCode = taskClient.genTaskCode(projectCode);
taskRequest.setCode(taskCode);

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
taskClient.createTaskDefinition(projectCode, process.getCode(), upstreamCodes, taskDefinitionJsonObj);

return Result.succeed(Status.DS_ADD_TASK_DEFINITION_SUCCESS);
}

/** 更新任务定义 */
@PutMapping("/task")
@ApiOperation("Update Task Definition")
public Result<String> updateTaskDefinition(
@ApiParam(value = "项目编号") @RequestParam long projectCode,
@ApiParam(value = "工作流定义编号") @RequestParam long processCode,
@ApiParam(value = "任务定义编号") @RequestParam long taskCode,
@ApiParam(value = "前置任务编号 ") @RequestParam(required = false) List<String> upstreamCodes,
@Valid @RequestBody TaskRequest taskRequest) {

TaskDefinition taskDefinition = taskClient.getTaskDefinition(projectCode, taskCode);
if (taskDefinition == null) {
return Result.failed(Status.DS_TASK_NOT_EXIST);
}

if (!TASK_TYPE.equals(taskDefinition.getTaskType())) {
return Result.failed(Status.DS_TASK_TYPE_NOT_SUPPORT, (Object) taskDefinition.getTaskType());
}

DagData dagData = processClient.getProcessDefinitionInfo(projectCode, processCode);
if (dagData == null) {
return Result.failed(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST);
}

ProcessDefinition process = dagData.getProcessDefinition();
if (process == null) {
return Result.failed(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST);
}

if (process.getReleaseState() == ReleaseState.ONLINE) {
return Result.failed(Status.DS_WORK_FLOW_DEFINITION_ONLINE, (Object) process.getName());
}

taskRequest.setName(taskDefinition.getName());
taskRequest.setTaskParams(taskDefinition.getTaskParams());
taskRequest.setTaskType(TASK_TYPE);

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
taskClient.updateTaskDefinition(projectCode, taskCode, upstreamCodes, taskDefinitionJsonObj);
return Result.succeed(Status.MODIFY_SUCCESS);
}

private String getDinkyNames(Catalogue catalogue, int i) {
if (i == 3 || catalogue.getParentId().equals(0)) {
return "";
}

catalogue = catalogueService.getById(catalogue.getParentId());
if (catalogue == null) {
throw new SchedulerException("Get Node List Error");
}

String name = i == 0 ? catalogue.getName() + ":" + catalogue.getId() : catalogue.getName();
String next = getDinkyNames(catalogue, ++i);

if (Strings.isNullOrEmpty(next)) {
return name;
}
return name + "_" + next;
}
}
64 changes: 64 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/SchedulerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.service;

import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.TaskDefinition;
import org.dinky.scheduler.model.TaskMainInfo;

import java.util.List;

public interface SchedulerService {

/**
* Pushes the specified DinkyTaskRequest to the task queue.
*
* @param dinkyTaskRequest the DinkyTaskRequest to be added to the task queue
* @return true if the task was successfully added, false otherwise
*/
boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest);

/**
* A description of the entire Java function.
*
* @param projectCode description of parameter
* @param processCode description of parameter
* @param taskCode description of parameter
* @param dinkyTaskRequest description of parameter
* @return description of return value
*/
boolean pushUpdateTask(long projectCode, long processCode, long taskCode, DinkyTaskRequest dinkyTaskRequest);

/**
* Retrieves a list of TaskMainInfo objects based on the provided dinkyTaskId.
*
* @param dinkyTaskId the ID of the task
* @return a list of TaskMainInfo objects
*/
List<TaskMainInfo> getTaskMainInfos(long dinkyTaskId);

/**
* Retrieves the task definition information for a given dinky task ID.
*
* @param dinkyTaskId the ID of the dinky task
* @return the task definition information
*/
TaskDefinition getTaskDefinitionInfo(long dinkyTaskId);
}
Loading

0 comments on commit bf8d976

Please sign in to comment.