Skip to content

Commit

Permalink
Refactor metrics (DataLinkDC#2633)
Browse files Browse the repository at this point in the history
* Refactored the monitoring page

* format code

* restore LocalStreamExecutor

* restore Error submission
  • Loading branch information
gaoyan1998 authored Dec 13, 2023
1 parent 4093fc7 commit 4034302
Show file tree
Hide file tree
Showing 29 changed files with 381 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@

package org.dinky.controller;

import org.dinky.data.MetricsLayoutVo;
import org.dinky.data.annotations.Log;
import org.dinky.data.dto.MetricsLayoutDTO;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.MetricsType;
import org.dinky.data.metrics.Jvm;
import org.dinky.data.model.Metrics;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.vo.MetricsVO;
import org.dinky.data.vo.task.JobInstanceVo;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -44,9 +42,6 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Opt;
Expand Down Expand Up @@ -88,19 +83,11 @@ public Result<List<MetricsVO>> getData(@RequestParam Long startTime, Long endTim
@ApiImplicitParam(name = "endTime", value = "End Time", required = false, dataType = "Long"),
@ApiImplicitParam(name = "taskIds", value = "Task Ids", required = true, dataType = "String")
})
public Result<List<MetricsVO>> getFlinkData(@RequestParam Long startTime, Long endTime, String taskIds) {
JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
ObjectNode para = nodeFactory.objectNode();
para.put("isHistory", false);
para.put("taskId", taskIds);
ProTableResult<JobInstanceVo> jobInstanceProTableResult = jobInstanceService.listJobInstances(para);
List<String> jids = jobInstanceProTableResult.getData().stream()
.map(JobInstanceVo::getJid)
.collect(Collectors.toList());
public Result<List<MetricsVO>> getFlinkData(@RequestParam Long startTime, Long endTime, String flinkJobIds) {
return Result.succeed(monitorService.getData(
DateUtil.date(startTime),
DateUtil.date(Opt.ofNullable(endTime).orElse(DateUtil.date().getTime())),
jids));
Arrays.asList(flinkJobIds.split(","))));
}

