From c2dc2b098b20bbc05d140533ee720e5e3c691c4c Mon Sep 17 00:00:00 2001
From: gaoyan
Date: Wed, 20 Sep 2023 15:31:58 +0800
Subject: [PATCH] refactor JobInstance refresh (#2324)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* refactor JobInstance refresh
* fix job refresh NPE
* fix code style
* Compatible with JDK 8
* Modify the job refresh time to 5 seconds ago
* fix alert NPE
* fix devops data
* fix json parse error
---
 .../dinky/controller/AlertRuleController.java |   5 +-
 .../controller/JobInstanceController.java     |   2 +-
 .../java/org/dinky/data/dto/JobDataDto.java   | 134 +++++++++++
 .../java/org/dinky/data/model/JobHistory.java |  43 +---
 .../org/dinky/data/model/JobInfoDetail.java   |  93 +-------
 .../org/dinky/data/model/JobInstance.java     |  18 --
 .../dinky/data/options/AlertRuleOptions.java  |   3 +-
 .../main/java/org/dinky/job/FlinkJobTask.java | 121 ++++++++--
 .../java/org/dinky/job/Job2MysqlHandler.java  |  30 +--
 .../handler/JobAlertHandler.java}             | 147 ++++++------
 .../dinky/job/handler/JobRefeshHandler.java   | 217 ++++++++++++++++++
 .../org/dinky/service/JobHistoryService.java  |   5 +-
 .../org/dinky/service/JobInstanceService.java |   2 +-
 .../java/org/dinky/service/TaskService.java   |   7 -
 .../service/impl/JobHistoryServiceImpl.java   |  84 +------
 .../service/impl/JobInstanceServiceImpl.java  |  84 +++----
 .../dinky/service/impl/TaskServiceImpl.java   | 163 +------------
 .../org/dinky/alert/Rules/ExceptionRule.java  |  11 +-
 .../java/org/dinky/data/enums/JobStatus.java  |   2 -
 .../java/org/dinky/data/enums/Status.java     |   7 +
 .../main/java/org/dinky/utils/JSONUtil.java   |  10 +-
 .../main/java/org/dinky/utils/TimeUtil.java   |  20 +-
 .../resources/i18n/messages_en_US.properties  |   3 +
 .../resources/i18n/messages_zh_CN.properties  |   3 +-
 .../constant/FlinkRestResultConstant.java     |   3 +
 .../daemon/constant/FlinkTaskConstant.java    |   2 +-
 .../org/dinky/daemon/entity/TaskQueue.java    |  13 +-
 .../org/dinky/daemon/entity/TaskWorker.java   |  25 +-
 .../org/dinky/daemon/task/DaemonFactory.java  |  36 ++-
 .../org/dinky/daemon/task/DaemonTask.java     |  10 +-
 .../dinky/daemon/task/DaemonTaskConfig.java   |  25 +-
 .../NotSupportGetStatusException.java         |  21 +-
 .../components/CheckpointTable.tsx            |   4 +-
 .../CheckPointsTab/components/CkDesc.tsx      |   6 +-
 .../JobLogs/components/ExceptionTab.tsx       |   4 +-
 .../MetricsFilter/MetricsConfigForm.tsx       |   2 +-
 .../MetricsFilter/MetricsConfigTab.tsx        |   2 +-
 .../JobOverview/components/FlinkTable.tsx     |   2 +-
 .../JobOverview/components/JobDesc.tsx        |   4 +-
 dinky-web/src/types/DevOps/data.d.ts          |   4 +-
 .../1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql |   2 +-
 41 files changed, 730 insertions(+), 649 deletions(-)
 create mode 100644 dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java
 rename dinky-admin/src/main/java/org/dinky/{configure/schedule/Alert/JobAlerts.java => job/handler/JobAlertHandler.java} (70%)
 create mode 100644 dinky-admin/src/main/java/org/dinky/job/handler/JobRefeshHandler.java
 rename dinky-admin/src/main/java/org/dinky/job/FlinkJobTaskPool.java => dinky-gateway/src/main/java/org/dinky/gateway/exception/NotSupportGetStatusException.java (67%)

diff --git a/dinky-admin/src/main/java/org/dinky/controller/AlertRuleController.java b/dinky-admin/src/main/java/org/dinky/controller/AlertRuleController.java
index 856a7d7f7b..7593f006ec 100644
--- a/dinky-admin/src/main/java/org/dinky/controller/AlertRuleController.java
+++ b/dinky-admin/src/main/java/org/dinky/controller/AlertRuleController.java
@@ -19,13 +19,13 @@
 package org.dinky.controller;
 
-import
org.dinky.configure.schedule.Alert.JobAlerts; import org.dinky.data.annotation.Log; import org.dinky.data.enums.BusinessType; import org.dinky.data.enums.Status; import org.dinky.data.model.AlertRule; import org.dinky.data.result.ProTableResult; import org.dinky.data.result.Result; +import org.dinky.job.handler.JobAlertHandler; import org.dinky.service.AlertRuleService; import java.util.List; @@ -53,7 +53,6 @@ public class AlertRuleController { private final AlertRuleService alertRuleService; - private final JobAlerts jobAlerts; @PostMapping("/list") @ApiOperation("Query alert rules list") @@ -81,7 +80,7 @@ public ProTableResult list(@RequestBody JsonNode para) { public Result saveOrUpdateAlertRule(@RequestBody AlertRule alertRule) { boolean saved = alertRuleService.saveOrUpdate(alertRule); if (saved) { - jobAlerts.refreshRulesData(); + JobAlertHandler.getInstance().refreshRulesData(); return Result.succeed(Status.MODIFY_SUCCESS); } return Result.failed(Status.MODIFY_FAILED); diff --git a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java index aff51aa103..cc48ba8afa 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java @@ -117,7 +117,7 @@ public Result getJobInfoDetail(@RequestParam Integer id) { paramType = "query", required = true) public Result refreshJobInfoDetail(@RequestParam Integer id) { - return Result.succeed(taskService.refreshJobInfoDetail(id), Status.RESTART_SUCCESS); + return Result.succeed(jobInstanceService.refreshJobInfoDetail(id), Status.RESTART_SUCCESS); } /** 获取单任务实例的血缘分析 */ diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java b/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java new file mode 100644 index 0000000000..4770c09b45 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.data.dto; + +import org.dinky.data.model.JobHistory; +import org.dinky.utils.JSONUtil; + +import java.time.LocalDateTime; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.fasterxml.jackson.databind.JsonNode; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobDataDto { + + @ApiModelProperty( + value = "ID", + dataType = "Integer", + example = "1", + notes = "Unique identifier for the job history") + private Integer id; + + @ApiModelProperty( + value = "Tenant ID", + dataType = "Integer", + example = "1", + notes = "Tenant ID associated with the job history") + private Integer tenantId; + + @TableField(exist = false) + @ApiModelProperty(value = "Job Object", notes = "Object representing job details") + private JsonNode job; + + @TableField(exist = false) + @ApiModelProperty(value = "Exceptions Object", notes = "Object representing job exceptions") + private JsonNode exceptions; + + @TableField(exist = false) + @ApiModelProperty(value = "Checkpoints Object", notes = "Object representing job checkpoints") + private JsonNode checkpoints; + + @TableField(exist = false) + @ApiModelProperty(value = "Checkpoints Config Object", notes = "Object representing checkpoints configuration") + private JsonNode checkpointsConfig; + + @TableField(exist = false) + @ApiModelProperty(value = "Config Object", notes = "Object representing job configuration") + private JsonNode config; + + @TableField(exist = false) + @ApiModelProperty(value = "Jar Object", notes = "Object representing the JAR used in the job") + private JsonNode jar; + + @TableField(exist = false) + @ApiModelProperty(value = "Cluster Object", notes = "Object representing the cluster") + private JsonNode cluster; + + @TableField(exist = false) + @ApiModelProperty(value = "Cluster Configuration Object", notes = "Object representing cluster configuration") + private JsonNode clusterConfiguration; + + @TableField(exist = false) + @ApiModelProperty( + value = "Error Flag", + dataType = "boolean", + example = "true", + notes = "Flag indicating if there was an error") + private boolean error; + + @TableField(exist = false) + @ApiModelProperty( + value = "Error Message", + dataType = "boolean", + example = "true", + notes = "Flag indicating if there was an error") + private String errorMsg; + + public JobHistory toJobHistory() { + return JobHistory.builder() + .id(this.id) + .tenantId(this.tenantId) + .jobJson(JSONUtil.toJsonString(getJob())) + .exceptionsJson(JSONUtil.toJsonString(getExceptions())) + .checkpointsJson(JSONUtil.toJsonString(getCheckpoints())) + .checkpointsConfigJson(JSONUtil.toJsonString(getCheckpointsConfig())) + .configJson(JSONUtil.toJsonString(getConfig())) + .jarJson(JSONUtil.toJsonString(getJar())) + .clusterJson(JSONUtil.toJsonString(getCluster())) + .clusterConfigurationJson(JSONUtil.toJsonString(getClusterConfiguration())) + .updateTime(LocalDateTime.now()) + .build(); + } + + public static JobDataDto fromJobHistory(JobHistory jobHistory) { + return JobDataDto.builder() + .id(jobHistory.getId()) + .tenantId(jobHistory.getTenantId()) + .job(JSONUtil.parseToJsonNode(jobHistory.getJobJson())) + .exceptions(JSONUtil.parseToJsonNode(jobHistory.getExceptionsJson())) + .checkpoints(JSONUtil.parseToJsonNode(jobHistory.getCheckpointsJson())) + 
.checkpointsConfig(JSONUtil.parseToJsonNode(jobHistory.getCheckpointsConfigJson())) + .config(JSONUtil.parseToJsonNode(jobHistory.getConfigJson())) + .jar(JSONUtil.parseToJsonNode(jobHistory.getJarJson())) + .cluster(JSONUtil.parseToJsonNode(jobHistory.getClusterJson())) + .clusterConfiguration(JSONUtil.parseToJsonNode(jobHistory.getClusterConfigurationJson())) + .build(); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/data/model/JobHistory.java b/dinky-admin/src/main/java/org/dinky/data/model/JobHistory.java index 2845bc4171..b80620c792 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/JobHistory.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/JobHistory.java @@ -25,10 +25,10 @@ import com.baomidou.mybatisplus.annotation.FieldFill; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; @@ -38,6 +38,7 @@ * @since 2022/3/2 19:48 */ @Data +@Builder @EqualsAndHashCode(callSuper = false) @TableName("dinky_job_history") @ApiModel(value = "JobHistory", description = "Job History Information") @@ -59,10 +60,6 @@ public class JobHistory implements Serializable { notes = "Tenant ID associated with the job history") private Integer tenantId; - @TableField(exist = false) - @ApiModelProperty(value = "Job Object", notes = "Object representing job details") - private ObjectNode job; - @ApiModelProperty( value = "Job JSON", dataType = "String", @@ -70,10 +67,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of the job") private String jobJson; - @TableField(exist = false) - @ApiModelProperty(value = "Exceptions Object", notes = "Object representing job exceptions") - private ObjectNode exceptions; - @ApiModelProperty( value = "Exceptions JSON", dataType = "String", @@ -81,10 +74,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of exceptions") private String exceptionsJson; - @TableField(exist = false) - @ApiModelProperty(value = "Checkpoints Object", notes = "Object representing job checkpoints") - private ObjectNode checkpoints; - @ApiModelProperty( value = "Checkpoints JSON", dataType = "String", @@ -92,10 +81,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of checkpoints") private String checkpointsJson; - @TableField(exist = false) - @ApiModelProperty(value = "Checkpoints Config Object", notes = "Object representing checkpoints configuration") - private ObjectNode checkpointsConfig; - @ApiModelProperty( value = "Checkpoints Config JSON", dataType = "String", @@ -103,10 +88,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of checkpoints config") private String checkpointsConfigJson; - @TableField(exist = false) - @ApiModelProperty(value = "Config Object", notes = "Object representing job configuration") - private ObjectNode config; - @ApiModelProperty( value = "Config JSON", dataType = "String", @@ -114,10 +95,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of config") private String configJson; - @TableField(exist = false) - @ApiModelProperty(value = "Jar Object", notes = "Object representing the JAR used in the job") - private ObjectNode jar; - @ApiModelProperty( value = "Jar JSON", dataType = "String", @@ -125,10 +102,6 @@ 
public class JobHistory implements Serializable { notes = "JSON representation of the JAR") private String jarJson; - @TableField(exist = false) - @ApiModelProperty(value = "Cluster Object", notes = "Object representing the cluster") - private ObjectNode cluster; - @ApiModelProperty( value = "Cluster JSON", dataType = "String", @@ -136,10 +109,6 @@ public class JobHistory implements Serializable { notes = "JSON representation of the cluster") private String clusterJson; - @TableField(exist = false) - @ApiModelProperty(value = "Cluster Configuration Object", notes = "Object representing cluster configuration") - private ObjectNode clusterConfiguration; - @ApiModelProperty( value = "Cluster Configuration JSON", dataType = "String", @@ -153,12 +122,4 @@ public class JobHistory implements Serializable { dataType = "LocalDateTime", notes = "Timestamp indicating the last update time") private LocalDateTime updateTime; - - @TableField(exist = false) - @ApiModelProperty( - value = "Error Flag", - dataType = "boolean", - example = "true", - notes = "Flag indicating if there was an error") - private boolean error; } diff --git a/dinky-admin/src/main/java/org/dinky/data/model/JobInfoDetail.java b/dinky-admin/src/main/java/org/dinky/data/model/JobInfoDetail.java index 3e54a0b7bf..ef53059a43 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/JobInfoDetail.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/JobInfoDetail.java @@ -19,16 +19,20 @@ package org.dinky.data.model; +import org.dinky.data.dto.JobDataDto; + import java.util.Set; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.Data; /** * JobInfoDetail * * @since 2022/3/1 19:31 */ +@Data @ApiModel(value = "JobInfoDetail", description = "Job Information Detail") public class JobInfoDetail { @@ -47,99 +51,16 @@ public class JobInfoDetail { @ApiModelProperty(value = "History", notes = "Details about the history") private History history; - @ApiModelProperty(value = "Job History", notes = "Details about the job history") - private JobHistory jobHistory; - @ApiModelProperty(value = "Job Manager Configuration", notes = "Details about the job manager configuration") private JobManagerConfiguration jobManagerConfiguration; + @ApiModelProperty(value = "JobDataDto", notes = "Details about the job") + private JobDataDto jobDataDto; + @ApiModelProperty(value = "Task Manager Configurations", notes = "Set of task manager configurations") private Set taskManagerConfiguration; - @ApiModelProperty( - value = "Refresh Count", - dataType = "Integer", - example = "5", - notes = "Count of refresh operations") - private Integer refreshCount; - public JobInfoDetail(Integer id) { this.id = id; - this.refreshCount = 0; - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } - - public JobInstance getInstance() { - return instance; - } - - public void setInstance(JobInstance instance) { - this.instance = instance; - } - - public Cluster getCluster() { - return cluster; - } - - public void setCluster(Cluster cluster) { - this.cluster = cluster; - } - - public ClusterConfiguration getClusterConfiguration() { - return clusterConfiguration; - } - - public void setClusterConfiguration(ClusterConfiguration clusterConfiguration) { - this.clusterConfiguration = clusterConfiguration; - } - - public void setJobManagerConfiguration(JobManagerConfiguration jobMangerConfiguration) { - this.jobManagerConfiguration = jobMangerConfiguration; - } - - public 
JobManagerConfiguration getJobManagerConfiguration() { - return jobManagerConfiguration; - } - - public void setTaskManagerConfiguration(Set taskManagerConfiguration) { - this.taskManagerConfiguration = taskManagerConfiguration; - } - - public Set getTaskManagerConfiguration() { - return taskManagerConfiguration; - } - - public History getHistory() { - return history; - } - - public void setHistory(History history) { - this.history = history; - } - - public JobHistory getJobHistory() { - return jobHistory; - } - - public void setJobHistory(JobHistory jobHistory) { - this.jobHistory = jobHistory; - } - - public void refresh() { - refreshCount = refreshCount + 1; - if (isNeedSave()) { - refreshCount = 0; - } - } - - public boolean isNeedSave() { - return refreshCount % 60 == 0; } } diff --git a/dinky-admin/src/main/java/org/dinky/data/model/JobInstance.java b/dinky-admin/src/main/java/org/dinky/data/model/JobInstance.java index adf4206e02..6478080c70 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/JobInstance.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/JobInstance.java @@ -134,22 +134,4 @@ public class JobInstance implements Serializable { example = "2", notes = "Count of failed restarts") private Integer failedRestartCount; - - @TableField(exist = false) - @ApiModelProperty(value = "Type", dataType = "String", notes = "Type of the job instance") - private String type; - - @TableField(exist = false) - @ApiModelProperty( - value = "Cluster Name", - dataType = "String", - notes = "Name of the cluster associated with the job instance") - private String clusterName; - - @TableField(exist = false) - @ApiModelProperty( - value = "Job Manager Address", - dataType = "String", - notes = "Job manager address associated with the job instance") - private String jobManagerAddress; } diff --git a/dinky-admin/src/main/java/org/dinky/data/options/AlertRuleOptions.java b/dinky-admin/src/main/java/org/dinky/data/options/AlertRuleOptions.java index a097987c10..05b4dda587 100644 --- a/dinky-admin/src/main/java/org/dinky/data/options/AlertRuleOptions.java +++ b/dinky-admin/src/main/java/org/dinky/data/options/AlertRuleOptions.java @@ -31,7 +31,8 @@ public class AlertRuleOptions { public static final String JOB_ALERT_RULE_CHECK_POINTS = "checkPoints"; public static final String JOB_ALERT_RULE_CLUSTER = "cluster"; public static final String JOB_ALERT_RULE_EXCEPTIONS = "exceptions"; - public static final String JOB_ALERT_RULE_REFRESH_RULES_DATA = "refreshRulesData"; + public static final String JOB_ALERT_RULE_EXCEPTIONS_MSG = "exceptions_msg"; + public static final String JOB_ALERT_RULE_EXCEPTION_CHECK = "exceptionRule"; public static final String JOB_ALERT_RULE_CHECKPOINT_RULES = "checkpointRule"; public static final String JOB_ALERT_RULE_TASK = "task"; public static final String JOB_ALERT_RULE_TASK_URL = "taskUrl"; diff --git a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java index 06d3c73fdf..20355540d0 100644 --- a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java +++ b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java @@ -19,65 +19,138 @@ package org.dinky.job; -import org.dinky.assertion.Asserts; import org.dinky.context.SpringContextUtils; +import org.dinky.context.TenantContextHolder; import org.dinky.daemon.constant.FlinkTaskConstant; -import org.dinky.daemon.pool.DefaultThreadPool; import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; -import 
org.dinky.data.enums.JobStatus; -import org.dinky.data.model.JobInstance; -import org.dinky.service.TaskService; +import org.dinky.data.model.JobInfoDetail; +import org.dinky.job.handler.JobAlertHandler; +import org.dinky.job.handler.JobRefeshHandler; +import org.dinky.service.JobInstanceService; -import java.time.Duration; -import java.time.LocalDateTime; +import java.util.Objects; import org.springframework.context.annotation.DependsOn; +import lombok.extern.slf4j.Slf4j; + @DependsOn("springContextUtils") +@Slf4j public class FlinkJobTask implements DaemonTask { private DaemonTaskConfig config; public static final String TYPE = "jobInstance"; - private static TaskService taskService; + private static final JobInstanceService jobInstanceService; private long preDealTime; + private long refreshCount = 0; static { - taskService = SpringContextUtils.getBean("taskServiceImpl", TaskService.class); + jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class); } + private JobInfoDetail jobInfoDetail; + @Override public DaemonTask setConfig(DaemonTaskConfig config) { this.config = config; + this.jobInfoDetail = jobInstanceService.getJobInfoDetail(config.getId()); return this; } + /** + * Processing tasks. + *

+ * Handles job refresh, alerting, monitoring and other actions.
+ * Returns true if the job has completed or has exceeded the time allowed for
+ * obtaining its data, meaning that processing is finished and the task is moved
+ * out of the thread pool; otherwise it returns false, meaning that processing is
+ * not yet finished and the task remains in the thread pool.
+ *
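+ * A minimal usage sketch (hypothetical wiring; in practice DaemonFactory and the
+ * daemon worker threads drive this loop):
+ * <pre>{@code
+ * DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId);
+ * DaemonTask task = new FlinkJobTask().setConfig(config);
+ * boolean finished = task.dealTask(); // true -> remove from the pool, false -> poll again
+ * }</pre>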

+ *
+ * @return Returns true if the job has completed, otherwise returns false
+ */
 @Override
- public String getType() {
- return TYPE;
+ public boolean dealTask() {
+ // TODO: currently only the job refresh is implemented; alerting and monitoring still need further work
+ volatilityBalance();
+ TenantContextHolder.set(1);
+
+ boolean isDone = JobRefeshHandler.refeshJob(jobInfoDetail, isNeedSave());
+ JobAlertHandler.getInstance().check(jobInfoDetail);
+
+ return isDone;
 }

- @Override
- public void dealTask() {
+ /**
+ * Volatility balance.
+ *

+ * This method performs a smoothing (volatility-balancing) operation between calls.
+ * It computes the interval between the current time and the last processing time;
+ * if that interval is shorter than the configured sleep time (TIME_SLEEP),
+ * the thread sleeps for TIME_SLEEP milliseconds. It then updates the last
+ * processing time to the current time.
+ *
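+ * For example, assuming TIME_SLEEP were 1000 ms, a call arriving 200 ms after the
+ * previous one would sleep a full second before refreshing.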

+ */ + public void volatilityBalance() { long gap = System.currentTimeMillis() - this.preDealTime; if (gap < FlinkTaskConstant.TIME_SLEEP) { try { Thread.sleep(FlinkTaskConstant.TIME_SLEEP); } catch (InterruptedException e) { - e.printStackTrace(); + log.error(e.getMessage(), e); } } preDealTime = System.currentTimeMillis(); - JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false); - if ((!JobStatus.isDone(jobInstance.getStatus())) - || (Asserts.isNotNull(jobInstance.getFinishTime()) - && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()) - .toMinutes() - < 1)) { - DefaultThreadPool.getInstance().execute(this); - } else { - taskService.handleJobDone(jobInstance); - FlinkJobTaskPool.INSTANCE.remove(config.getId().toString()); + } + + /** + * Determine if you need to save. + *

+ * This method determines whether the current state needs to be persisted,
+ * based on the refresh count:
+ * if refreshCount is divisible by 60, it returns true, indicating that a save is needed.
+ * When a save is needed, refreshCount is reset to 0 to restart the count.
+ * Finally, refreshCount is incremented by 1.
+ *
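+ * Combined with the roughly 5-second refresh interval mentioned in this change,
+ * this persists state about once every 60 refreshes, i.e. roughly every 5 minutes.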

+ * + * @return Returns true if you need to save, otherwise returns false + */ + public boolean isNeedSave() { + boolean isNeed = refreshCount % 60 == 0; + if (isNeed) { + refreshCount = 0; } + refreshCount++; + return isNeed; + } + + /** + * Determine whether objects are equal. + *

+ * This method determines whether the current object is equal to the given object.
+ * If the two object references are identical, it returns true directly.
+ * It returns false if the given object is null or its class differs from that of the current object.
+ * Otherwise, it casts the given object to FlinkJobTask and compares the config IDs of the two objects.
+ * Within FlinkJobTask the config ID is unique, so comparing the config IDs is sufficient.
+ * Without this equality check, the same task could join the thread pool multiple times.
+ *
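+ * A small illustration (hypothetical instance ID):
+ * <pre>{@code
+ * DaemonTask a = new FlinkJobTask().setConfig(DaemonTaskConfig.build(FlinkJobTask.TYPE, 42));
+ * DaemonTask b = new FlinkJobTask().setConfig(DaemonTaskConfig.build(FlinkJobTask.TYPE, 42));
+ * a.equals(b); // true, so a duplicate task can be rejected by the queue
+ * }</pre>
+ * Note that hashCode() is not overridden here, so this only matters for
+ * equals-based checks such as List.contains.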

+ * + * @param obj object to compare + * @return Returns true if two objects are equal, otherwise returns false + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + FlinkJobTask other = (FlinkJobTask) obj; + return Objects.equals(config.getId(), other.config.getId()); + } + + @Override + public String getType() { + return TYPE; } } diff --git a/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java b/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java index fe0e5f84ca..a6ea6a1a67 100644 --- a/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java @@ -183,21 +183,21 @@ public boolean success() { task.setJobInstanceId(jobInstance.getId()); taskService.updateById(task); - JobHistory jobHistory = new JobHistory(); - jobHistory.setId(jobInstance.getId()); - jobHistory.setClusterJson(JSONUtil.toJsonString(cluster)); - - jobHistory.setJarJson( - Asserts.isNotNull(job.getJobConfig().getJarId()) - ? JSONUtil.toJsonString( - jarService.getById(job.getJobConfig().getJarId())) - : null); - - jobHistory.setClusterConfigurationJson( - Asserts.isNotNull(clusterConfigurationId) - ? JSONUtil.toJsonString( - clusterConfigurationService.getClusterConfigById(clusterConfigurationId)) - : null); + JobHistory.JobHistoryBuilder jobHistoryBuilder = JobHistory.builder(); + JobHistory jobHistory = jobHistoryBuilder + .id(jobInstance.getId()) + .clusterJson(JSONUtil.toJsonString(cluster)) + .jarJson( + Asserts.isNotNull(job.getJobConfig().getJarId()) + ? JSONUtil.toJsonString( + jarService.getById(job.getJobConfig().getJarId())) + : null) + .clusterConfigurationJson( + Asserts.isNotNull(clusterConfigurationId) + ? 
JSONUtil.toJsonString( + clusterConfigurationService.getClusterConfigById(clusterConfigurationId)) + : null) + .build(); jobHistoryService.save(jobHistory); DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId())); diff --git a/dinky-admin/src/main/java/org/dinky/configure/schedule/Alert/JobAlerts.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java similarity index 70% rename from dinky-admin/src/main/java/org/dinky/configure/schedule/Alert/JobAlerts.java rename to dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java index 686ce16063..3de6bd5fad 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/schedule/Alert/JobAlerts.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java @@ -17,7 +17,7 @@ * */ -package org.dinky.configure.schedule.Alert; +package org.dinky.job.handler; import org.dinky.alert.Alert; import org.dinky.alert.AlertConfig; @@ -25,9 +25,10 @@ import org.dinky.alert.Rules.CheckpointsRule; import org.dinky.alert.Rules.ExceptionRule; import org.dinky.assertion.Asserts; -import org.dinky.configure.schedule.BaseSchedule; import org.dinky.context.FreeMarkerHolder; -import org.dinky.context.TenantContextHolder; +import org.dinky.context.SpringContextUtils; +import org.dinky.daemon.pool.DefaultThreadPool; +import org.dinky.data.constant.FlinkRestResultConstant; import org.dinky.data.dto.AlertRuleDTO; import org.dinky.data.enums.Status; import org.dinky.data.model.AlertGroup; @@ -38,11 +39,10 @@ import org.dinky.data.model.SystemConfiguration; import org.dinky.data.model.Task; import org.dinky.data.options.AlertRuleOptions; -import org.dinky.job.FlinkJobTaskPool; -import org.dinky.service.impl.AlertGroupServiceImpl; -import org.dinky.service.impl.AlertHistoryServiceImpl; +import org.dinky.service.AlertGroupService; +import org.dinky.service.AlertHistoryService; +import org.dinky.service.TaskService; import org.dinky.service.impl.AlertRuleServiceImpl; -import org.dinky.service.impl.TaskServiceImpl; import org.dinky.utils.TimeUtil; import java.io.IOException; @@ -51,8 +51,6 @@ import java.util.Map; import java.util.stream.Collectors; -import javax.annotation.PostConstruct; - import org.jeasy.rules.api.Facts; import org.jeasy.rules.api.Rule; import org.jeasy.rules.api.Rules; @@ -60,53 +58,23 @@ import org.jeasy.rules.core.DefaultRulesEngine; import org.jeasy.rules.core.RuleBuilder; import org.jeasy.rules.spel.SpELCondition; -import org.springframework.beans.factory.annotation.Configurable; -import org.springframework.scheduling.support.PeriodicTrigger; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.DependsOn; import cn.hutool.core.text.StrFormatter; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import freemarker.template.TemplateException; import lombok.Data; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -/** - * This class, JobAlerts, is responsible for scheduling and executing job alerts. - * It checks for rule conditions and triggers alerts based on those conditions. - */ -@Configurable -@Component -@RequiredArgsConstructor @Slf4j -public class JobAlerts extends BaseSchedule { - - /** - * Service for managing alert history. - */ - private final AlertHistoryServiceImpl alertHistoryService; - - /** - * Service for managing alert groups. - */ - private final AlertGroupServiceImpl alertGroupService; - - /** - * Service for managing tasks. 
- */ - private final TaskServiceImpl taskService; - - /** - * Service for managing alert rules. - */ - private final AlertRuleServiceImpl alertRuleService; +@DependsOn("springContextUtils") +public class JobAlertHandler { - /** - * The pool containing Flink job tasks. - * // TODO 任务刷新逻辑重购后记得修改这里逻辑 - */ - private final FlinkJobTaskPool taskPool = FlinkJobTaskPool.INSTANCE; + private static final AlertHistoryService alertHistoryService; + private static final AlertGroupService alertGroupService; + private static final TaskService taskService; + private static final AlertRuleServiceImpl alertRuleService; /** * Rules for evaluating alert conditions. @@ -125,53 +93,80 @@ public class JobAlerts extends BaseSchedule { private final Facts ruleFacts = new Facts(); + private static JobAlertHandler defaultJobAlertHandler; + + static { + taskService = SpringContextUtils.getBean("taskServiceImpl", TaskService.class); + alertHistoryService = SpringContextUtils.getBean("alertHistoryServiceImpl", AlertHistoryService.class); + alertGroupService = SpringContextUtils.getBean("alertGroupServiceImpl", AlertGroupService.class); + alertRuleService = SpringContextUtils.getBean("alertRuleServiceImpl", AlertRuleServiceImpl.class); + } + + public static JobAlertHandler getInstance() { + if (defaultJobAlertHandler == null) { + synchronized (DefaultThreadPool.class) { + if (defaultJobAlertHandler == null) { + defaultJobAlertHandler = new JobAlertHandler(); + } + } + } + return defaultJobAlertHandler; + } + /** * Initializes the JobAlerts class by refreshing rules and setting up the scheduler. */ - @PostConstruct - public void init() { + public JobAlertHandler() { refreshRulesData(); - addSchedule(AlertRuleOptions.JOB_ALERT_SCHEDULE, this::check, new PeriodicTrigger(1000 * 30)); } /** * checks for alert conditions for each job in the task pool. 
*/ - protected void check() { - TenantContextHolder.set(1); - - for (Map.Entry job : taskPool.entrySet()) { - JobInfoDetail jobInfoDetail = job.getValue(); - String key = job.getKey(); - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_TIME, TimeUtil.nowStr()); - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_JOB_DETAIL, jobInfoDetail); - ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_JOB_NAME, - jobInfoDetail.getJobHistory().getJob()); - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_KEY, key); - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_JOB_INSTANCE, jobInfoDetail.getInstance()); + public void check(JobInfoDetail jobInfoDetail) { + ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_TIME, TimeUtil.nowStr()); + ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_JOB_DETAIL, jobInfoDetail); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_JOB_NAME, + jobInfoDetail.getJobDataDto().getJob()); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_KEY, jobInfoDetail.getInstance().getId()); + ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_JOB_INSTANCE, jobInfoDetail.getInstance()); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_START_TIME, + TimeUtil.convertTimeToString(jobInfoDetail.getInstance().getCreateTime())); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_END_TIME, + TimeUtil.convertTimeToString(jobInfoDetail.getInstance().getFinishTime())); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_CHECK_POINTS, + jobInfoDetail.getJobDataDto().getCheckpoints()); + ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_CLUSTER, jobInfoDetail.getCluster()); + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS, + jobInfoDetail.getJobDataDto().getExceptions()); + if (jobInfoDetail.getJobDataDto().isError()) { ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_START_TIME, - TimeUtil.convertTimeToString(jobInfoDetail.getHistory().getStartTime())); + AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG, + jobInfoDetail.getJobDataDto().getErrorMsg()); + } else { ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_END_TIME, - TimeUtil.convertTimeToString(jobInfoDetail.getHistory().getEndTime())); - ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_CHECK_POINTS, - jobInfoDetail.getJobHistory().getCheckpoints()); - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_CLUSTER, jobInfoDetail.getCluster()); - ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS, - jobInfoDetail.getJobHistory().getExceptions()); - rulesEngine.fire(rules, ruleFacts); + AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG, + jobInfoDetail + .getJobDataDto() + .getExceptions() + .get(FlinkRestResultConstant.ROOT_EXCEPTION) + .toString()); } + + rulesEngine.fire(rules, ruleFacts); } /** * Refreshes the alert rules and related data. 
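 * Reloads the rule definitions from the database and rebuilds the SpEL-based rules,
 * so that alert-rule changes saved through AlertRuleController take effect without a restart.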
*/ public void refreshRulesData() { - ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_REFRESH_RULES_DATA, new ExceptionRule()); + ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_EXCEPTION_CHECK, new ExceptionRule()); ruleFacts.put(AlertRuleOptions.JOB_ALERT_RULE_CHECKPOINT_RULES, new CheckpointsRule()); List ruleDTOS = alertRuleService.getBaseMapper().selectWithTemplate(); @@ -286,7 +281,7 @@ private void sendAlert( } @Data - public static class RuleItem { + private static class RuleItem { private String ruleKey; private String ruleOperator; // private int rulePriority; diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefeshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefeshHandler.java new file mode 100644 index 0000000000..824182a2f8 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefeshHandler.java @@ -0,0 +1,217 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job.handler; + +import org.dinky.api.FlinkAPI; +import org.dinky.assertion.Asserts; +import org.dinky.context.SpringContextUtils; +import org.dinky.data.constant.FlinkRestResultConstant; +import org.dinky.data.dto.JobDataDto; +import org.dinky.data.enums.JobStatus; +import org.dinky.data.model.ClusterConfiguration; +import org.dinky.data.model.JobInfoDetail; +import org.dinky.data.model.JobInstance; +import org.dinky.gateway.Gateway; +import org.dinky.gateway.config.GatewayConfig; +import org.dinky.gateway.enums.GatewayType; +import org.dinky.gateway.exception.NotSupportGetStatusException; +import org.dinky.gateway.model.FlinkClusterConfig; +import org.dinky.job.JobConfig; +import org.dinky.service.JobHistoryService; +import org.dinky.service.JobInstanceService; +import org.dinky.utils.TimeUtil; + +import java.time.Duration; +import java.time.LocalDateTime; + +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; + +import cn.hutool.json.JSONObject; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +@Component +@DependsOn("springContextUtils") +public class JobRefeshHandler { + + private static final JobInstanceService jobInstanceService; + private static final JobHistoryService jobHistoryService; + + static { + jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class); + jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class); + } + + /** + * Refresh the job + * It receives two parameters: {@link org.dinky.data.model.JobInfoDetail} and needSave and returns a Boolean value. 
+ * When the return value is true, the job has completed and needs to be removed from the thread pool,
+ * otherwise it means that the next round of refreshing continues
+ *
+ * @param jobInfoDetail job info detail.
+ * @param needSave Indicates if the job needs to be saved.
+ * @return True if the job is done, false otherwise.
+ */
+ public static boolean refeshJob(JobInfoDetail jobInfoDetail, boolean needSave) {
+ log.debug(
+ "Start to refresh job: {}->{}",
+ jobInfoDetail.getInstance().getId(),
+ jobInfoDetail.getInstance().getName());
+
+ JobInstance jobInstance = jobInfoDetail.getInstance();
+ String oldStatus = jobInstance.getStatus();
+
+ JobDataDto jobDataDto = getJobHistory(
+ jobInstance.getId(),
+ jobInfoDetail.getCluster().getJobManagerHost(),
+ jobInfoDetail.getInstance().getJid());
+
+ if (Asserts.isNull(jobDataDto.getJob()) || jobDataDto.isError()) {
+ // If fetching the job fails, default the finish time to the current time
+ jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
+ jobInstance.setFinishTime(LocalDateTime.now());
+ jobInstance.setError(jobDataDto.getErrorMsg());
+ jobInfoDetail.getJobDataDto().setError(true);
+ jobInfoDetail.getJobDataDto().setErrorMsg(jobDataDto.getErrorMsg());
+ } else {
+ jobInfoDetail.setJobDataDto(jobDataDto);
+ JsonNode job = jobDataDto.getJob();
+ Long startTime = job.get(FlinkRestResultConstant.JOB_CREATE_TIME).asLong();
+ Long endTime = job.get(FlinkRestResultConstant.JOB_FINISH_TIME).asLong();
+
+ jobInstance.setStatus(getJobStatus(jobInfoDetail).getValue());
+ jobInstance.setDuration(
+ job.get(FlinkRestResultConstant.JOB_DURATION).asLong());
+ jobInstance.setCreateTime(TimeUtil.longToLocalDateTime(startTime));
+ // if the job is still running, the end time is -1
+ jobInstance.setFinishTime(TimeUtil.longToLocalDateTime(endTime));
+ }
+ jobInstance.setUpdateTime(LocalDateTime.now());
+
+ // isDone is true if the job status is a terminal one,
+ // or if the status is UNKNOWN and has not been updated for at least one minute,
+ // in which case the update is discarded
+ boolean isDone = (JobStatus.isDone(jobInstance.getStatus()))
+ || (TimeUtil.localDateTimeToLong(jobInstance.getFinishTime()) > 0
+ && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now())
+ .toMinutes()
+ >= 1);
+
+ if (!oldStatus.equals(jobInstance.getStatus()) || isDone || needSave) {
+ log.debug("Dump JobInfo to database: {}->{}", jobInstance.getId(), jobInstance.getName());
+ jobInstanceService.updateById(jobInstance);
+ jobHistoryService.updateById(jobInfoDetail.getJobDataDto().toJobHistory());
+ }
+
+ if (isDone) {
+ log.debug("Job is done: {}->{}", jobInstance.getId(), jobInstance.getName());
+ handleJobDone(jobInfoDetail);
+ }
+ return isDone;
+ }
+
+ /**
+ * Retrieves job history, i.e. job status information, from the Flink REST API.
+ *
+ * @param id The job instance ID.
+ * @param jobManagerHost The job manager host.
+ * @param jobId The Flink job ID (JID).
+ * @return {@link org.dinky.data.dto.JobDataDto}.
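+ * On any REST failure, the DTO is returned with error=true and the failure message stored in errorMsg.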
+ */ + public static JobDataDto getJobHistory(Integer id, String jobManagerHost, String jobId) { + JobDataDto.JobDataDtoBuilder builder = JobDataDto.builder(); + FlinkAPI api = FlinkAPI.build(jobManagerHost); + try { + JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId); + if (jobInfo.has(FlinkRestResultConstant.ERRORS)) { + throw new Exception(String.valueOf(jobInfo.get(FlinkRestResultConstant.ERRORS))); + } + return builder.id(id) + .checkpoints(api.getCheckPoints(jobId)) + .checkpointsConfig(api.getCheckPointsConfig(jobId)) + .exceptions(api.getException(jobId)) + .job(jobInfo) + .config(api.getJobsConfig(jobId)) + .build(); + } catch (Exception e) { + log.error("Connect {} failed,{}", jobManagerHost, e.getMessage()); + return builder.id(id).error(true).errorMsg(e.getMessage()).build(); + } + } + + /** + * Gets the job status. + * + * @param jobInfoDetail The job info detail. + * @return The job status. + */ + private static JobStatus getJobStatus(JobInfoDetail jobInfoDetail) { + + ClusterConfiguration clusterCfg = jobInfoDetail.getClusterConfiguration(); + + if (!Asserts.isNull(clusterCfg)) { + try { + String appId = jobInfoDetail.getCluster().getName(); + + GatewayConfig gatewayConfig = GatewayConfig.build(clusterCfg.getFlinkClusterCfg()); + gatewayConfig.getClusterConfig().setAppId(appId); + gatewayConfig + .getFlinkConfig() + .setJobName(jobInfoDetail.getInstance().getName()); + + Gateway gateway = Gateway.build(gatewayConfig); + return gateway.getJobStatusById(appId); + } catch (NotSupportGetStatusException ignored) { + } + } + + JobDataDto jobDataDto = jobInfoDetail.getJobDataDto(); + String status = + jobDataDto.getJob().get(FlinkRestResultConstant.JOB_STATE).asText(); + return JobStatus.get(status); + } + + /** + * Handles job completion. + * + * @param jobInfoDetail The job info detail. 
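+ *
+ * For jobs launched through a deploy-cluster gateway (e.g. the flink-kubernetes-operator case),
+ * this rebuilds the gateway config and fires onJobFinishCallback so that external cluster
+ * resources can be released.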
+ */ + private static void handleJobDone(JobInfoDetail jobInfoDetail) { + JobInstance jobInstance = jobInfoDetail.getInstance(); + JobDataDto jobDataDto = jobInfoDetail.getJobDataDto(); + String clusterType = jobInfoDetail.getCluster().getType(); + + if (GatewayType.isDeployCluster(clusterType)) { + JobConfig jobConfig = new JobConfig(); + String configJson = + jobDataDto.getClusterConfiguration().get("configJson").asText(); + jobConfig.buildGatewayConfig(new JSONObject(configJson).toBean(FlinkClusterConfig.class)); + jobConfig.getGatewayConfig().setType(GatewayType.get(clusterType)); + jobConfig.getGatewayConfig().getFlinkConfig().setJobName(jobInstance.getName()); + Gateway.build(jobConfig.getGatewayConfig()).onJobFinishCallback(jobInstance.getStatus()); + } + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/JobHistoryService.java b/dinky-admin/src/main/java/org/dinky/service/JobHistoryService.java index 81cdfd4e81..bf568f7526 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobHistoryService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobHistoryService.java @@ -19,6 +19,7 @@ package org.dinky.service; +import org.dinky.data.dto.JobDataDto; import org.dinky.data.model.JobHistory; import org.dinky.mybatis.service.ISuperService; @@ -33,7 +34,5 @@ public interface JobHistoryService extends ISuperService { JobHistory getJobHistory(Integer id); - JobHistory getJobHistoryInfo(JobHistory jobHistory); - - JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave); + JobDataDto getJobHistoryDto(Integer id); } diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 977ae27e3f..f76e3700da 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -47,7 +47,7 @@ public interface JobInstanceService extends ISuperService { JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance); - JobInfoDetail refreshJobInfoDetailInfo(JobInstance jobInstance); + JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId); LineageResult getLineage(Integer id); diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index 777d882506..fea869d4a7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ b/dinky-admin/src/main/java/org/dinky/service/TaskService.java @@ -23,7 +23,6 @@ import org.dinky.data.enums.JobLifeCycle; import org.dinky.data.enums.JobStatus; import org.dinky.data.model.JobInfoDetail; -import org.dinky.data.model.JobInstance; import org.dinky.data.model.JobModelOverview; import org.dinky.data.model.JobTypeOverView; import org.dinky.data.model.Task; @@ -88,10 +87,6 @@ public interface TaskService extends ISuperService { boolean savepointTask(Integer taskId, String savePointType); - JobInstance refreshJobInstance(Integer id, boolean isCoercive); - - JobInfoDetail refreshJobInfoDetail(Integer id); - String getTaskAPIAddress(); Result rollbackTask(TaskRollbackVersionDTO dto); @@ -104,8 +99,6 @@ public interface TaskService extends ISuperService { Result uploadTaskJson(MultipartFile file) throws Exception; - void handleJobDone(JobInstance jobInstance); - Result> queryAllCatalogue(); Result> queryOnLineTaskByDoneStatus( diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobHistoryServiceImpl.java 
b/dinky-admin/src/main/java/org/dinky/service/impl/JobHistoryServiceImpl.java index 4bca3cd315..b85559669e 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobHistoryServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobHistoryServiceImpl.java @@ -19,22 +19,14 @@ package org.dinky.service.impl; -import org.dinky.api.FlinkAPI; -import org.dinky.assertion.Asserts; -import org.dinky.data.constant.FlinkRestResultConstant; +import org.dinky.data.dto.JobDataDto; import org.dinky.data.model.JobHistory; import org.dinky.mapper.JobHistoryMapper; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.service.JobHistoryService; -import org.dinky.utils.JSONUtil; - -import java.util.Objects; import org.springframework.stereotype.Service; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.extern.slf4j.Slf4j; /** @@ -53,79 +45,11 @@ public JobHistory getByIdWithoutTenant(Integer id) { @Override public JobHistory getJobHistory(Integer id) { - return getJobHistoryInfo(baseMapper.getByIdWithoutTenant(id)); - } - - @Override - public JobHistory getJobHistoryInfo(JobHistory jobHistory) { - if (Asserts.isNotNull(jobHistory)) { - if (Asserts.isNotNullString(jobHistory.getJobJson())) { - jobHistory.setJob(JSONUtil.parseObject(jobHistory.getJobJson())); - jobHistory.setJobJson(null); - } - if (Asserts.isNotNullString(jobHistory.getExceptionsJson())) { - jobHistory.setExceptions(JSONUtil.parseObject(jobHistory.getExceptionsJson())); - jobHistory.setExceptionsJson(null); - } - if (Asserts.isNotNullString(jobHistory.getCheckpointsJson())) { - jobHistory.setCheckpoints(JSONUtil.parseObject(jobHistory.getCheckpointsJson())); - jobHistory.setCheckpointsJson(null); - } - if (Asserts.isNotNullString(jobHistory.getCheckpointsConfigJson())) { - jobHistory.setCheckpointsConfig(JSONUtil.parseObject(jobHistory.getCheckpointsConfigJson())); - jobHistory.setCheckpointsConfigJson(null); - } - if (Asserts.isNotNullString(jobHistory.getConfigJson())) { - jobHistory.setConfig(JSONUtil.parseObject(jobHistory.getConfigJson())); - jobHistory.setConfigJson(null); - } - if (Asserts.isNotNullString(jobHistory.getJarJson())) { - jobHistory.setJar(JSONUtil.parseObject(jobHistory.getJarJson())); - jobHistory.setJarJson(null); - } - if (Asserts.isNotNullString(jobHistory.getClusterJson())) { - jobHistory.setCluster(JSONUtil.parseObject(jobHistory.getClusterJson())); - jobHistory.setClusterJson(null); - } - if (Asserts.isNotNullString(jobHistory.getClusterConfigurationJson())) { - jobHistory.setClusterConfiguration(JSONUtil.parseObject(jobHistory.getClusterConfigurationJson())); - jobHistory.setClusterConfigurationJson(null); - } - } - return jobHistory; + return baseMapper.getByIdWithoutTenant(id); } @Override - public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave) { - JobHistory jobHistory = new JobHistory(); - jobHistory.setId(id); - try { - JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId); - if (jobInfo.has(FlinkRestResultConstant.ERRORS)) { - throw new Exception(String.valueOf(jobInfo.get(FlinkRestResultConstant.ERRORS))); - } - JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId); - JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId); - JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId); - JsonNode jobsConfig = FlinkAPI.build(jobManagerHost).getJobsConfig(jobId); - 
jobHistory.setJob((ObjectNode) jobInfo); - jobHistory.setCheckpoints((ObjectNode) checkPoints); - jobHistory.setJobJson(JSONUtil.toJsonString(jobInfo)); - jobHistory.setExceptionsJson(JSONUtil.toJsonString(exception)); - jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints)); - jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig)); - jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig)); - if (needSave) { - updateById(jobHistory); - } - } catch (Exception e) { - final JobHistory dbHistory = getById(id); - if (Objects.nonNull(dbHistory)) { - jobHistory = dbHistory; - } - jobHistory.setError(true); - log.error("Connect {} failed,{}", jobManagerHost, e.getMessage()); - } - return jobHistory; + public JobDataDto getJobHistoryDto(Integer id) { + return JobDataDto.fromJobHistory(getJobHistory(id)); } } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 25377f5dac..ae1650b2e6 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -21,8 +21,13 @@ import org.dinky.assertion.Asserts; import org.dinky.context.TenantContextHolder; +import org.dinky.daemon.task.DaemonFactory; +import org.dinky.daemon.task.DaemonTaskConfig; +import org.dinky.data.dto.JobDataDto; import org.dinky.data.enums.JobStatus; import org.dinky.data.enums.Status; +import org.dinky.data.model.Cluster; +import org.dinky.data.model.ClusterConfiguration; import org.dinky.data.model.History; import org.dinky.data.model.JobInfoDetail; import org.dinky.data.model.JobInstance; @@ -31,7 +36,8 @@ import org.dinky.data.result.ProTableResult; import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; -import org.dinky.job.FlinkJobTaskPool; +import org.dinky.job.FlinkJobTask; +import org.dinky.job.handler.JobRefeshHandler; import org.dinky.mapper.JobInstanceMapper; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.mybatis.util.ProTableUtil; @@ -141,52 +147,33 @@ public JobInfoDetail getJobInfoDetail(Integer id) { @Override public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) { - Asserts.checkNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage()); - String key = jobInstance.getId().toString(); - FlinkJobTaskPool pool = FlinkJobTaskPool.INSTANCE; - if (pool.containsKey(key)) { - return pool.get(key); - } else { - JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId()); - jobInfoDetail.setInstance(jobInstance); - jobInfoDetail.setCluster(clusterInstanceService.getById(jobInstance.getClusterId())); - jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId())); - History history = historyService.getById(jobInstance.getHistoryId()); - history.setConfig(JSONUtil.parseObject(history.getConfigJson())); - jobInfoDetail.setHistory(history); - if (Asserts.isNotNull(history.getClusterConfigurationId())) { - jobInfoDetail.setClusterConfiguration( - clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); - } - return jobInfoDetail; - } - } + JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId()); - @Override - public JobInfoDetail refreshJobInfoDetailInfo(JobInstance jobInstance) { Asserts.checkNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage()); - JobInfoDetail jobInfoDetail; - FlinkJobTaskPool pool = 
FlinkJobTaskPool.INSTANCE; - String key = jobInstance.getId().toString(); - - jobInfoDetail = new JobInfoDetail(jobInstance.getId()); jobInfoDetail.setInstance(jobInstance); - jobInfoDetail.setCluster(clusterInstanceService.getById(jobInstance.getClusterId())); - jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId())); - History history = historyService.getById(jobInstance.getHistoryId()); - if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { - history.setConfig(JSONUtil.parseObject(history.getConfigJson())); - jobInfoDetail.setHistory(history); + Cluster cluster = clusterInstanceService.getById(jobInstance.getClusterId()); + jobInfoDetail.setCluster(cluster); - jobInfoDetail.setClusterConfiguration( - clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); - } - if (pool.containsKey(key)) { - pool.refresh(jobInfoDetail); - } else { - pool.put(key, jobInfoDetail); + History history = historyService.getById(jobInstance.getHistoryId()); + history.setConfig(JSONUtil.parseObject(history.getConfigJson())); + jobInfoDetail.setHistory(history); + if (Asserts.isNotNull(history.getClusterConfigurationId())) { + ClusterConfiguration clusterConfigById = + clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()); + jobInfoDetail.setClusterConfiguration(clusterConfigById); } + + JobDataDto jobDataDto = jobHistoryService.getJobHistoryDto(jobInstance.getId()); + jobInfoDetail.setJobDataDto(jobDataDto); + return jobInfoDetail; + } + + @Override + public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId) { + JobInfoDetail jobInfoDetail = getJobInfoDetail(jobInstanceId); + JobRefeshHandler.refeshJob(jobInfoDetail, true); + DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId)); return jobInfoDetail; } @@ -211,21 +198,6 @@ public ProTableResult listJobInstances(JsonNode para) { Map param = mapper.convertValue(para, Map.class); Page page = new Page<>(current, pageSize); List list = baseMapper.selectForProTable(page, queryWrapper, param); - FlinkJobTaskPool pool = FlinkJobTaskPool.INSTANCE; - for (JobInstance jobInstance : list) { - if (pool.containsKey(jobInstance.getId().toString())) { - jobInstance.setStatus( - pool.get(jobInstance.getId().toString()).getInstance().getStatus()); - jobInstance.setUpdateTime( - pool.get(jobInstance.getId().toString()).getInstance().getUpdateTime()); - jobInstance.setFinishTime( - pool.get(jobInstance.getId().toString()).getInstance().getFinishTime()); - jobInstance.setError( - pool.get(jobInstance.getId().toString()).getInstance().getError()); - jobInstance.setDuration( - pool.get(jobInstance.getId().toString()).getInstance().getDuration()); - } - } return ProTableResult.builder() .success(true) .data(list) diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 3f9aade623..32cba5e1ed 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -25,10 +25,8 @@ import org.dinky.config.Docker; import org.dinky.context.RowLevelPermissionsContext; import org.dinky.context.TenantContextHolder; -import org.dinky.daemon.task.DaemonFactory; -import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.constant.CommonConstant; -import org.dinky.data.constant.FlinkRestResultConstant; +import 
org.dinky.data.dto.JobDataDto; import org.dinky.data.dto.SqlDTO; import org.dinky.data.dto.TaskRollbackVersionDTO; import org.dinky.data.dto.TaskVersionConfigureDTO; @@ -43,9 +41,7 @@ import org.dinky.data.model.Cluster; import org.dinky.data.model.ClusterConfiguration; import org.dinky.data.model.DataBase; -import org.dinky.data.model.History; import org.dinky.data.model.Jar; -import org.dinky.data.model.JobHistory; import org.dinky.data.model.JobInfoDetail; import org.dinky.data.model.JobInstance; import org.dinky.data.model.JobModelOverview; @@ -72,8 +68,6 @@ import org.dinky.gateway.model.FlinkClusterConfig; import org.dinky.gateway.model.JobInfo; import org.dinky.gateway.result.SavePointResult; -import org.dinky.job.FlinkJobTask; -import org.dinky.job.FlinkJobTaskPool; import org.dinky.job.Job; import org.dinky.job.JobConfig; import org.dinky.job.JobManager; @@ -103,7 +97,6 @@ import org.dinky.service.UserService; import org.dinky.utils.DockerClientUtils; import org.dinky.utils.FragmentVariableUtils; -import org.dinky.utils.JSONUtil; import org.dinky.utils.UDFUtils; import org.apache.commons.collections4.CollectionUtils; @@ -113,7 +106,6 @@ import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -146,10 +138,11 @@ import cn.hutool.core.lang.tree.TreeNode; import cn.hutool.core.lang.tree.TreeUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; import lombok.RequiredArgsConstructor; -/** TaskServiceImpl */ +/** + * TaskServiceImpl + */ @Service @RequiredArgsConstructor public class TaskServiceImpl extends SuperServiceImpl implements TaskService { @@ -1013,112 +1006,6 @@ private void buildRowPermission() { } } - @Override - public JobInstance refreshJobInstance(Integer id, boolean isCoercive) { - JobInfoDetail jobInfoDetail; - FlinkJobTaskPool pool = FlinkJobTaskPool.INSTANCE; - String key = id.toString(); - - if (pool.containsKey(key)) { - jobInfoDetail = pool.get(key); - } else { - jobInfoDetail = new JobInfoDetail(id); - JobInstance jobInstance = jobInstanceService.getByIdWithoutTenant(id); - Asserts.checkNull(jobInstance, "the task instance not exist."); - TenantContextHolder.set(jobInstance.getTenantId()); - jobInfoDetail.setInstance(jobInstance); - Cluster cluster = clusterInstanceService.getById(jobInstance.getClusterId()); - jobInfoDetail.setCluster(cluster); - History history = historyService.getById(jobInstance.getHistoryId()); - history.setConfig(JSONUtil.parseObject(history.getConfigJson())); - if (Asserts.isNotNull(history.getClusterConfigurationId())) { - ClusterConfiguration clusterConfigById = - clusterCfgService.getClusterConfigById(history.getClusterConfigurationId()); - jobInfoDetail.setClusterConfiguration(clusterConfigById); - jobInfoDetail.getInstance().setType(history.getType()); - } - jobInfoDetail.setHistory(history); - jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id)); - pool.put(key, jobInfoDetail); - } - - if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) { - return jobInfoDetail.getInstance(); - } - - JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory( - id, - jobInfoDetail.getCluster().getJobManagerHost(), - jobInfoDetail.getInstance().getJid(), - jobInfoDetail.isNeedSave()); - JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); - jobInfoDetail.setJobHistory(jobHistory); - JobStatus checkStatus = null; - if 
(JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) - && (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) { - checkStatus = checkJobStatus(jobInfoDetail); - if (checkStatus.isDone()) { - jobInfoDetail.getInstance().setStatus(checkStatus.getValue()); - jobInstanceService.updateById(jobInfoDetail.getInstance()); - return jobInfoDetail.getInstance(); - } - } - - String status = jobInfoDetail.getInstance().getStatus(); - boolean jobStatusChanged = false; - if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) - || jobInfoDetail.getJobHistory().isError()) { - if (Asserts.isNotNull(checkStatus)) { - jobInfoDetail.getInstance().setStatus(checkStatus.getValue()); - } else { - jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue()); - } - } else { - jobInfoDetail - .getInstance() - .setDuration(jobInfoDetail - .getJobHistory() - .getJob() - .get(FlinkRestResultConstant.JOB_DURATION) - .asLong() - / 1000); - jobInfoDetail - .getInstance() - .setStatus(jobInfoDetail - .getJobHistory() - .getJob() - .get(FlinkRestResultConstant.JOB_STATE) - .asText()); - } - if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) - && !status.equals(jobInfoDetail.getInstance().getStatus())) { - jobStatusChanged = true; - jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now()); - } - if (isCoercive) { - DaemonFactory.addTask(DaemonTaskConfig.build( - FlinkJobTask.TYPE, jobInfoDetail.getInstance().getId())); - } - if (jobStatusChanged || jobInfoDetail.isNeedSave()) { - jobInstanceService.updateById(jobInfoDetail.getInstance()); - } - pool.refresh(jobInfoDetail); - return jobInfoDetail.getInstance(); - } - - private boolean inRefreshPlan(JobInstance jobInstance) { - return !JobStatus.isDone(jobInstance.getStatus()) - || (Asserts.isNotNull(jobInstance.getFinishTime()) - && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()) - .toMinutes() - < 1); - } - - @Override - public JobInfoDetail refreshJobInfoDetail(Integer id) { - return jobInstanceService.refreshJobInfoDetailInfo(refreshJobInstance(id, true)); - } - @Override public String getTaskAPIAddress() { return SystemConfiguration.getInstances().getDinkyAddr().getValue(); @@ -1336,44 +1223,6 @@ public String getTaskPathByTaskId(Integer taskId) { return path.toString(); } - @Override - public void handleJobDone(JobInstance jobInstance) { - if (Asserts.isNull(jobInstance.getTaskId()) || Asserts.isNull(jobInstance.getType())) { - return; - } - - Task updateTask = new Task(); - updateTask.setId(jobInstance.getTaskId()); - updateTask.setJobInstanceId(jobInstance.getId()); - - Integer jobInstanceId = jobInstance.getId(); - // 获取任务历史信息 - JobHistory jobHistory = jobHistoryService.getJobHistory(jobInstanceId); - // some job need do something on Done, example flink-kubernets-operator - if (GatewayType.isDeployCluster(jobInstance.getType())) { - JobConfig jobConfig = new JobConfig(); - String configJson = - jobHistory.getClusterConfiguration().get("configJson").asText(); - jobConfig.buildGatewayConfig(new JSONObject(configJson).toBean(FlinkClusterConfig.class)); - jobConfig.getGatewayConfig().setType(GatewayType.get(jobInstance.getType())); - jobConfig.getGatewayConfig().getFlinkConfig().setJobName(jobInstance.getName()); - Gateway.build(jobConfig.getGatewayConfig()).onJobFinishCallback(jobInstance.getStatus()); - } - - if (!JobLifeCycle.ONLINE.equalsValue(jobInstance.getStep())) { - updateById(updateTask); - return; - } - - ObjectNode jsonNodes = jobHistory.getJob(); - if (jsonNodes.has("errors")) { - return; - } - - 
updateTask.setStep(JobLifeCycle.RELEASE.getValue()); - updateById(updateTask); - } - @Override public Result> queryAllCatalogue() { final LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() @@ -1468,9 +1317,9 @@ public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) { private void findTheConditionSavePointToOnline( TaskOperatingResult taskOperatingResult, JobInstance jobInstanceByTaskId) { - final JobHistory jobHistory = jobHistoryService.getJobHistory(jobInstanceByTaskId.getId()); + final JobDataDto jobHistory = jobHistoryService.getJobHistoryDto(jobInstanceByTaskId.getId()); if (jobHistory != null) { - final ObjectNode jsonNodes = jobHistory.getCheckpoints(); + final JsonNode jsonNodes = jobHistory.getCheckpoints(); final ArrayNode history = jsonNodes.withArray("history"); if (!history.isEmpty()) { startGoingLiveTask(taskOperatingResult, findTheConditionSavePoint(history)); diff --git a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java index 50b76c6efe..c7942da8b6 100644 --- a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java +++ b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java @@ -28,7 +28,7 @@ public class ExceptionRule { - private final LoadingCache<String, Long> hisTime; + private final LoadingCache<Integer, Long> hisTime; public ExceptionRule() { hisTime = CacheBuilder.newBuilder() @@ -44,7 +44,7 @@ public ExceptionRule() { * @param exceptions The exceptions object containing relevant data. * @return True if the operation should be executed, false otherwise. */ - public Boolean isException(String key, ObjectNode exceptions) { + public Boolean isException(Integer key, ObjectNode exceptions) { // If the exception is the same as the previous one, it will not be reported again if (exceptions.get("timestamp") == null) { @@ -56,9 +56,10 @@ public Boolean isException(Integer key, ObjectNode exceptions) { return false; } hisTime.put(key, timestamp); - if (exceptions.has("root-exception") && exceptions.get("root-exception") != null) { - return true; + if (exceptions.has("root-exception")) { + return !exceptions.get("root-exception").isNull(); + } else { + return false; + } - return true; } }
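The ExceptionRule hunk above switches the history cache key from String to Integer (the job instance id) and now only fires when a root exception is actually present rather than merely a key. A standalone sketch of the same dedup idea, using a plain Guava Cache instead of the LoadingCache in the patch; the class name, method name, and 60-minute expiry are illustrative assumptions, since the diff elides the builder configuration:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Illustrative sketch, not the patch's class: remember the timestamp of the
// last alerted root exception per job instance id and suppress repeats.
public class ExceptionDedupSketch {

    // Assumed expiry window; the real builder configuration is elided in the diff.
    private final Cache<Integer, Long> lastSeen =
            CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).build();

    public boolean shouldAlert(Integer jobInstanceId, Long exceptionTimestamp) {
        Long previous = lastSeen.getIfPresent(jobInstanceId);
        if (previous != null && previous.equals(exceptionTimestamp)) {
            return false; // same exception timestamp as last poll: stay quiet
        }
        lastSeen.put(jobInstanceId, exceptionTimestamp);
        return true;
    }
}
```

Keying by Integer id rather than a formatted String avoids building throwaway keys on every poll cycle and matches how the rest of the patch identifies job instances.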
"daemon.task.not.support"), + /** * system config */ diff --git a/dinky-common/src/main/java/org/dinky/utils/JSONUtil.java b/dinky-common/src/main/java/org/dinky/utils/JSONUtil.java index 108288b57a..f04fbc944c 100644 --- a/dinky-common/src/main/java/org/dinky/utils/JSONUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/JSONUtil.java @@ -172,11 +172,15 @@ public static byte[] toJsonByteArray(T obj) { } public static ObjectNode parseObject(String text) { + return (ObjectNode) parseToJsonNode(text); + } + + public static JsonNode parseToJsonNode(String text) { try { - if (text.isEmpty()) { - return parseObject(text, ObjectNode.class); + if (TextUtil.isEmpty(text)) { + return parseObject(text, JsonNode.class); } else { - return (ObjectNode) objectMapper.readTree(text); + return objectMapper.readTree(text); } } catch (Exception e) { throw new RuntimeException("String json deserialization exception.", e); diff --git a/dinky-common/src/main/java/org/dinky/utils/TimeUtil.java b/dinky-common/src/main/java/org/dinky/utils/TimeUtil.java index a7c728d2a8..e8eebc0f54 100644 --- a/dinky-common/src/main/java/org/dinky/utils/TimeUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/TimeUtil.java @@ -69,7 +69,7 @@ public static String convertTimeToString(LocalDateTime time) { /** * Converts a Long timestamp to a String based on the provided format. * - * @param time The Long timestamp to convert. + * @param time The Long timestamp to convert. * @param formate The desired date and time format. * @return A formatted String representation of the timestamp. */ @@ -93,6 +93,10 @@ public static Long convertTimeToLong(String time) { .toEpochMilli(); } + public static Long localDateTimeToLong(LocalDateTime time) { + return time.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + /** * Converts a LocalDateTime object to a formatted String in "yyyy-MM-dd HH:mm:ss" format. * @@ -107,7 +111,7 @@ public static String convertDateToString(LocalDateTime localDateTime) { * Converts a LocalDateTime object to a formatted String based on the provided format. * * @param localDateTime The LocalDateTime object to convert. - * @param formate The desired date and time format. + * @param formate The desired date and time format. * @return A formatted String representation of the LocalDateTime. */ public static String convertDateToString(LocalDateTime localDateTime, String formate) { @@ -125,10 +129,20 @@ public static LocalDateTime convertStringToDate(String time) { return convertStringToDate(time, "yyyy-MM-dd HH:mm:ss"); } + /** + * Converts a date and time String to a LocalDateTime object. + * + * @return A LocalDateTime object representing the parsed date and time. + */ + public static LocalDateTime longToLocalDateTime(Long timestamp) { + Instant instant = Instant.ofEpochMilli(timestamp); + return instant.atZone(ZoneId.systemDefault()).toLocalDateTime(); + } + /** * Converts a date and time String to a LocalDateTime object based on the provided format. * - * @param time The date and time String to convert. + * @param time The date and time String to convert. * @param formate The desired date and time format. * @return A LocalDateTime object representing the parsed date and time. 
diff --git a/dinky-common/src/main/resources/i18n/messages_en_US.properties b/dinky-common/src/main/resources/i18n/messages_en_US.properties index f51441f8bb..5c80d72d5a 100644 --- a/dinky-common/src/main/resources/i18n/messages_en_US.properties +++ b/dinky-common/src/main/resources/i18n/messages_en_US.properties @@ -147,6 +147,9 @@ alert.rule.checkpointFail=Checkpoint Failure alert.rule.jobRunException=Job Run Exception alert.rule.checkpointTimeout=Checkpoint Timeout +daemon.task.config.not.exist=the thread task configuration cannot be empty +daemon.task.not.support=threaded task types are not supported: + # system config sys.flink.settings.useRestAPI=Use Rest API diff --git a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties index 4850d96cbe..df6c6cc546 100644 --- a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties +++ b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties @@ -137,7 +137,8 @@ file.upload.failed=文件上传失败, 原因: {0} hdfs.init.failed=内部hdfs信息错误, 原因: {0} hdfs.file.lose= 在工程根目录下没有找到core-site.xml/hdfs-site.xml/yarn-site.xml文件, 请先上传这些文件 - +daemon.task.config.not.exist=线程任务配置不能为空 +daemon.task.not.support=不支持线程任务类型: # dinky-alert alert.rule.jobFail=作业失败 diff --git a/dinky-core/src/main/java/org/dinky/data/constant/FlinkRestResultConstant.java b/dinky-core/src/main/java/org/dinky/data/constant/FlinkRestResultConstant.java index c3fc48b90e..79f34f31cb 100644 --- a/dinky-core/src/main/java/org/dinky/data/constant/FlinkRestResultConstant.java +++ b/dinky-core/src/main/java/org/dinky/data/constant/FlinkRestResultConstant.java @@ -27,6 +27,9 @@ public final class FlinkRestResultConstant { public static final String ERRORS = "errors"; + public static final String ROOT_EXCEPTION = "root-exception"; public static final String JOB_DURATION = "duration"; + public static final String JOB_CREATE_TIME = "start-time"; + public static final String JOB_FINISH_TIME = "end-time"; public static final String JOB_STATE = "state"; } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java b/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java index 74d5679702..67fbbc4b0c 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java @@ -22,7 +22,7 @@ public interface FlinkTaskConstant { /** 检测停顿时间 */ - int TIME_SLEEP = 1000; + int TIME_SLEEP = 1000 * 5; /** 启动线程轮询日志时间,用于设置work等信息 */ int MAX_POLLING_GAP = 1000; diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java index e83c397242..7391b97985 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java @@ -21,6 +21,11 @@ import java.util.LinkedList; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Slf4j public class TaskQueue<T> { private final LinkedList<T> tasks = new LinkedList<>(); @@ -30,6 +35,8 @@ public class TaskQueue<T> { public void enqueue(T task) { synchronized (lock) { lock.notifyAll(); + // prevent duplicate additions + tasks.remove(task); tasks.addLast(task); } } @@ -40,12 +47,10 @@ public T dequeue() { try { lock.wait(); } catch (InterruptedException e) { - e.printStackTrace(); + log.error(e.getMessage(), e); } } - - T task = tasks.removeFirst(); - return task; + return tasks.removeFirst(); } }
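The enqueue change above (remove before addLast) makes re-registering a task idempotent, so a job that is refreshed manually while already tracked by the daemon lands in the queue only once. A stripped-down sketch of the same wait/notify queue; the class name is illustrative, and notifyAll is moved after the add, which is equivalent because both statements run inside the same synchronized block:

```java
import java.util.LinkedList;

// Minimal blocking queue with de-duplication on enqueue, mirroring the
// TaskQueue pattern in the hunk above.
public class DedupQueueSketch<T> {

    private final LinkedList<T> tasks = new LinkedList<>();
    private final Object lock = new Object();

    public void enqueue(T task) {
        synchronized (lock) {
            tasks.remove(task);   // drop any existing copy (relies on equals())
            tasks.addLast(task);
            lock.notifyAll();     // wake any consumer blocked in dequeue()
        }
    }

    public T dequeue() throws InterruptedException {
        synchronized (lock) {
            while (tasks.isEmpty()) {
                lock.wait();      // releases the lock until a producer notifies
            }
            return tasks.removeFirst();
        }
    }
}
```

Note that LinkedList.remove(Object) matches by equals(), so the de-duplication only takes effect if the queued task type defines equality by the job it represents rather than by object identity.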
diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java index 225aa79893..8713dccd82 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java @@ -30,29 +30,42 @@ public class TaskWorker implements Runnable { private volatile boolean running = true; - private TaskQueue queue; + private final TaskQueue<DaemonTask> queue; - public TaskWorker(TaskQueue queue) { + public TaskWorker(TaskQueue<DaemonTask> queue) { this.queue = queue; } + /** + * Perform tasks. + * + * <p>This method is used to perform tasks. Continuously fetch tasks from the queue + * while the worker is running (call the queue.dequeue() method). + * + * <p>If a task is fetched, try to process the task (call the daemonTask.dealTask() method). + * + * <p>If the processing does not complete (dealTask() returns false), + * the task is put back into the queue again (call the queue.enqueue(daemonTask) method).
+ */ @Override public void run() { - // log.info("TaskWorker run"); + log.debug("TaskWorker run:" + Thread.currentThread().getName()); while (running) { DaemonTask daemonTask = queue.dequeue(); if (daemonTask != null) { try { - daemonTask.dealTask(); + boolean done = daemonTask.dealTask(); + if (!done) { + queue.enqueue(daemonTask); + } } catch (Exception e) { - e.printStackTrace(); + log.error(e.getMessage(), e); } } } } public void shutdown() { - // log.info(Thread.currentThread().getName() + "TaskWorker shutdown"); + log.debug(Thread.currentThread().getName() + "TaskWorker shutdown"); running = false; } } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java index 061bef7c8c..dbc88f12b2 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java @@ -24,25 +24,55 @@ import java.util.List; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class DaemonFactory { + /** + * Start the daemon thread. + * + * <p>This method accepts a List<{@link org.dinky.daemon.task.DaemonTaskConfig}> parameter to configure daemon tasks. + * Inside the method, it creates a thread and performs the following operations: + * + * <ol> + * <li>Iterate through each configuration item in configList and construct the corresponding DaemonTask.</li> + * <li>Submit each DaemonTask to the thread pool for execution.</li> + * <li>Enter an infinite loop where the following actions are performed: + * <ul> + * <li>Calculate the waiting time based on the task count, ensuring that the polling interval between tasks stays within the specified minimum and maximum intervals.</li> + * <li>Calculate the desired number of working threads, num, increasing one working thread for every 100 tasks.</li> + * <li>Dynamically adjust the number of working threads in the thread pool based on a comparison between the actual number of working threads and the desired quantity.</li> + * </ul> + * </li> + * </ol>
+ */ public static void start(List<DaemonTaskConfig> configList) { Thread thread = new Thread(() -> { DefaultThreadPool defaultThreadPool = DefaultThreadPool.getInstance(); for (DaemonTaskConfig config : configList) { + // Build a daemon task based on the config and submit the task to the thread pool for execution DaemonTask daemonTask = DaemonTask.build(config); defaultThreadPool.execute(daemonTask); } + while (true) { int taskSize = defaultThreadPool.getTaskSize(); try { + // (taskSize + 1) avoids dividing by 0 when taskSize is 0. Thread.sleep(Math.max( FlinkTaskConstant.MAX_POLLING_GAP / (taskSize + 1), FlinkTaskConstant.MIN_POLLING_GAP)); } catch (InterruptedException e) { - e.printStackTrace(); + log.error(e.getMessage(), e); } + // Calculate the desired number of worker threads, adding one worker for every 100 tasks int num = taskSize / 100 + 1; + + // Dynamically adjust the number of worker threads in the thread pool if (defaultThreadPool.getWorkCount() < num) { defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount()); } else if (defaultThreadPool.getWorkCount() > num) { @@ -53,6 +83,10 @@ public static void start(List<DaemonTaskConfig> configList) { thread.start(); } + /** + * Add a task. + * + * @param config the daemon task configuration + */ public static void addTask(DaemonTaskConfig config) { DefaultThreadPool.getInstance().execute(DaemonTask.build(config)); } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java index 39e6917139..4ea7522ea2 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java @@ -21,6 +21,7 @@ import org.dinky.assertion.Asserts; import org.dinky.daemon.exception.DaemonTaskException; +import org.dinky.data.enums.Status; import java.util.Optional; import java.util.ServiceLoader; @@ -28,7 +29,7 @@ public interface DaemonTask { static Optional<DaemonTask> get(DaemonTaskConfig config) { - Asserts.checkNotNull(config, "线程任务配置不能为空"); + Asserts.checkNotNull(config, Status.DAEMON_TASK_CONFIG_NOT_EXIST.getMessage()); ServiceLoader<DaemonTask> daemonTasks = ServiceLoader.load(DaemonTask.class); for (DaemonTask daemonTask : daemonTasks) { if (daemonTask.canHandle(config.getType())) { @@ -41,10 +42,9 @@ static Optional<DaemonTask> get(DaemonTaskConfig config) { Optional<DaemonTask> optionalDaemonTask = DaemonTask.get(config); if (!optionalDaemonTask.isPresent()) { - throw new DaemonTaskException("不支持线程任务类型【" + config.getType() + "】"); + throw new DaemonTaskException(Status.DAEMON_TASK_NOT_SUPPORT.getMessage() + config.getType()); } - DaemonTask daemonTask = optionalDaemonTask.get(); - return daemonTask; + return optionalDaemonTask.get(); } DaemonTask setConfig(DaemonTaskConfig config); @@ -55,5 +55,5 @@ default boolean canHandle(String type) { String getType(); - void dealTask(); + boolean dealTask(); }
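The DaemonTask change above is the behavioral core of the patch: dealTask() now returns a boolean, and TaskWorker re-enqueues any task that reports it is not finished, so polling falls out of the queue loop instead of the old in-memory pool. A hypothetical implementation honoring that contract; everything named Example here is illustrative and not part of the patch:

```java
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;

// Hypothetical daemon task showing the new boolean contract of dealTask():
// return true when the watched job reaches a terminal state (the worker then
// drops the task), return false to be re-enqueued and polled again.
public class ExampleWatchTask implements DaemonTask {

    public static final String TYPE = "example_watch";

    private DaemonTaskConfig config;

    @Override
    public DaemonTask setConfig(DaemonTaskConfig config) {
        this.config = config;
        return this;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public boolean dealTask() {
        // Illustrative: a real task would refresh the job identified by
        // config.getId() and report whether it is done.
        return pollJobDone(config.getId());
    }

    private boolean pollJobDone(Integer jobInstanceId) {
        return false; // placeholder for a real status check against the cluster
    }
}
```

Because DaemonTask.get() discovers implementations through ServiceLoader, a class like this would also need a provider entry in META-INF/services/org.dinky.daemon.task.DaemonTask; DaemonFactory.addTask(DaemonTaskConfig.build(ExampleWatchTask.TYPE, jobInstanceId)) would then schedule it, just as refreshJobInfoDetail does for FlinkJobTask.TYPE.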
diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java index 9f4daa5c2c..89a62da69d 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java @@ -19,12 +19,13 @@ package org.dinky.daemon.task; -public class DaemonTaskConfig { +import lombok.Getter; - private String type; - private Integer id; +@Getter +public class DaemonTaskConfig { - public DaemonTaskConfig() {} + private final String type; + private final Integer id; public DaemonTaskConfig(String type, Integer id) { this.type = type; @@ -34,20 +35,4 @@ public DaemonTaskConfig(String type, Integer id) { public static DaemonTaskConfig build(String type, Integer id) { return new DaemonTaskConfig(type, id); } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } } diff --git a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTaskPool.java b/dinky-gateway/src/main/java/org/dinky/gateway/exception/NotSupportGetStatusException.java similarity index 67% rename from dinky-admin/src/main/java/org/dinky/job/FlinkJobTaskPool.java rename to dinky-gateway/src/main/java/org/dinky/gateway/exception/NotSupportGetStatusException.java index 98d376d072..fbc0b5d41b 100644 --- a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTaskPool.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/exception/NotSupportGetStatusException.java @@ -17,22 +17,15 @@ * */ -package org.dinky.job; +package org.dinky.gateway.exception; -import org.dinky.data.model.JobInfoDetail; -import org.dinky.pool.AbstractPool; +public class NotSupportGetStatusException extends RuntimeException { -/** - * FlinkJobTaskPool - * - * @since 2022/5/28 16:39 - */ -public class FlinkJobTaskPool extends AbstractPool<JobInfoDetail> { - - public static final FlinkJobTaskPool INSTANCE = new FlinkJobTaskPool(); + public NotSupportGetStatusException(String message, Throwable cause) { + super(message, cause); + } - @Override - public void refresh(JobInfoDetail entity) { - entity.refresh(); + public NotSupportGetStatusException(String message) { + super(message); } } diff --git a/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CheckpointTable.tsx b/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CheckpointTable.tsx index b8e1f215fc..fca3a332b2 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CheckpointTable.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CheckpointTable.tsx @@ -49,7 +49,7 @@ const CheckpointTable = (props: JobProps) => { const actionRef = useRef(); - const checkpoints = jobDetail?.jobHistory?.checkpoints; + const checkpoints = jobDetail?.jobDataDto?.checkpoints; // const restartFromCheckpoint = useRequest((id: number, isOnLine: boolean, savePointPath: string) => ( // { @@ -178,7 +178,7 @@ const CheckpointTable = (props: JobProps) => { columns={columns} style={{ width: '100%' }} dataSource={checkpoints?.history} - onDataSourceChange={(_) => actionRef.current?.reload()} + onDataSourceChange={() => actionRef.current?.reload()} actionRef={actionRef} rowKey='id' pagination={{ diff --git a/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CkDesc.tsx b/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CkDesc.tsx index 5701a7a1c6..8a64fce621 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CkDesc.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/CheckPointsTab/components/CkDesc.tsx @@ -30,9 +30,9 @@ const CkDesc = (props: JobProps) => { const { jobDetail } = props; - let counts = jobDetail?.jobHistory?.checkpoints.counts; - let latest = jobDetail?.jobHistory?.checkpoints.latest; - let checkpointsConfigInfo = jobDetail?.jobHistory?.checkpointsConfig; + let counts = jobDetail?.jobDataDto?.checkpoints.counts; + let latest = jobDetail?.jobDataDto?.checkpoints.latest; + let checkpointsConfigInfo =
jobDetail?.jobDataDto?.checkpointsConfig; return ( <> diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobLogs/components/ExceptionTab.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobLogs/components/ExceptionTab.tsx index b9af914e4c..9a07176dcf 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobLogs/components/ExceptionTab.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobLogs/components/ExceptionTab.tsx @@ -46,13 +46,13 @@ const ExceptionTab = (props: JobProps) => { const renderLogTab = () => { let logs = []; - const rte = jobDetail?.jobHistory?.exceptions['root-exception']; + const rte = jobDetail?.jobDataDto?.exceptions['root-exception']; logs.push({ taskName: 'RootException', stacktrace: rte, exceptionName: rte }); - logs.push(...jobDetail.jobHistory?.exceptions['exceptionHistory']['entries']); + logs.push(...jobDetail.jobDataDto?.exceptions['exceptionHistory']['entries']); return ( diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigForm.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigForm.tsx index c0e262aa5f..2f64fc9ae1 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigForm.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigForm.tsx @@ -61,7 +61,7 @@ const MetricsConfigForm = (props: any) => { }); }; - const itemTabs = jobDetail?.jobHistory?.job?.vertices?.map((item: any) => { + const itemTabs = jobDetail?.jobDataDto?.job?.vertices?.map((item: any) => { return { key: item.id, label: item.name, diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigTab.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigTab.tsx index 25803da3f9..e640672e49 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigTab.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobMetrics/MetricsFilter/MetricsConfigTab.tsx @@ -27,7 +27,7 @@ import { Transfer } from 'antd'; const MetricsConfigTab = (props: any) => { const { vertice, onValueChange, jobDetail, metricsTarget } = props; const jobManagerUrl = jobDetail.cluster?.jobManagerHost; - const jobId = jobDetail.jobHistory.job?.jid; + const jobId = jobDetail.jobDataDto.job?.jid; const { data } = useRequest({ url: API_CONSTANTS.GET_JOB_MERTICE_ITEMS, diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/FlinkTable.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/FlinkTable.tsx index cf2919f815..1cb97587a7 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/FlinkTable.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/FlinkTable.tsx @@ -104,7 +104,7 @@ const FlinkTable = (props: JobProps): JSX.Element => { { - {jobDetail?.jobHistory?.config?.['execution-config']?.['restart-strategy']} + {jobDetail?.jobDataDto?.config?.['execution-config']?.['restart-strategy']} @@ -116,7 +116,7 @@ const JobDesc = (props: JobProps) => { - {jobDetail?.jobHistory?.config?.['execution-config']?.['job-parallelism']} + {jobDetail?.jobDataDto?.config?.['execution-config']?.['job-parallelism']} diff --git a/dinky-web/src/types/DevOps/data.d.ts b/dinky-web/src/types/DevOps/data.d.ts index 901ad9af2f..28f7581c73 100644 --- a/dinky-web/src/types/DevOps/data.d.ts +++ b/dinky-web/src/types/DevOps/data.d.ts @@ -97,7 +97,7 @@ declare namespace Jobs { 'status-counts': {}; plan: {}; }; - export type JobHistoryItem = { + export type JobDataDtoItem = { id: number; job: Job; exceptions: any; @@ 
-116,7 +116,7 @@ declare namespace Jobs { cluster: any; clusterConfiguration: any; history: History; - jobHistory: JobHistoryItem; + jobDataDto: JobDataDtoItem; jobManagerConfiguration: any; taskManagerConfiguration: any; }; diff --git a/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql b/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql index 135defb61c..a4331f61f5 100644 --- a/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql +++ b/script/sql/upgrade/1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql @@ -169,7 +169,7 @@ INSERT INTO dinky_alert_template (id, name, template_content, enabled, create_ti - **Alert Time :** ${time} - **Start Time :** ${startTime} - **End Time :** ${endTime} -- **${exceptions.get("root-exception").toString()?substring(0,20)}** +- **<#if exceptions_msg?length gt 100>${exceptions_msg?substring(0,100)}<#else>${exceptions_msg}</#if>** [Go toTask Web](http://${taskUrl}) ', 1, null, null);