Skip to content

Commit

Permalink
Support push ds modify process (#2668)
Browse files Browse the repository at this point in the history
* Spotless Apply

* support-push-ds-modify-process

---------

Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
Zzm0809 and Zzm0809 authored Dec 17, 2023
1 parent aaa43e1 commit 2dd4143
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.dinky.scheduler.enums.ReleaseState;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.DagData;
import org.dinky.scheduler.model.DagNodeLocation;
import org.dinky.scheduler.model.DinkyTaskParams;
import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.ProcessDefinition;
Expand All @@ -39,6 +40,8 @@
import org.dinky.service.CatalogueService;
import org.dinky.service.SchedulerService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.stereotype.Service;
Expand All @@ -47,6 +50,8 @@
import com.google.common.base.Strings;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
Expand Down Expand Up @@ -94,18 +99,25 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
dinkyTaskRequest.setName(taskName);

TaskRequest taskRequest = new TaskRequest();
JSONArray array = new JSONArray();
Long taskCode = taskClient.genTaskCode(projectCode);

if (process == null) {
Long taskCode = taskClient.genTaskCode(projectCode);
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
processClient.createProcessDefinition(projectCode, processName, taskCode, array.toString());
log.info(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS.getMessage());
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
log.info("DagNodeLocation Info: {}", dagNodeLocation);
processClient.createOrUpdateProcessDefinition(
projectCode, null, processName, taskCode, array.toString(), Arrays.asList(dagNodeLocation), false);
}

if (process != null && process.getReleaseState() == ReleaseState.ONLINE) {
Expand All @@ -120,7 +132,6 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
projectCode, taskMainInfo.getProcessDefinitionCode(), taskMainInfo.getTaskCode(), dinkyTaskRequest);
}

Long taskCode = taskClient.genTaskCode(projectCode);
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
Expand All @@ -129,12 +140,70 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
if (process != null) {
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// 更新 process 的 location 信息
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);

log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}
return false;
}

private void updateProcessDefinition(
ProcessDefinition process, Long taskCode, TaskRequest taskRequest, JSONArray array, long projectCode) {
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);

List<DagNodeLocation> locations = new ArrayList<>();

if (CollUtil.isNotEmpty(process.getLocations())) {
boolean matched = process.getLocations().stream().anyMatch(location -> location.getTaskCode() == taskCode);
// if not matched, add a new location
if (!matched) {
// 获取最大的 x y 坐标
long xMax = process.getLocations().stream()
.mapToLong(DagNodeLocation::getX)
.max()
.getAsLong();
long xMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getX)
.min()
.getAsLong();
long yMax = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.getAsLong();
long yMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.getAsLong();
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(xMax, xMin));
dagNodeLocation.setY(RandomUtil.randomLong(yMax, yMin));
locations = process.getLocations();
locations.add(dagNodeLocation);
}
} else {
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
locations.add(dagNodeLocation);
}

processClient.createOrUpdateProcessDefinition(
projectCode, process.getCode(), process.getName(), taskCode, array.toString(), locations, true);
log.info(
Status.DS_PROCESS_DEFINITION_UPDATE.getMessage(),
process.getName(),
taskCode,
array.toString(),
locations);
}

