Skip to content

Commit

Permalink
[Fix][Server] Fix the error when execute job through restapi (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored May 4, 2024
1 parent 70557a1 commit 7439f40
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) {
JobExecutionRequest jobExecutionRequest = new JobExecutionRequest();
jobExecutionRequest.setJobExecutionName(submitJob.getName());
jobExecutionRequest.setJobExecutionId(id);
jobExecutionRequest.setJobExecutionUniqueId(submitJob.getName()+"_"+ id);
jobExecutionRequest.setJobExecutionUniqueId(submitJob.getName() + "_" + id);
jobExecutionRequest.setApplicationParameter(JSONUtils.toJsonString(qualityConfig));
jobExecutionRequest.setEngineType(submitJob.getEngineType());
jobExecutionRequest.setEngineParameter(JSONUtils.toJsonString(submitJob.getEngineParameter()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package io.datavines.server.dqc.coordinator.operator;

import io.datavines.common.entity.JobExecutionRequest;
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.enums.OperatorType;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.core.utils.LanguageUtils;
import io.datavines.metric.api.*;
import io.datavines.notification.api.entity.SlaConfigMessage;
Expand Down Expand Up @@ -73,25 +73,29 @@ public void operateDqExecuteResult(JobExecutionRequest jobExecutionRequest) {
List<JobExecutionResult> jobExecutionResultList =
jobExternalService.listJobExecutionResultByJobExecutionId(jobExecutionRequest.getJobExecutionId());

if (CollectionUtils.isNotEmpty(jobExecutionResultList)) {
boolean isSuccess = true;
Long jobExecutionId = 0L;
for (JobExecutionResult jobExecutionResult : jobExecutionResultList) {
if (jobExecutionResult != null) {
jobExecutionId = jobExecutionResult.getJobExecutionId();
// 判断期望值是否为空,如果为空并且不是固定值类型,则将期望值设置为实际值
if (jobExecutionResult.getExpectedValue() == null && !FIX_VALUE.equalsIgnoreCase(jobExecutionResult.getExpectedType())) {
jobExecutionResult.setExpectedValue(jobExecutionResult.getActualValue());
jobExternalService.getJobExecutionResultService().updateById(jobExecutionResult);
}
//check the result ,if result is failure do some operator by failure strategy
isSuccess &= checkDqExecuteResult(jobExecutionResult);
}
if (CollectionUtils.isEmpty(jobExecutionResultList)) {
return;
}

boolean isSuccess = true;
Long jobExecutionId = 0L;
for (JobExecutionResult jobExecutionResult : jobExecutionResultList) {
if (jobExecutionResult == null) {
continue;
}

if (!isSuccess) {
sendErrorEmail(jobExecutionId);
jobExecutionId = jobExecutionResult.getJobExecutionId();
// 判断期望值是否为空,如果为空并且不是固定值类型,则将期望值设置为实际值
if (jobExecutionResult.getExpectedValue() == null && !FIX_VALUE.equalsIgnoreCase(jobExecutionResult.getExpectedType())) {
jobExecutionResult.setExpectedValue(jobExecutionResult.getActualValue());
jobExternalService.getJobExecutionResultService().updateById(jobExecutionResult);
}
//check the result ,if result is failure do some operator by failure strategy
isSuccess &= checkDqExecuteResult(jobExecutionResult);
}

if (!isSuccess) {
sendErrorEmail(jobExecutionId);
}
}

Expand Down Expand Up @@ -120,44 +124,57 @@ private void sendErrorEmail(Long jobExecutionId) {
SlaNotificationMessage message = new SlaNotificationMessage();
JobExecution jobExecution = jobExternalService.getJobExecutionById(jobExecutionId);
Long jobId = jobExecution.getJobId();
JobService jobService = jobExternalService.getJobService();
Job job = jobService.getById(jobId);
String jobName = job.getName();
Long dataSourceId = job.getDataSourceId();
DataSource dataSource = jobExternalService.getDataSourceService().getDataSourceById(dataSourceId);
String dataSourceName = dataSource.getName();
String dataSourceType = dataSource.getType();
String jobName;
String dataSourceName = null;
String dataSourceType = null;
if (jobId == -1L) {
jobName = jobExecution.getName();
} else {
JobService jobService = jobExternalService.getJobService();
Job job = jobService.getById(jobId);
jobName = job.getName();
Long dataSourceId = job.getDataSourceId();
DataSource dataSource = jobExternalService.getDataSourceService().getDataSourceById(dataSourceId);
dataSourceName = dataSource.getName();
dataSourceType = dataSource.getType();
}

List<JobExecutionResult> errorJobExecutionResultList = jobExternalService.listErrorJobExecutionResultByJobExecutionId(jobExecution.getId());
if (CollectionUtils.isEmpty(errorJobExecutionResultList)) {
return;
}

boolean isEn = !LanguageUtils.isZhContext();
if (CollectionUtils.isNotEmpty(errorJobExecutionResultList)) {
for (JobExecutionResult errorJobExecutionResult : errorJobExecutionResultList) {
MetricExecutionResult metricExecutionResult = new MetricExecutionResult();
BeanUtils.copyProperties(errorJobExecutionResult, metricExecutionResult);
List<String> messages = new ArrayList<>();
messages.add((isEn ? "Job Name : ": "作业名称: ") + jobName);
for (JobExecutionResult errorJobExecutionResult : errorJobExecutionResultList) {
MetricExecutionResult metricExecutionResult = new MetricExecutionResult();
BeanUtils.copyProperties(errorJobExecutionResult, metricExecutionResult);
List<String> messages = new ArrayList<>();
messages.add((isEn ? "Job Name : ": "作业名称: ") + jobName);
if (StringUtils.isNotEmpty(dataSourceType)) {
messages.add(String.format((isEn ? "Datasource : %s [%s] : ": "数据源 : %s [%s]: ") ,dataSourceType.toUpperCase(), dataSourceName));
String title = buildAlertSubject(metricExecutionResult, isEn);
String content = buildAlertMessage(messages, metricExecutionResult, jobExecution.getEngineType(), isEn);
message.setSubject(title);
message.setMessage(content);
saveIssue(jobId, title, content);
Map<SlaSenderMessage, Set<SlaConfigMessage>> config = slaNotificationService.getSlasNotificationConfigurationByJobId(jobId);
if (config.isEmpty()){
return;
}
notificationClient.notify(message, config);
}
String title = buildAlertSubject(metricExecutionResult, isEn);
String content = buildAlertMessage(messages, metricExecutionResult, jobExecution.getEngineType(), isEn);
message.setSubject(title);
message.setMessage(content);
saveIssue(jobId, title, content);
Map<SlaSenderMessage, Set<SlaConfigMessage>> config = slaNotificationService.getSlasNotificationConfigurationByJobId(jobId);
if (config.isEmpty()){
return;
}
notificationClient.notify(message, config);
}

} catch (Exception e) {
log.error("send email error: ", e);
}
}

private String buildAlertMessage(List<String> messages, MetricExecutionResult metricExecutionResult, String engineType, boolean isEn) {
Map<String,String> parameters = new HashMap<>();
parameters.put("actual_value", metricExecutionResult.getActualValue()+"");
parameters.put("expected_value", metricExecutionResult.getExpectedValue()+"");
parameters.put("threshold", metricExecutionResult.getThreshold()+"");
parameters.put("actual_value", String.valueOf(metricExecutionResult.getActualValue()));
parameters.put("expected_value", String.valueOf(metricExecutionResult.getExpectedValue()));
parameters.put("threshold", String.valueOf(metricExecutionResult.getThreshold()));
parameters.put("operator",OperatorType.of(metricExecutionResult.getOperator()).getSymbol());

SqlMetric sqlMetric = PluginLoader.getPluginLoader(SqlMetric.class).getOrCreatePlugin(metricExecutionResult.getMetricName());
Expand Down

0 comments on commit 7439f40

Please sign in to comment.