@PutMapping("/saveFlinkMetrics/{layout}")
Expand All @@ -124,7 +111,7 @@ public Result<Void> saveFlinkMetricLayout(

@GetMapping("/getMetricsLayout")
@ApiOperation("Get Metrics Layout to Display")
public Result<Map<String, List<Metrics>>> getMetricsLayout() {
public Result<List<MetricsLayoutVo>> getMetricsLayout() {
return Result.succeed(monitorService.getMetricsLayout());
}

Expand Down
50 changes: 50 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/MetricsLayoutVo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data;

import org.dinky.data.model.Metrics;

import java.util.List;

import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;

/**
 * View object returned by the monitoring API describing one saved metrics layout:
 * the layout's name, the task it belongs to, the Flink job id of that task's
 * instance (may be null when no instance exists), and the metrics in the layout.
 * Instances are created via the Lombok-generated builder.
 */
@Data
@Builder
public class MetricsLayoutVo {
    @ApiModelProperty(value = "Layout Name", dataType = "String", notes = "Name of the layout")
    private String layoutName;

    // Jid of the job instance associated with this layout's task; null when the
    // task has no job instance (see MonitorServiceImpl.getMetricsLayout).
    @ApiModelProperty(value = "Job ID", dataType = "String", notes = "ID of the associated job")
    private String flinkJobId;

    @ApiModelProperty(value = "Task ID", dataType = "Integer", example = "1001", notes = "ID of the associated task")
    private int taskId;

    /**
     * The feature is not complete and may be implemented in the future
     * (currently always set to true by the server — TODO confirm once the dashboard honors it).
     */
    @ApiModelProperty(value = "Show In Dashboard", dataType = "Boolean", notes = "Whether to show in dashboard")
    private boolean showInDashboard;

    @ApiModelProperty(value = "Metrics", dataType = "List<Metrics>", notes = "Metrics information")
    private List<Metrics> metrics;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.dinky.data.model.job.JobInstance;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
Expand Down Expand Up @@ -61,12 +59,6 @@ public class JobInfoDetail implements Serializable {
@ApiModelProperty(value = "JobDataDto", notes = "Details about the job")
private JobDataDto jobDataDto;

/**
* jobId -> metricsId -> metricsValue
*/
@ApiModelProperty(value = "Metrics Map", notes = "Details about the metrics map")
private Map<String, Map<String, String>> customMetricsMap = new HashMap<>();

public JobInfoDetail(Integer id) {
this.id = id;
}
Expand Down
22 changes: 20 additions & 2 deletions dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import org.dinky.job.handler.JobMetricsHandler;
import org.dinky.job.handler.JobRefreshHandler;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.annotation.DependsOn;

Expand All @@ -43,14 +46,22 @@
public class FlinkJobTask implements DaemonTask {

private DaemonTaskConfig config;

public static final String TYPE = FlinkJobTask.class.toString();

private static final JobInstanceService jobInstanceService;

private static final MonitorService monitorService;

private long preDealTime;

private long refreshCount = 0;

private Map<String, Map<String, String>> verticesAndMetricsMap = new ConcurrentHashMap<>();

static {
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
monitorService = SpringContextUtils.getBean("monitorServiceImpl", MonitorService.class);
}

private JobInfoDetail jobInfoDetail;
Expand All @@ -59,6 +70,13 @@ public class FlinkJobTask implements DaemonTask {
/**
 * Binds this task to the given daemon config, loads the job detail for the
 * configured instance, and preloads the metric layout for the job's task.
 *
 * @param config daemon task config whose id is the job instance id
 * @return this task, for chaining
 */
public DaemonTask setConfig(DaemonTaskConfig config) {
    this.config = config;
    this.jobInfoDetail = jobInstanceService.getJobInfoDetail(config.getId());
    // Rebuild from scratch so a re-bound task does not keep stale entries
    // from a previously configured job.
    verticesAndMetricsMap.clear();
    // Get a list of metrics and deduplicate them based on vertices and metrics;
    // values start empty and are presumably filled in during dealTask — the map
    // shape is vertices -> (metric name -> metric value).
    monitorService
            .getMetricsLayoutByTaskId(jobInfoDetail.getInstance().getTaskId())
            .forEach(m -> verticesAndMetricsMap
                    .computeIfAbsent(m.getVertices(), k -> new ConcurrentHashMap<>())
                    .put(m.getMetrics(), ""));
    return this;
}

Expand All @@ -83,9 +101,9 @@ public boolean dealTask() {
volatilityBalance();

boolean isDone = JobRefreshHandler.refreshJob(jobInfoDetail, isNeedSave());
if (Asserts.isAllNotNull(jobInfoDetail.getInstance(), jobInfoDetail.getClusterInstance())) {
if (Asserts.isAllNotNull(jobInfoDetail.getClusterInstance())) {
JobAlertHandler.getInstance().check(jobInfoDetail);
JobMetricsHandler.writeFlinkMetrics(jobInfoDetail);
JobMetricsHandler.refeshAndWriteFlinkMetrics(jobInfoDetail, verticesAndMetricsMap);
}
return isDone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public class JobMetricsHandler {
* Send to MetricsContextHolder asynchronously at the end of the method. <br/>
* Thus, the operation of writing the Flink metrics is completed. <br/>
*/
public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) {
Map<String, Map<String, String>> customMetricsList = jobInfoDetail.getCustomMetricsMap();
public static void refeshAndWriteFlinkMetrics(
JobInfoDetail jobInfoDetail, Map<String, Map<String, String>> customMetricsList) {
String[] jobManagerUrls =
jobInfoDetail.getClusterInstance().getJobManagerHost().split(",");
String jobId = jobInfoDetail.getInstance().getJid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.dinky.service;

import org.dinky.data.MetricsLayoutVo;
import org.dinky.data.dto.MetricsLayoutDTO;
import org.dinky.data.model.Metrics;
import org.dinky.data.vo.MetricsVO;

import java.util.Date;
import java.util.List;
import java.util.Map;

import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
Expand Down Expand Up @@ -65,7 +65,7 @@ public interface MonitorService extends IService<Metrics> {
*
* @return A list of {@code MetricsLayoutVo} objects, one per layout name, each carrying the {@link Metrics} in that layout.
*/
Map<String, List<Metrics>> getMetricsLayout();
List<MetricsLayoutVo> getMetricsLayout();

/**
* Get the metrics layout by name.
Expand All @@ -81,5 +81,5 @@ public interface MonitorService extends IService<Metrics> {
* @param taskId The ID of the task to get the job metrics for.
* @return A list of {@link Metrics} objects representing the job metrics for the specified task ID.
*/
List<Metrics> getJobMetrics(Integer taskId);
List<Metrics> getMetricsLayoutByTaskId(Integer taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@
import org.dinky.service.HistoryService;
import org.dinky.service.JobHistoryService;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -82,7 +80,6 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
private final ClusterInstanceService clusterInstanceService;
private final ClusterConfigurationService clusterConfigurationService;
private final JobHistoryService jobHistoryService;
private final MonitorService monitorService;

@Override
public JobInstance getByIdWithoutTenant(Integer id) {
Expand Down Expand Up @@ -174,13 +171,6 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
JobDataDto jobDataDto = jobHistoryService.getJobHistoryDto(jobInstance.getId());
jobInfoDetail.setJobDataDto(jobDataDto);

// Get a list of metrics and deduplicate them based on vertices and metrics
Map<String, Map<String, String>> verticesAndMetricsMap = new ConcurrentHashMap<>();
monitorService.getJobMetrics(jobInstance.getTaskId()).forEach(m -> {
verticesAndMetricsMap.putIfAbsent(m.getVertices(), new ConcurrentHashMap<>());
verticesAndMetricsMap.get(m.getVertices()).put(m.getMetrics(), "");
});
jobInfoDetail.setCustomMetricsMap(verticesAndMetricsMap);
return jobInfoDetail;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

package org.dinky.service.impl;

import org.dinky.data.MetricsLayoutVo;
import org.dinky.data.constant.PaimonTableConstant;
import org.dinky.data.dto.MetricsLayoutDTO;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.metrics.Jvm;
import org.dinky.data.model.Metrics;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.vo.MetricsVO;
import org.dinky.mapper.MetricsMapper;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.PaimonUtil;

import org.apache.paimon.data.BinaryString;
Expand All @@ -36,7 +40,6 @@

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand All @@ -55,17 +58,21 @@
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class MonitorServiceImpl extends ServiceImpl<MetricsMapper, Metrics> implements MonitorService {

private final Executor scheduleRefreshMonitorDataExecutor;
private final JobInstanceService jobInstanceService;

@Override
public List<MetricsVO> getData(Date startTime, Date endTime, List<String> models) {
if (models.isEmpty()) {
throw new DinkyException("Please provide at least one monitoring ID");
}
endTime = Opt.ofNullable(endTime).orElse(DateUtil.date());
Timestamp startTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(startTime));
Timestamp endTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(endTime));
Expand All @@ -86,7 +93,7 @@ public List<MetricsVO> getData(Date startTime, Date endTime, List<String> models
return metricsVOList.stream()
.filter(x -> x.getHeartTime().isAfter(startTS.toLocalDateTime()))
.filter(x -> x.getHeartTime().isBefore(endTS.toLocalDateTime()))
.peek(vo -> vo.setContent(new JSONObject(vo.getContent().toString())))
.peek(vo -> vo.setContent(JsonUtils.parseObject(vo.getContent().toString())))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -121,14 +128,24 @@ public void saveFlinkMetricLayout(String layout, List<MetricsLayoutDTO> metricsL
}

@Override
public Map<String, List<Metrics>> getMetricsLayout() {
List<Metrics> list = list();
Map<String, List<Metrics>> result = new HashMap<>();
list.forEach(x -> {
String layoutName = x.getLayoutName();
result.computeIfAbsent(layoutName, (k) -> new ArrayList<>());
result.get(layoutName).add(x);
});
/**
 * Groups all persisted metrics by layout name and wraps each group in a
 * {@link MetricsLayoutVo}, resolving the Flink job id from the group's task.
 *
 * @return one view object per layout name, each carrying that layout's metrics
 */
public List<MetricsLayoutVo> getMetricsLayout() {
    Map<String, List<Metrics>> byLayout =
            list().stream().collect(Collectors.groupingBy(Metrics::getLayoutName));

    List<MetricsLayoutVo> layouts = new ArrayList<>(byLayout.size());
    byLayout.forEach((layoutName, layoutMetrics) -> {
        // Each group is non-empty by construction, and a layout name maps to a
        // single task id, so reading it off the first entry is sufficient.
        Integer taskId = layoutMetrics.get(0).getTaskId();
        JobInstance jobInstance = jobInstanceService.getJobInstanceByTaskId(taskId);
        layouts.add(MetricsLayoutVo.builder()
                .layoutName(layoutName)
                .metrics(layoutMetrics)
                .flinkJobId(jobInstance == null ? null : jobInstance.getJid())
                .taskId(taskId)
                .showInDashboard(true)
                .build());
    });
    return layouts;
}

Expand All @@ -140,7 +157,7 @@ public List<Metrics> getMetricsLayoutByName(String layoutName) {
}

@Override
public List<Metrics> getJobMetrics(Integer taskId) {
public List<Metrics> getMetricsLayoutByTaskId(Integer taskId) {
QueryWrapper<Metrics> wrapper = new QueryWrapper<>();
wrapper.lambda().eq(Metrics::getTaskId, taskId);
return this.baseMapper.selectList(wrapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public JobConfig buildJobConfig(TaskDTO task) {
try {
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config));
} catch (Exception e) {
log.error("Init remote cluster error", e);
log.error("Init remote cluster error:{}", e.getMessage());
}
return config;
}
Expand Down
Loading

0 comments on commit 4034302

Please sign in to comment.