Skip to content

Commit

Permalink
fix some NPEs in alert handling
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 4, 2023
1 parent a1e1f5d commit 76be35d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.dinky.job.JobConfig;
import org.dinky.utils.TimeUtil;

import java.time.LocalDateTime;
import java.util.Optional;

import com.fasterxml.jackson.annotation.JsonProperty;

import cn.hutool.core.text.StrFormatter;
Expand All @@ -39,9 +42,6 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;
import java.util.Optional;

@Data
@Builder
@NoArgsConstructor
Expand Down Expand Up @@ -188,7 +188,9 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
} else if (exceptions != null && ExceptionRule.isException(id, exceptions)) {
// The error message is too long to send an alarm,
// and only the first line of abnormal information is used
String err = Optional.ofNullable(exceptions.getRootException()).orElse("dinky didn't get any ERROR!").split("\n")[0];
String err = Optional.ofNullable(exceptions.getRootException())
.orElse("dinky didn't get any ERROR!")
.split("\n")[0];
if (err.length() > 100) {
err = err.substring(0, 100) + "...";
}
Expand Down
17 changes: 6 additions & 11 deletions dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,12 @@ public DaemonTaskConfig getConfig() {
*/
@Override
public boolean dealTask() {
    // NOTE(review): the pasted diff interleaved the old try/catch body with the
    // new one; this is the post-commit version. The broad catch(Exception) was
    // removed on purpose so refresh failures propagate instead of being
    // swallowed and silently marking the task as done (return true).

    // Presumably rebalances polling load across daemon threads — confirm
    // against the volatilityBalance() implementation.
    volatilityBalance();

    // Refresh the job's status from the cluster; isNeedSave() decides whether
    // the refreshed state is persisted. isDone == true appears to signal a
    // terminal job state so the daemon can drop this task — TODO confirm
    // against JobRefreshHandler.refreshJob.
    boolean isDone = JobRefreshHandler.refreshJob(jobInfoDetail, isNeedSave());

    // Evaluate alert rules and collect metrics for the refreshed job.
    JobAlertHandler.getInstance().check(jobInfoDetail);
    JobMetricsHandler.writeFlinkMetrics(jobInfoDetail);
    return isDone;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.data.dto.AlertRuleDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.alert.AlertGroup;
Expand All @@ -46,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.jeasy.rules.api.Facts;
Expand Down Expand Up @@ -118,8 +120,8 @@ public JobAlertHandler() {
public void check(JobInfoDetail jobInfoDetail) {
Facts ruleFacts = new Facts();
JobAlertData jobAlertData = JobAlertData.buildData(jobInfoDetail);
JsonUtils.toMap(jobAlertData).forEach((k,v)->{
if (v == null){
JsonUtils.toMap(jobAlertData).forEach((k, v) -> {
if (v == null) {
throw new DinkyException(StrFormatter.format(
"When deal alert job data, the key [{}] value is null, its maybe dinky bug,please report", k));
}
Expand Down Expand Up @@ -178,6 +180,10 @@ private Rule buildRule(AlertRuleDTO alertRuleDTO) {
*/
private void executeAlertAction(Facts facts, AlertRuleDTO alertRuleDTO) {
TaskDTO task = taskService.getTaskInfoById(facts.get(JobAlertRuleOptions.FIELD_TASK_ID));
if (!Objects.equals(task.getStep(), JobLifeCycle.PUBLISH.getValue())) {
// Only publish job can be alerted
return;
}
Map<String, Object> dataModel = new HashMap<>(facts.asMap());
dataModel.put(JobAlertRuleOptions.OPTIONS_JOB_ALERT_RULE, alertRuleDTO);
String alertContent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;

/**
* The Job Metrics Handler class is used to process operations related to job metrics.
*/
@Slf4j
public class JobMetricsHandler {

/**
Expand All @@ -63,13 +65,17 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) {
() -> fetchFlinkMetrics(e.getKey(), e.getValue(), jobManagerUrls, jobId)))
.toArray(CompletableFuture[]::new);
// Wait for all Completable Future executions to finish
AsyncUtil.waitAll(array);
MetricsVO metricsVO = new MetricsVO();
metricsVO.setContent(customMetricsList);
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
try {
AsyncUtil.waitAll(array);
MetricsVO metricsVO = new MetricsVO();
metricsVO.setContent(customMetricsList);
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
} catch (Exception e) {
log.error("Get and save Flink metrics error", e);
}
}

/**
Expand Down

0 comments on commit 76be35d

Please sign in to comment.