/**
* Pushes an update task to the API.
*
Expand Down Expand Up @@ -186,6 +255,10 @@ public boolean pushUpdateTask(
String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
Long updatedTaskDefinition = taskClient.updateTaskDefinition(
projectCode, taskCode, dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);
if (updatedTaskDefinition != null && updatedTaskDefinition > 0) {
log.info(Status.MODIFY_SUCCESS.getMessage());
return true;
Expand Down Expand Up @@ -239,8 +312,8 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) {
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskDefinition taskDefinition = null;
if (taskMainInfo == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST.getMessage(), processName, taskName);
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST, processName, taskName);
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName);
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST, processName, taskName);
}

taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public enum Status {
DS_TASK_NOT_EXIST(17007, "ds.task.not.exist"),
DS_TASK_TYPE_NOT_SUPPORT(17008, "ds.task.type.not.support"),
DS_WORK_FLOW_DEFINITION_NOT_EXIST(17009, "ds.work.flow.definition.not.exist"),
DS_PROCESS_DEFINITION_UPDATE(17010, "ds.work.flow.definition.process.update"),

/**
* LDAP About *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ test.msg.title=Real Time alarm mertics
user.name.passwd.error=UserName Or Password Not Correct
no.prefix=The token was not submitted according to the specified prefix
query.success=Query Successfully
ds.work.flow.definition.not.exist=Workflow Definition Not Exist
ds.work.flow.definition.not.exist=Workflow Definition Not Exist, You Can Add Workflow Definition
ds.work.flow.definition.process.update=Workflow Definition [{}] Update, TaskCode: [{}], Parameter 1: [{}], Parameter 2: [{}]
tenant.name.exist=Tenant Already Exists
failed=Failed
added.failed=Added Failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ test.msg.title=实时告警监控
user.name.passwd.error=用户名或密码不正确
no.prefix=未按照指定前缀提交 token
query.success=查询成功
ds.work.flow.definition.not.exist=工作流定义不存在
ds.work.flow.definition.not.exist=工作流定义不存在,你可以添加工作流定义
ds.work.flow.definition.process.update=工作流定义 [{}] 进行更新,TaskCode: [{}],参数 1: [{}],参数 2: [{}]
tenant.name.exist=租户已存在
failed=获取失败
added.failed=新增失败
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.data.model.SystemConfiguration;
import org.dinky.scheduler.constant.Constants;
import org.dinky.scheduler.model.DagData;
import org.dinky.scheduler.model.DagNodeLocation;
import org.dinky.scheduler.model.ProcessDefinition;
import org.dinky.scheduler.result.PageInfo;
import org.dinky.scheduler.result.Result;
Expand All @@ -43,9 +44,12 @@
import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONObject;

/** 工作流定义 */
/**
* 工作流定义
*/
@Component
public class ProcessClient {

Expand Down Expand Up @@ -133,14 +137,20 @@ public DagData getProcessDefinitionInfo(Long projectCode, Long processCode) {
/**
* Create a new process definition.
*
* @param projectCode The ID of the project to create the process definition for.
* @param processName The name of the process definition to create.
* @param taskCode The ID of the task to associate with the process definition.
* @param projectCode The ID of the project to create the process definition for.
* @param processName The name of the process definition to create.
* @param taskCode The ID of the task to associate with the process definition.
* @param taskDefinitionJson A JSON string representing the task definition to associate with the process definition.
* @return A {@link ProcessDefinition} object representing the newly created process definition.
*/
public ProcessDefinition createProcessDefinition(
Long projectCode, String processName, Long taskCode, String taskDefinitionJson) {
public ProcessDefinition createOrUpdateProcessDefinition(
Long projectCode,
Long processCode,
String processName,
Long taskCode,
String taskDefinitionJson,
List<DagNodeLocation> locations,
boolean isModify) {
String format = StrUtil.format(
SystemConfiguration.getInstances().getDolphinschedulerUrl().getValue()
+ "/projects/{projectCode}/process-definition",
Expand All @@ -150,21 +160,27 @@ public ProcessDefinition createProcessDefinition(
params.put("name", processName);
params.put("description", "系统添加");
params.put("tenantCode", "default");
params.put("locations", locations);
params.put("taskRelationJson", ReadFileUtil.taskRelation(Collections.singletonMap("code", taskCode)));
params.put("taskDefinitionJson", taskDefinitionJson);
params.put("executionType", "PARALLEL");

String content = HttpRequest.post(format)
HttpRequest httpRequest;
if (!isModify) {
httpRequest = HttpRequest.post(format);
} else {
httpRequest = HttpRequest.put(format + "/" + processCode);
}
HttpResponse httpResponse = httpRequest
.header(
Constants.TOKEN,
SystemConfiguration.getInstances()
.getDolphinschedulerToken()
.getValue())
.form(params)
.timeout(5000)
.execute()
.body();

.execute();
String content = httpResponse.body();
return MyJSONUtil.verifyResult(MyJSONUtil.toBean(content, new TypeReference<Result<ProcessDefinition>>() {}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.scheduler.model;

import java.io.Serializable;

import lombok.Data;

@Data
public class DagNodeLocation implements Serializable {

private long taskCode;
private long x;
private long y;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.scheduler.enums.ProcessExecutionTypeEnum;
import org.dinky.scheduler.enums.ReleaseState;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class ProcessDefinition {
private String projectName;

@ApiModelProperty(value = "位置")
private String locations;
private List<DagNodeLocation> locations = new ArrayList<>();

@ApiModelProperty(value = "计划发布状态 online/offline")
private ReleaseState scheduleReleaseState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ const Explain: React.FC<ExplainProps> = (props: any) => {
};
setResult(<Text>{l('pages.datastudio.explain.validate')}</Text>);
setExplainData([]);
const result = explainSql(l('pages.datastudio.editor.checking', '', { jobName: current?.name }),param);
const result = explainSql(
l('pages.datastudio.editor.checking', '', { jobName: current?.name }),
param
);
result.then((res) => {
const errorExplainData: [] = [];
let errorCount: number = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export const PushDolphin: React.FC<PushDolphinProps> = (props) => {
false
) as DolphinTaskDefinition;
onSubmit(transformPushDolphinParamsValue);
console.log('transformPushDolphinParamsValue', transformPushDolphinParamsValue);
handleCancel();
};

const renderFooter = () => {
Expand Down
3 changes: 2 additions & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ const HeaderContainer = (props: connect) => {
// 推送海豚, 此处需要将系统设置中的 ds 的配置拿出来做判断 启用才展示
icon: <PushpinIcon loading={pushDolphinState.buttonLoading} className={'blue-icon'} />,
title: l('button.push'),
hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 's',
hotKey: (e: KeyboardEvent) => e.ctrlKey && e.key === 'e',
hotKeyDesc: 'Ctrl+E',
isShow: enabledDs && isCanPushDolphin(currentData),
click: () => handlePushDolphinOpen()
},
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

import {handleGetOption, handleOption} from '@/services/BusinessCrud';
import { handleGetOption, handleOption } from '@/services/BusinessCrud';
import { DIALECT } from '@/services/constants';

export async function explainSql(title: string, params: any) {
Expand Down

0 comments on commit 2dd4143

Please sign in to comment.