From 48c8f991f6c066b04aab30fc801cbb31662f807b Mon Sep 17 00:00:00 2001 From: zixi0825 Date: Fri, 3 May 2024 21:22:59 +0800 Subject: [PATCH] [Fix][Server] Fix the errot when execute job through restapi --- .../datavines/runner/JobExecuteBootstrap.java | 2 +- .../operator/JobResultValidator.java | 103 ++++++++++-------- 2 files changed, 61 insertions(+), 44 deletions(-) diff --git a/datavines-runner/src/main/java/io/datavines/runner/JobExecuteBootstrap.java b/datavines-runner/src/main/java/io/datavines/runner/JobExecuteBootstrap.java index 732184936..a143d8e4c 100644 --- a/datavines-runner/src/main/java/io/datavines/runner/JobExecuteBootstrap.java +++ b/datavines-runner/src/main/java/io/datavines/runner/JobExecuteBootstrap.java @@ -67,7 +67,7 @@ public static void main(String[] args) { JobExecutionRequest jobExecutionRequest = new JobExecutionRequest(); jobExecutionRequest.setJobExecutionName(submitJob.getName()); jobExecutionRequest.setJobExecutionId(id); - jobExecutionRequest.setJobExecutionUniqueId(submitJob.getName()+"_"+ id); + jobExecutionRequest.setJobExecutionUniqueId(submitJob.getName() + "_" + id); jobExecutionRequest.setApplicationParameter(JSONUtils.toJsonString(qualityConfig)); jobExecutionRequest.setEngineType(submitJob.getEngineType()); jobExecutionRequest.setEngineParameter(JSONUtils.toJsonString(submitJob.getEngineParameter())); diff --git a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/operator/JobResultValidator.java b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/operator/JobResultValidator.java index 30f12a25b..72f5f3592 100644 --- a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/operator/JobResultValidator.java +++ b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/operator/JobResultValidator.java @@ -17,10 +17,10 @@ package io.datavines.server.dqc.coordinator.operator; import io.datavines.common.entity.JobExecutionRequest; -import io.datavines.common.enums.ExecutionStatus; import io.datavines.common.enums.OperatorType; import io.datavines.common.utils.JSONUtils; import io.datavines.common.utils.ParameterUtils; +import io.datavines.common.utils.StringUtils; import io.datavines.core.utils.LanguageUtils; import io.datavines.metric.api.*; import io.datavines.notification.api.entity.SlaConfigMessage; @@ -73,25 +73,29 @@ public void operateDqExecuteResult(JobExecutionRequest jobExecutionRequest) { List jobExecutionResultList = jobExternalService.listJobExecutionResultByJobExecutionId(jobExecutionRequest.getJobExecutionId()); - if (CollectionUtils.isNotEmpty(jobExecutionResultList)) { - boolean isSuccess = true; - Long jobExecutionId = 0L; - for (JobExecutionResult jobExecutionResult : jobExecutionResultList) { - if (jobExecutionResult != null) { - jobExecutionId = jobExecutionResult.getJobExecutionId(); - // 判断期望值是否为空,如果为空并且不是固定值类型,则将期望值设置为实际值 - if (jobExecutionResult.getExpectedValue() == null && !FIX_VALUE.equalsIgnoreCase(jobExecutionResult.getExpectedType())) { - jobExecutionResult.setExpectedValue(jobExecutionResult.getActualValue()); - jobExternalService.getJobExecutionResultService().updateById(jobExecutionResult); - } - //check the result ,if result is failure do some operator by failure strategy - isSuccess &= checkDqExecuteResult(jobExecutionResult); - } + if (CollectionUtils.isEmpty(jobExecutionResultList)) { + return; + } + + boolean isSuccess = true; + Long jobExecutionId = 0L; + for (JobExecutionResult jobExecutionResult : jobExecutionResultList) { + if (jobExecutionResult == null) { + continue; } - if (!isSuccess) { - sendErrorEmail(jobExecutionId); + jobExecutionId = jobExecutionResult.getJobExecutionId(); + // 判断期望值是否为空,如果为空并且不是固定值类型,则将期望值设置为实际值 + if (jobExecutionResult.getExpectedValue() == null && !FIX_VALUE.equalsIgnoreCase(jobExecutionResult.getExpectedType())) { + jobExecutionResult.setExpectedValue(jobExecutionResult.getActualValue()); + jobExternalService.getJobExecutionResultService().updateById(jobExecutionResult); } + //check the result ,if result is failure do some operator by failure strategy + isSuccess &= checkDqExecuteResult(jobExecutionResult); + } + + if (!isSuccess) { + sendErrorEmail(jobExecutionId); } } @@ -120,34 +124,47 @@ private void sendErrorEmail(Long jobExecutionId) { SlaNotificationMessage message = new SlaNotificationMessage(); JobExecution jobExecution = jobExternalService.getJobExecutionById(jobExecutionId); Long jobId = jobExecution.getJobId(); - JobService jobService = jobExternalService.getJobService(); - Job job = jobService.getById(jobId); - String jobName = job.getName(); - Long dataSourceId = job.getDataSourceId(); - DataSource dataSource = jobExternalService.getDataSourceService().getDataSourceById(dataSourceId); - String dataSourceName = dataSource.getName(); - String dataSourceType = dataSource.getType(); + String jobName; + String dataSourceName = null; + String dataSourceType = null; + if (jobId == -1L) { + jobName = jobExecution.getName(); + } else { + JobService jobService = jobExternalService.getJobService(); + Job job = jobService.getById(jobId); + jobName = job.getName(); + Long dataSourceId = job.getDataSourceId(); + DataSource dataSource = jobExternalService.getDataSourceService().getDataSourceById(dataSourceId); + dataSourceName = dataSource.getName(); + dataSourceType = dataSource.getType(); + } + List errorJobExecutionResultList = jobExternalService.listErrorJobExecutionResultByJobExecutionId(jobExecution.getId()); + if (CollectionUtils.isEmpty(errorJobExecutionResultList)) { + return; + } + boolean isEn = !LanguageUtils.isZhContext(); - if (CollectionUtils.isNotEmpty(errorJobExecutionResultList)) { - for (JobExecutionResult errorJobExecutionResult : errorJobExecutionResultList) { - MetricExecutionResult metricExecutionResult = new MetricExecutionResult(); - BeanUtils.copyProperties(errorJobExecutionResult, metricExecutionResult); - List messages = new ArrayList<>(); - messages.add((isEn ? "Job Name : ": "作业名称: ") + jobName); + for (JobExecutionResult errorJobExecutionResult : errorJobExecutionResultList) { + MetricExecutionResult metricExecutionResult = new MetricExecutionResult(); + BeanUtils.copyProperties(errorJobExecutionResult, metricExecutionResult); + List messages = new ArrayList<>(); + messages.add((isEn ? "Job Name : ": "作业名称: ") + jobName); + if (StringUtils.isNotEmpty(dataSourceType)) { messages.add(String.format((isEn ? "Datasource : %s [%s] : ": "数据源 : %s [%s]: ") ,dataSourceType.toUpperCase(), dataSourceName)); - String title = buildAlertSubject(metricExecutionResult, isEn); - String content = buildAlertMessage(messages, metricExecutionResult, jobExecution.getEngineType(), isEn); - message.setSubject(title); - message.setMessage(content); - saveIssue(jobId, title, content); - Map> config = slaNotificationService.getSlasNotificationConfigurationByJobId(jobId); - if (config.isEmpty()){ - return; - } - notificationClient.notify(message, config); } + String title = buildAlertSubject(metricExecutionResult, isEn); + String content = buildAlertMessage(messages, metricExecutionResult, jobExecution.getEngineType(), isEn); + message.setSubject(title); + message.setMessage(content); + saveIssue(jobId, title, content); + Map> config = slaNotificationService.getSlasNotificationConfigurationByJobId(jobId); + if (config.isEmpty()){ + return; + } + notificationClient.notify(message, config); } + } catch (Exception e) { log.error("send email error: ", e); } @@ -155,9 +172,9 @@ private void sendErrorEmail(Long jobExecutionId) { private String buildAlertMessage(List messages, MetricExecutionResult metricExecutionResult, String engineType, boolean isEn) { Map parameters = new HashMap<>(); - parameters.put("actual_value", metricExecutionResult.getActualValue()+""); - parameters.put("expected_value", metricExecutionResult.getExpectedValue()+""); - parameters.put("threshold", metricExecutionResult.getThreshold()+""); + parameters.put("actual_value", String.valueOf(metricExecutionResult.getActualValue())); + parameters.put("expected_value", String.valueOf(metricExecutionResult.getExpectedValue())); + parameters.put("threshold", String.valueOf(metricExecutionResult.getThreshold())); parameters.put("operator",OperatorType.of(metricExecutionResult.getOperator()).getSymbol()); SqlMetric sqlMetric = PluginLoader.getPluginLoader(SqlMetric.class).getOrCreatePlugin(metricExecutionResult.getMetricName());