From 1cdfd5f977e1e71ca65f805dd59ff4183e0d965d Mon Sep 17 00:00:00 2001
From: gaoyan
Date: Fri, 3 Nov 2023 16:28:06 +0800
Subject: [PATCH] Bug fixes (#2487)

* Optimize the process
* fix cluster json bug
* fix cluster cancel bug
* add check on online
* fix offline bug
* fix job life bug
* limit the parameters passed to a task
* format code
* format code
* format code
* remove jar config
* Optimized the automatic process build format
* Added code comparison filter
* fix dinky-app env bug
* fix dinky-app env bug
* format code
* fix some code
* fix k8s client
---
 .../java/org/dinky/aop/ProcessAspect.java          |   6 +-
 .../dinky/context/ConsoleContextHolder.java        |   2 +-
 .../org/dinky/controller/TaskController.java       |  21 +--
 .../main/java/org/dinky/data/dto/TaskDTO.java      |   6 -
 .../java/org/dinky/data/dto/TaskSaveDTO.java       | 151 ++++++++++++++++++
 .../mapping/ClusterConfigurationMapping.java       |   3 +-
 .../dinky/job/handler/JobRefreshHandler.java       |   5 +-
 .../java/org/dinky/service/TaskService.java        |   3 +-
 .../dinky/service/impl/TaskServiceImpl.java        |  59 +++----
 .../org/dinky/app/flinksql/Submitter.java          |   2 +-
 .../org/dinky/data/enums/JobLifeCycle.java         |  14 +-
 .../gateway/kubernetes/KubernetesGateway.java      |  10 +-
 .../DataStudio/HeaderContainer/index.tsx           |  16 +-
 .../DataStudio/HeaderContainer/service.tsx         |   8 +-
 .../MiddleContainer/Editor/constants.tsx           |  34 ++--
 dinky-web/src/pages/DataStudio/model.ts            |   1 -
 .../1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql      |   3 +-
 17 files changed, 255 insertions(+), 89 deletions(-)
 create mode 100644 dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java

diff --git a/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
index 1b36bc5ced..677cabeb38 100644
--- a/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
+++ b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
@@ -110,10 +110,8 @@ public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep proce
             contextHolder.finishedStep(MDC.get(PROCESS_NAME), step, ProcessStatus.FAILED, e);
             throw e;
         } finally {
-            // If a parent step exists, it is restored after the execution is complete
-            if (parentStep != null) {
-                MDC.put(PROCESS_STEP, parentStep);
-            }
+            // Restore the parent step after execution completes
+            MDC.put(PROCESS_STEP, parentStep);
         }
         return result;
     }
diff --git a/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java
index a64ae4b58c..f6295da2ce 100644
--- a/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java
+++ b/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java
@@ -195,7 +195,7 @@ public void finishedProcess(String processName, ProcessStatus status, Throwable
         if (e != null) {
             appendLog(processName, null, LogUtil.getError(e.getCause()), true);
         }
-        String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), process.getTitle());
+        String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), processName);
         if (FileUtil.exist(filePath)) {
            Assert.isTrue(FileUtil.del(filePath));
         }
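The ProcessAspect hunk above replaces a conditional restore with an unconditional `MDC.put`, so a null `parentStep` now resets the step key instead of leaving the child step behind. A minimal standalone sketch of that save/restore pattern; the `runStep` helper is hypothetical, not part of the patch:

```java
import org.slf4j.MDC;

public class MdcStepDemo {
    private static final String PROCESS_STEP = "PROCESS_STEP";

    // Hypothetical helper mirroring the aspect's around-advice: save the
    // current step, set the new one, and always restore on exit.
    static void runStep(String step, Runnable body) {
        String parentStep = MDC.get(PROCESS_STEP);
        MDC.put(PROCESS_STEP, step);
        try {
            body.run();
        } finally {
            // Restore unconditionally; when there was no parent step this
            // resets the value instead of leaking the child step.
            MDC.put(PROCESS_STEP, parentStep);
        }
    }

    public static void main(String[] args) {
        runStep("SUBMIT_PRECHECK", () -> runStep("SUBMIT_EXECUTE", () -> {}));
        // Prints the restored outer value (null here, since no step was set).
        System.out.println(MDC.get(PROCESS_STEP));
    }
}
```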
diff --git a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java
index 1457d39ab2..bc23c4fe79 100644
--- a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java
+++ b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java
@@ -25,11 +25,13 @@
 import org.dinky.data.dto.DebugDTO;
 import org.dinky.data.dto.TaskDTO;
 import org.dinky.data.dto.TaskRollbackVersionDTO;
+import org.dinky.data.dto.TaskSaveDTO;
 import org.dinky.data.enums.BusinessType;
 import org.dinky.data.enums.JobLifeCycle;
 import org.dinky.data.enums.ProcessType;
 import org.dinky.data.enums.Status;
 import org.dinky.data.exception.NotSupportExplainExcepition;
+import org.dinky.data.exception.SqlExplainExcepition;
 import org.dinky.data.model.Task;
 import org.dinky.data.result.ProTableResult;
 import org.dinky.data.result.Result;
@@ -134,11 +136,12 @@ public Result savepoint(@RequestParam Integer taskId, @RequestP
                 Status.EXECUTE_SUCCESS);
     }
 
-    @GetMapping("/onLineTask")
-    @Log(title = "onLineTask", businessType = BusinessType.TRIGGER)
-    @ApiOperation("onLineTask")
-    public Result onLineTask(@RequestParam Integer taskId) {
-        if (taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.ONLINE)) {
+    @GetMapping("/changeTaskLife")
+    @Log(title = "changeTaskLife", businessType = BusinessType.TRIGGER)
+    @ApiOperation("changeTaskLife")
+    public Result changeTaskLife(@RequestParam Integer taskId, @RequestParam Integer lifeCycle)
+            throws SqlExplainExcepition {
+        if (taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.get(lifeCycle))) {
             return Result.succeed(Status.PUBLISH_SUCCESS);
         } else {
             return Result.failed(Status.PUBLISH_FAILED);
@@ -165,11 +168,11 @@ public Result getJobPlan(@ProcessId @RequestBody TaskDTO taskDTO) {
             name = "task",
             value = "Task",
             required = true,
-            dataType = "Task",
+            dataType = "TaskSaveDTO",
             paramType = "body",
-            dataTypeClass = Task.class)
-    public Result saveOrUpdateTask(@RequestBody Task task) {
-        if (taskService.saveOrUpdateTask(task)) {
+            dataTypeClass = TaskSaveDTO.class)
+    public Result saveOrUpdateTask(@RequestBody TaskSaveDTO task) {
+        if (taskService.saveOrUpdateTask(task.toTaskEntity())) {
             return Result.succeed(Status.SAVE_SUCCESS);
         } else {
             return Result.failed(Status.SAVE_FAILED);
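The removed `/onLineTask` endpoint is folded into a single `/changeTaskLife` endpoint that takes the target lifecycle as an integer. A quick smoke test against a local instance might look like the sketch below; the base URL, port, and task id are assumptions, authentication is omitted, and the exact response shape depends on how `Result` is serialized:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ChangeTaskLifeDemo {
    public static void main(String[] args) throws Exception {
        // Publish task 42 by moving it to lifecycle 2 (PUBLISH in the enum below).
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888/api/task/changeTaskLife?taskId=42&lifeCycle=2"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // a Result JSON envelope with code/msg fields
    }
}
```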
diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
index a346d66846..4e724cb71b 100644
--- a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
+++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
@@ -119,9 +119,6 @@ public class TaskDTO extends AbstractStatementDTO {
             notes = "The identifier of the database")
     private Integer databaseId;
 
-    @ApiModelProperty(value = "JAR ID", dataType = "Integer", example = "4", notes = "The identifier of the JAR")
-    private Integer jarId;
-
     @ApiModelProperty(
             value = "Alert Group ID",
             dataType = "Integer",
@@ -174,9 +171,6 @@ public class TaskDTO extends AbstractStatementDTO {
     @ApiModelProperty(value = "Path", dataType = "String", notes = "Path associated with the task")
     private String path;
 
-    @ApiModelProperty(value = "JAR Name", dataType = "String", notes = "Name of the associated JAR")
-    private String jarName;
-
     @ApiModelProperty(
             value = "Cluster Configuration Name",
             dataType = "String",
diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java
new file mode 100644
index 0000000000..83a3ba2b33
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.data.dto;
+
+import org.dinky.data.model.Task;
+import org.dinky.data.model.TaskExtConfig;
+import org.dinky.data.typehandler.JSONObjectHandler;
+import org.dinky.mybatis.annotation.Save;
+
+import org.apache.ibatis.type.JdbcType;
+
+import javax.validation.constraints.NotNull;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+
+import cn.hutool.core.bean.BeanUtil;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class TaskSaveDTO {
+
+    /** Primary key ID */
+    @TableId(value = "id", type = IdType.AUTO)
+    @ApiModelProperty(value = "ID", required = true, dataType = "Integer", example = "1", notes = "Primary Key")
+    private Integer id;
+
+    @NotNull(
+            message = "Name cannot be null",
+            groups = {Save.class})
+    @ApiModelProperty(value = "Name", required = true, dataType = "String", example = "Name")
+    private String name;
+
+    @NotNull(
+            message = "Enabled cannot be null",
+            groups = {Save.class})
+    @ApiModelProperty(value = "Enabled", required = true, dataType = "Boolean", example = "true")
+    private Boolean enabled;
+
+    @ApiModelProperty(value = "Dialect", dataType = "String", notes = "Dialect for the task")
+    private String dialect;
+
+    @ApiModelProperty(value = "Type", dataType = "String", notes = "Type of the task")
+    private String type;
+
+    @ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task")
+    private Integer checkPoint;
+
+    @ApiModelProperty(value = "Save point strategy", dataType = "SavePointStrategy", notes = "Save point strategy")
+    private Integer savePointStrategy;
+
+    @ApiModelProperty(value = "Save Point Path", dataType = "String", notes = "Save point path for the task")
+    private String savePointPath;
+
+    @ApiModelProperty(value = "Parallelism", dataType = "Integer", example = "4", notes = "Parallelism for the task")
+    private Integer parallelism;
+
+    @ApiModelProperty(
+            value = "Fragment",
+            dataType = "Boolean",
+            example = "true",
+            notes = "Fragment option for the task")
+    private Boolean fragment;
+
+    @ApiModelProperty(
+            value = "Statement Set",
+            dataType = "Boolean",
+            example = "false",
+            notes = "Statement set option for the task")
+    private Boolean statementSet;
+
+    @ApiModelProperty(
+            value = "Batch Model",
+            dataType = "Boolean",
+            example = "true",
+            notes = "Batch model option for the task")
+    private Boolean batchModel;
+
+    @ApiModelProperty(
+            value = "ClusterInstance ID",
+            dataType = "Integer",
+            example = "2001",
+            notes = "ID of the cluster associated with the task")
+    private Integer clusterId;
+
+    @ApiModelProperty(
+            value = "Cluster Configuration ID",
+            dataType = "Integer",
+            example = "3001",
+            notes = "ID of the cluster configuration associated with the task")
+    private Integer clusterConfigurationId;
+
+    @ApiModelProperty(
+            value = "Database ID",
+            dataType = "Integer",
+            example = "4001",
+            notes = "ID of the database associated with the task")
+    private Integer databaseId;
+
+    @ApiModelProperty(
+            value = "Environment ID",
+            dataType = "Integer",
+            example = "6001",
+            notes = "ID of the environment associated with the task")
+    private Integer envId;
+
+    @ApiModelProperty(
+            value = "Alert Group ID",
+            dataType = "Integer",
+            example = "7001",
+            notes = "ID of the alert group associated with the task")
+    private Integer alertGroupId;
+
+    @ApiModelProperty(
+            value = "Configuration JSON",
+            dataType = "TaskExtConfig",
+            notes = "Extended configuration in JSON format for the task")
+    @TableField(typeHandler = JSONObjectHandler.class, jdbcType = JdbcType.VARCHAR)
+    private TaskExtConfig configJson;
+
+    @ApiModelProperty(value = "Note", dataType = "String", notes = "Additional notes for the task")
+    private String note;
+
+    @ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task")
+    private String statement;
+
+    public Task toTaskEntity() {
+        Task task = new Task();
+        BeanUtil.copyProperties(this, task);
+        return task;
+    }
+}
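`TaskSaveDTO` intentionally exposes only savable fields, so runtime state such as `step` or `jobInstanceId` can no longer be injected through the save endpoint; conversion to the entity is a plain property copy. A brief usage sketch, with made-up field values:

```java
import org.dinky.data.dto.TaskSaveDTO;
import org.dinky.data.model.Task;

public class TaskSaveDemo {
    public static void main(String[] args) {
        TaskSaveDTO dto = new TaskSaveDTO();
        dto.setName("flink-word-count");   // sample values, not from the patch
        dto.setEnabled(true);
        dto.setStatement("SELECT * FROM source_table;");

        // BeanUtil.copyProperties matches by property name, so only the
        // whitelisted DTO fields ever reach the Task entity.
        Task task = dto.toTaskEntity();
        System.out.println(task.getName()); // flink-word-count
    }
}
```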
ID", + dataType = "Integer", + example = "4001", + notes = "ID of the database associated with the task") + private Integer databaseId; + + @ApiModelProperty( + value = "Environment ID", + dataType = "Integer", + example = "6001", + notes = "ID of the environment associated with the task") + private Integer envId; + + @ApiModelProperty( + value = "Alert Group ID", + dataType = "Integer", + example = "7001", + notes = "ID of the alert group associated with the task") + private Integer alertGroupId; + + @ApiModelProperty( + value = "Configuration JSON", + dataType = "TaskExtConfig", + notes = "Extended configuration in JSON format for the task") + @TableField(typeHandler = JSONObjectHandler.class, jdbcType = JdbcType.VARCHAR) + private TaskExtConfig configJson; + + @ApiModelProperty(value = "Note", dataType = "String", notes = "Additional notes for the task") + private String note; + + @ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task") + private String statement; + + public Task toTaskEntity() { + Task task = new Task(); + BeanUtil.copyProperties(this, task); + return task; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/data/model/mapping/ClusterConfigurationMapping.java b/dinky-admin/src/main/java/org/dinky/data/model/mapping/ClusterConfigurationMapping.java index cafc218a3d..b8a09c9e30 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/mapping/ClusterConfigurationMapping.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/mapping/ClusterConfigurationMapping.java @@ -20,6 +20,7 @@ package org.dinky.data.model.mapping; import org.dinky.data.model.ClusterConfiguration; +import org.dinky.gateway.model.FlinkClusterConfig; import java.time.LocalDateTime; @@ -75,7 +76,7 @@ public class ClusterConfigurationMapping { dataType = "String", example = "test", notes = "cluster config json") - private String configJson; + private FlinkClusterConfig configJson; @ApiModelProperty( value = "isAvailable", diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index 3bfe185241..38051af6c5 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -55,7 +55,6 @@ import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.JsonNode; -import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -249,8 +248,8 @@ private static void handleJobDone(JobInfoDetail jobInfoDetail) { if (GatewayType.isDeployCluster(clusterType)) { JobConfig jobConfig = new JobConfig(); - String configJson = jobDataDto.getClusterConfiguration().getConfigJson(); - jobConfig.buildGatewayConfig(new JSONObject(configJson).toBean(FlinkClusterConfig.class)); + FlinkClusterConfig configJson = jobDataDto.getClusterConfiguration().getConfigJson(); + jobConfig.buildGatewayConfig(configJson); jobConfig.getGatewayConfig().setType(GatewayType.get(clusterType)); jobConfig.getGatewayConfig().getFlinkConfig().setJobName(jobInstance.getName()); Gateway.build(jobConfig.getGatewayConfig()).onJobFinishCallback(jobInstance.getStatus()); diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index 7701d59004..d4aaab016b 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ 
diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
index 1a2c0b1864..63bb35abe9 100644
--- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
+++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
@@ -103,7 +103,7 @@ public static void submit(AppParamConfig config) throws SQLException {
     public static String buildSql(AppTask appTask) throws SQLException {
         StringBuilder sb = new StringBuilder();
         // build env task
-        if (Asserts.isNotNull(appTask.getEnvId())) {
+        if (Asserts.isNotNull(appTask.getEnvId()) && appTask.getEnvId() > 0) {
             AppTask envTask = DBUtil.getTask(appTask.getEnvId());
             if (Asserts.isNotNullString(envTask.getStatement())) {
                 log.info("use statement is enable, load env:{}", envTask.getName());
diff --git a/dinky-common/src/main/java/org/dinky/data/enums/JobLifeCycle.java b/dinky-common/src/main/java/org/dinky/data/enums/JobLifeCycle.java
index 73cb91d520..048a796545 100644
--- a/dinky-common/src/main/java/org/dinky/data/enums/JobLifeCycle.java
+++ b/dinky-common/src/main/java/org/dinky/data/enums/JobLifeCycle.java
@@ -27,26 +27,20 @@
  * @since 2022/2/1 16:37
  */
 public enum JobLifeCycle {
-    UNKNOWN(0, "未知"),
-    DEVELOP(1, "开发"),
-    ONLINE(2, "上线");
+    UNKNOWN(0),
+    DEVELOP(1),
+    PUBLISH(2);
 
     private Integer value;
-    private String label;
 
-    JobLifeCycle(Integer value, String label) {
+    JobLifeCycle(Integer value) {
         this.value = value;
-        this.label = label;
     }
 
     public Integer getValue() {
         return value;
     }
 
-    public String getLabel() {
-        return label;
-    }
-
     public static JobLifeCycle get(Integer value) {
         return Arrays.stream(values())
                 .filter(item -> item.getValue().equals(value))
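With `ONLINE` renamed to `PUBLISH` and the Chinese labels dropped, the integer passed to `/changeTaskLife` maps as 0 = UNKNOWN, 1 = DEVELOP, 2 = PUBLISH. A compact sketch of the lookup; the UNKNOWN fallback is an assumption, since the original `get` is truncated in this hunk:

```java
public class JobLifeCycleDemo {
    enum JobLifeCycle {
        UNKNOWN(0), DEVELOP(1), PUBLISH(2);

        private final Integer value;

        JobLifeCycle(Integer value) { this.value = value; }

        Integer getValue() { return value; }

        // Loop-based equivalent of the stream lookup in the patch;
        // falling back to UNKNOWN is assumed, not shown in the diff.
        static JobLifeCycle get(Integer value) {
            for (JobLifeCycle item : values()) {
                if (item.getValue().equals(value)) return item;
            }
            return UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(JobLifeCycle.get(2)); // PUBLISH
        System.out.println(JobLifeCycle.get(9)); // UNKNOWN (assumed fallback)
    }
}
```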
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java
index 7f37a48510..a5295fab58 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java
@@ -39,8 +39,10 @@
 import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
+import org.apache.http.util.TextUtils;
 
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 
 import cn.hutool.core.io.FileUtil;
@@ -111,7 +113,13 @@ private void preparPodTemplate(String podTemplate, ConfigOption option)
 
     private void initKubeClient() {
         client = FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
-        kubernetesClient = new DefaultKubernetesClient();
+        String kubeFile = configuration.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+        if (TextUtils.isEmpty(kubeFile)) {
+            kubernetesClient = new DefaultKubernetesClient();
+        } else {
+            String kubeStr = FileUtil.readString(kubeFile, StandardCharsets.UTF_8);
+            kubernetesClient = DefaultKubernetesClient.fromConfig(kubeStr);
+        }
     }
 
     public SavePointResult savepointCluster(String savePoint) {
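The k8s client fix builds the fabric8 client from the kubeconfig configured via Flink's `kubernetes.config.file` option when present, instead of always relying on the default resolution chain. A standalone sketch of the same fallback (the path below is a made-up example; `DefaultKubernetesClient.fromConfig(String)` takes raw kubeconfig text):

```java
import java.nio.charset.StandardCharsets;

import cn.hutool.core.io.FileUtil;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class KubeClientDemo {
    public static void main(String[] args) {
        String kubeFile = "/opt/dinky/kubeconfig"; // hypothetical configured path
        KubernetesClient client;
        if (kubeFile == null || kubeFile.isEmpty()) {
            // No explicit kubeconfig: default resolution chain
            // (in-cluster service account, ~/.kube/config, env vars).
            client = new DefaultKubernetesClient();
        } else {
            // Explicit kubeconfig: build the client from the file contents,
            // mirroring the fix in initKubeClient().
            String kubeStr = FileUtil.readString(kubeFile, StandardCharsets.UTF_8);
            client = DefaultKubernetesClient.fromConfig(kubeStr);
        }
        System.out.println(client.getMasterUrl());
    }
}
```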
diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
index 3ce66ca5db..0756abb6d9 100644
--- a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
+++ b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
@@ -30,10 +30,10 @@ import {
 } from '@/pages/DataStudio/HeaderContainer/function';
 import {
   cancelTask,
+  changeTaskLife,
   debugTask,
   executeSql,
-  getJobPlan,
-  onLineTask
+  getJobPlan
 } from '@/pages/DataStudio/HeaderContainer/service';
 import { StateType, TabsPageSubType, TabsPageType, VIEW } from '@/pages/DataStudio/model';
 import { JOB_LIFE_CYCLE, JOB_STATUS } from '@/pages/DevOps/constants';
@@ -182,12 +182,20 @@ const HeaderContainer = (props: any) => {
   const handleChangeJobLife = async () => {
     if (!currentData) return;
     if (isOnline(currentData)) {
-      await cancelTask(l('global.table.lifecycle.offline'), currentData.id);
+      await changeTaskLife(
+        l('global.table.lifecycle.offline'),
+        currentData.id,
+        JOB_LIFE_CYCLE.DEVELOP
+      );
       currentData.step = JOB_LIFE_CYCLE.DEVELOP;
     } else {
       const saved = await handleSave();
       if (saved) {
-        await onLineTask(l('global.table.lifecycle.publishing'), currentData.id);
+        await changeTaskLife(
+          l('global.table.lifecycle.publishing'),
+          currentData.id,
+          JOB_LIFE_CYCLE.PUBLISH
+        );
         currentData.step = JOB_LIFE_CYCLE.PUBLISH;
       }
     }
diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
index 9e33f994be..bee5d82c0b 100644
--- a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
+++ b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
@@ -41,12 +41,8 @@ export function cancelTask(title: string, id: number) {
   return handleGetOption('api/task/cancel', title, { id });
 }
 
-export function onLineTask(title = '', id: number) {
-  return handleGetOption('api/task/onLineTask', title, { taskId: id });
-}
-
-export function offLinelTask(id: number) {
-  return handleGetOption('api/task/cancel', '', { taskId: id });
+export function changeTaskLife(title = '', id: number, life: number) {
+  return handleGetOption('api/task/changeTaskLife', title, { taskId: id, lifeCycle: life });
 }
 
 export const isSql = (dialect: string) => {
diff --git a/dinky-web/src/pages/DataStudio/MiddleContainer/Editor/constants.tsx b/dinky-web/src/pages/DataStudio/MiddleContainer/Editor/constants.tsx
index 5b0c43b4f4..3043d3a2ed 100644
--- a/dinky-web/src/pages/DataStudio/MiddleContainer/Editor/constants.tsx
+++ b/dinky-web/src/pages/DataStudio/MiddleContainer/Editor/constants.tsx
@@ -1,19 +1,19 @@
 /*
  *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  */
@@ -49,5 +49,12 @@ export const TASK_VAR_FILTER = [
   'useAutoCancel',
   'status',
   'step',
-  'jobConfig'
+  'jobConfig',
+  'note',
+  'versionId',
+  'clusterName',
+  'clusterConfigurationName',
+  'databaseName',
+  'envName',
+  'alertGroupName',
 ];
diff --git a/dinky-web/src/pages/DataStudio/model.ts b/dinky-web/src/pages/DataStudio/model.ts
index 38b2f565a6..dc88f8e035 100644
--- a/dinky-web/src/pages/DataStudio/model.ts
+++ b/dinky-web/src/pages/DataStudio/model.ts
@@ -90,7 +90,6 @@ export type TaskType = {
   clusterConfigurationName?: string;
   databaseId?: number;
   databaseName?: string;
-  jarId?: number;
   envId?: number;
   jobInstanceId?: number;
   note?: string;
diff --git a/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql b/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql
index ee44ad9158..a1f76f286c 100644
--- a/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql
+++ b/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql
@@ -278,7 +278,8 @@ LEFT JOIN
 -- Drop the jar_json column of dinky_job_history
 alter table dinky_job_history drop column jar_json;
 alter table dinky_task drop column jar_id;
-
+UPDATE dinky_task_version SET task_configure=JSON_REMOVE(task_configure, '$.jarId');
+UPDATE dinky_history SET config_json=JSON_REMOVE(config_json, '$.jarId');
 insert into `dinky_flink_document` values (218, 'Reference', '建表语句', 'Streaming', 'EXECUTE CDCSOURCE print', 'Whole library synchronization print', 'EXECUTE CDCSOURCE demo_print WITH ( ''connector'' = ''mysql-cdc'',