Skip to content

Commit

Permalink
[Fix][Runner] Fixed Multiple metric are configured for a Job By Local…
Browse files Browse the repository at this point in the history
… mode, resulting in abnormal alarm information. (#375)
  • Loading branch information
xxzuo authored Mar 8, 2024
1 parent cf7a5bd commit 9cbcd77
Showing 1 changed file with 41 additions and 37 deletions.
78 changes: 41 additions & 37 deletions datavines-runner/src/main/java/io/datavines/runner/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,46 +99,50 @@ public void run() {
Map<String,String> scriptConfigMap = new HashMap<>();
scriptConfigMap.put("execution_id", String.valueOf(jobExecutionId));
executeRequestParam.setScript(validateResultStorageFactory.getDialect().getValidateResultDataScript(scriptConfigMap));
ConnectorResponse response = validateResultStorageFactory.getExecutor().queryForOne(executeRequestParam);
ConnectorResponse response = validateResultStorageFactory.getExecutor().queryForList(executeRequestParam);
if (response != null && response.getResult()!= null) {
ListWithQueryColumn validateResultDataList = (ListWithQueryColumn)response.getResult();
Map<String, Object> validateResultData = validateResultDataList.getResultList().get(0);
MetricExecutionResult metricExecutionResult = new MetricExecutionResult(validateResultData);
if (!MetricValidator.isSuccess(metricExecutionResult)) {
if (StringUtils.isEmpty(jobExecutionRequest.getNotificationParameters())) {
log.warn("notification parameter is null");
return;
}

List<NotificationParameter> notificationParameters =
JSONUtils.toList(jobExecutionRequest.getNotificationParameters(), NotificationParameter.class);

if (CollectionUtils.isEmpty(notificationParameters)) {
log.error("parse notification parameter error");
return;
}

SlaNotificationMessage notificationMessage = new SlaNotificationMessage();
notificationMessage.setMessage(buildAlertMessage(metricExecutionResult, jobExecutionRequest.getEngineType(), jobExecutionRequest.isEn()));
notificationMessage.setSubject(buildAlertSubject(metricExecutionResult, jobExecutionRequest.isEn()));
Map<SlaSenderMessage, Set<SlaConfigMessage>> configMap = new HashMap<>();
for (NotificationParameter notificationParameter : notificationParameters) {
SlaSenderMessage slaSenderMessage = new SlaSenderMessage();
slaSenderMessage.setType(notificationParameter.getType());
slaSenderMessage.setConfig(JSONUtils.toJsonString(notificationParameter.getConfig()));

Set<SlaConfigMessage> set = new HashSet<>();
if (MapUtils.isNotEmpty(notificationParameter.getReceiver())) {
SlaConfigMessage slaConfigMessage = new SlaConfigMessage();
slaConfigMessage.setType(notificationParameter.getType());
slaConfigMessage.setConfig(JSONUtils.toJsonString(notificationParameter.getReceiver()));
set.add(slaConfigMessage);
ListWithQueryColumn validateResult = (ListWithQueryColumn)response.getResult();
List<Map<String, Object>> validateResultDataList = validateResult.getResultList();
if(validateResultDataList != null){
for(Map<String, Object> validateResultData : validateResultDataList){
MetricExecutionResult metricExecutionResult = new MetricExecutionResult(validateResultData);
if (!MetricValidator.isSuccess(metricExecutionResult)) {
if (StringUtils.isEmpty(jobExecutionRequest.getNotificationParameters())) {
log.warn("notification parameter is null");
return;
}

List<NotificationParameter> notificationParameters =
JSONUtils.toList(jobExecutionRequest.getNotificationParameters(), NotificationParameter.class);

if (CollectionUtils.isEmpty(notificationParameters)) {
log.error("parse notification parameter error");
return;
}

SlaNotificationMessage notificationMessage = new SlaNotificationMessage();
notificationMessage.setMessage(buildAlertMessage(metricExecutionResult, jobExecutionRequest.getEngineType(), jobExecutionRequest.isEn()));
notificationMessage.setSubject(buildAlertSubject(metricExecutionResult, jobExecutionRequest.isEn()));
Map<SlaSenderMessage, Set<SlaConfigMessage>> configMap = new HashMap<>();
for (NotificationParameter notificationParameter : notificationParameters) {
SlaSenderMessage slaSenderMessage = new SlaSenderMessage();
slaSenderMessage.setType(notificationParameter.getType());
slaSenderMessage.setConfig(JSONUtils.toJsonString(notificationParameter.getConfig()));

Set<SlaConfigMessage> set = new HashSet<>();
if (MapUtils.isNotEmpty(notificationParameter.getReceiver())) {
SlaConfigMessage slaConfigMessage = new SlaConfigMessage();
slaConfigMessage.setType(notificationParameter.getType());
slaConfigMessage.setConfig(JSONUtils.toJsonString(notificationParameter.getReceiver()));
set.add(slaConfigMessage);
}
configMap.put(slaSenderMessage, set);
}

NotificationManager notificationManager = new NotificationManager();
notificationManager.notify(notificationMessage, configMap);
}
configMap.put(slaSenderMessage, set);
}

NotificationManager notificationManager = new NotificationManager();
notificationManager.notify(notificationMessage, configMap);
}
}
}
Expand Down

0 comments on commit 9cbcd77

Please sign in to comment.