Skip to content

Commit

Permalink
formate code
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 13, 2023
1 parent f2a0a98 commit 7da0338
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@
import org.dinky.data.enums.MetricsType;
import org.dinky.data.metrics.Jvm;
import org.dinky.data.model.Metrics;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.vo.MetricsVO;
import org.dinky.data.vo.task.JobInstanceVo;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -46,9 +42,6 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Opt;
Expand All @@ -72,8 +65,8 @@ public class MonitorController {
@GetMapping("/getSysData")
@ApiOperation("Get System Data")
@ApiImplicitParams({
@ApiImplicitParam(name = "startTime", value = "Start Time", required = true, dataType = "Long"),
@ApiImplicitParam(name = "endTime", value = "End Time", required = false, dataType = "Long")
@ApiImplicitParam(name = "startTime", value = "Start Time", required = true, dataType = "Long"),
@ApiImplicitParam(name = "endTime", value = "End Time", required = false, dataType = "Long")
})
public Result<List<MetricsVO>> getData(@RequestParam Long startTime, Long endTime) {
List<MetricsVO> data = monitorService.getData(
Expand All @@ -86,9 +79,9 @@ public Result<List<MetricsVO>> getData(@RequestParam Long startTime, Long endTim
@GetMapping("/getFlinkData")
@ApiOperation("Get Flink Data")
@ApiImplicitParams({
@ApiImplicitParam(name = "startTime", value = "Start Time", required = true, dataType = "Long"),
@ApiImplicitParam(name = "endTime", value = "End Time", required = false, dataType = "Long"),
@ApiImplicitParam(name = "taskIds", value = "Task Ids", required = true, dataType = "String")
@ApiImplicitParam(name = "startTime", value = "Start Time", required = true, dataType = "Long"),
@ApiImplicitParam(name = "endTime", value = "End Time", required = false, dataType = "Long"),
@ApiImplicitParam(name = "taskIds", value = "Task Ids", required = true, dataType = "String")
})
public Result<List<MetricsVO>> getFlinkData(@RequestParam Long startTime, Long endTime, String flinkJobIds) {
return Result.succeed(monitorService.getData(
Expand All @@ -101,12 +94,12 @@ public Result<List<MetricsVO>> getFlinkData(@RequestParam Long startTime, Long e
@ApiOperation("Save Flink Metrics")
@Log(title = "Save Flink Metrics", businessType = BusinessType.INSERT)
@ApiImplicitParams({
@ApiImplicitParam(name = "layout", value = "Layout Name", required = true, dataType = "String"),
@ApiImplicitParam(
name = "metricsList",
value = "Metrics List",
required = true,
dataType = "List<MetricsLayoutDTO>")
@ApiImplicitParam(name = "layout", value = "Layout Name", required = true, dataType = "String"),
@ApiImplicitParam(
name = "metricsList",
value = "Metrics List",
required = true,
dataType = "List<MetricsLayoutDTO>")
})
public Result<Void> saveFlinkMetricLayout(
@PathVariable(value = "layout") String layoutName, @RequestBody List<MetricsLayoutDTO> metricsList) {
Expand Down
26 changes: 23 additions & 3 deletions dinky-admin/src/main/java/org/dinky/data/MetricsLayoutVo.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data;

import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
import org.dinky.data.model.Metrics;

import java.util.List;

import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class MetricsLayoutVo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.dinky.data.model.job.JobInstance;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
Expand Down
12 changes: 7 additions & 5 deletions dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.dinky.job.handler.JobMetricsHandler;
import org.dinky.job.handler.JobRefreshHandler;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.dinky.service.MonitorService;
import org.springframework.context.annotation.DependsOn;

import lombok.Data;
Expand Down Expand Up @@ -71,10 +71,12 @@ public DaemonTask setConfig(DaemonTaskConfig config) {
this.config = config;
this.jobInfoDetail = jobInstanceService.getJobInfoDetail(config.getId());
// Get a list of metrics and deduplicate them based on vertices and metrics
monitorService.getMetricsLayoutByTaskId(jobInfoDetail.getInstance().getTaskId()).forEach(m -> {
verticesAndMetricsMap.putIfAbsent(m.getVertices(), new ConcurrentHashMap<>());
verticesAndMetricsMap.get(m.getVertices()).put(m.getMetrics(), "");
});
monitorService
.getMetricsLayoutByTaskId(jobInfoDetail.getInstance().getTaskId())
.forEach(m -> {
verticesAndMetricsMap.putIfAbsent(m.getVertices(), new ConcurrentHashMap<>());
verticesAndMetricsMap.get(m.getVertices()).put(m.getMetrics(), "");
});
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class JobMetricsHandler {
* Send to MetricsContextHolder asynchronously at the end of the method. </br>
* Thus, the operation of writing the Flink indicator is completed. </br>
*/
public static void refeshAndWriteFlinkMetrics(JobInfoDetail jobInfoDetail,Map<String, Map<String, String>> customMetricsList) {
public static void refeshAndWriteFlinkMetrics(
JobInfoDetail jobInfoDetail, Map<String, Map<String, String>> customMetricsList) {
String[] jobManagerUrls =
jobInfoDetail.getClusterInstance().getJobManagerHost().split(",");
String jobId = jobInfoDetail.getInstance().getJid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.Date;
import java.util.List;
import java.util.Map;

import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@
import org.dinky.service.HistoryService;
import org.dinky.service.JobHistoryService;
import org.dinky.service.JobInstanceService;
import org.dinky.service.MonitorService;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Service;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand All @@ -59,7 +58,6 @@
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;

Expand All @@ -68,11 +66,11 @@
public class MonitorServiceImpl extends ServiceImpl<MetricsMapper, Metrics> implements MonitorService {

private final Executor scheduleRefreshMonitorDataExecutor;
private final JobInstanceService jobInstanceService;
private final JobInstanceService jobInstanceService;

@Override
public List<MetricsVO> getData(Date startTime, Date endTime, List<String> models) {
if (models.isEmpty()){
if (models.isEmpty()) {
throw new DinkyException("Please provide at least one monitoring ID");
}
endTime = Opt.ofNullable(endTime).orElse(DateUtil.date());
Expand Down Expand Up @@ -133,18 +131,19 @@ public void saveFlinkMetricLayout(String layout, List<MetricsLayoutDTO> metricsL
public List<MetricsLayoutVo> getMetricsLayout() {
Map<String, List<Metrics>> collect = list().stream().collect(Collectors.groupingBy(Metrics::getLayoutName));

List<MetricsLayoutVo> result= new ArrayList<>();
List<MetricsLayoutVo> result = new ArrayList<>();
for (Map.Entry<String, List<Metrics>> entry : collect.entrySet()) {
//It is derived from a group, so the value must have a value,
// It is derived from a group, so the value must have a value,
// and a layout name only corresponds to a task ID, so only the first one can be taken
Integer taskId = entry.getValue().get(0).getTaskId();
JobInstance jobInstance = jobInstanceService.getJobInstanceByTaskId(taskId);
MetricsLayoutVo metricsLayoutVo = MetricsLayoutVo.builder()
.layoutName(entry.getKey())
.metrics(entry.getValue())
.flinkJobId(jobInstance==null?null:jobInstance.getJid())
.flinkJobId(jobInstance == null ? null : jobInstance.getJid())
.taskId(taskId)
.showInDashboard(true).build();
.showInDashboard(true)
.build();
result.add(metricsLayoutVo);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,9 +48,9 @@ public LocalStreamExecutor(ExecutorConfig executorConfig) {
}
if (!executorConfig.isPlan()) {
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
// if (!configuration.contains(RestOptions.PORT)) {
// configuration.set(RestOptions.PORT, executorConfig.getPort());
// }
// if (!configuration.contains(RestOptions.PORT)) {
// configuration.set(RestOptions.PORT, executorConfig.getPort());
// }
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
Expand Down

0 comments on commit 7da0338

Please sign in to comment.