Skip to content

Commit

Permalink
optimize flink option (DataLinkDC#2615)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Dec 7, 2023
1 parent 0f0832c commit 149345f
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 61 deletions.
1 change: 1 addition & 0 deletions dinky-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@
<exclude>/application*.yml</exclude>
<exclude>/mybatis*.xml</exclude>
<exclude>/DinkyFlinkDockerfile</exclude>
<exclude>/FlinkConfClass</exclude>
</excludes>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import org.dinky.data.result.Result;
import org.dinky.data.vo.CascaderVO;
import org.dinky.flink.checkpoint.CheckpointRead;
import org.dinky.utils.CascaderOptionsUtils;
import org.dinky.service.FlinkService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand All @@ -36,14 +34,18 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
@Api(tags = "Flink Conf Controller", hidden = true)
@RequestMapping("/api/flinkConf")
@RequiredArgsConstructor
public class FlinkController {

protected static final CheckpointRead INSTANCE = new CheckpointRead();
private final FlinkService flinkService;

@GetMapping("/readCheckPoint")
@ApiOperation("Read Checkpoint")
Expand All @@ -54,41 +56,6 @@ public Result<Map<String, Map<String, CheckPointReadTable>>> readCheckPoint(Stri
@GetMapping("/configOptions")
@ApiOperation("Query Flink Configuration Options")
public Result<List<CascaderVO>> loadDataByGroup() {
final String[] nameList = {
"org.apache.flink.configuration.CoreOptions",
"org.apache.flink.configuration.RestOptions",
"org.apache.flink.configuration.PipelineOptions",
"org.apache.flink.configuration.SecurityOptions",
"org.apache.flink.configuration.YarnConfigOptions",
"org.apache.flink.configuration.WebOptions",
"org.apache.flink.configuration.JobManagerOptions",
"org.apache.flink.configuration.TaskManagerOptions",
"org.apache.flink.configuration.HighAvailabilityOptions",
"org.apache.flink.configuration.KubernetesConfigOptions",
"org.apache.flink.configuration.ClusterOptions",
"org.apache.flink.configuration.StateBackendOptions",
"org.apache.flink.configuration.QueryableStateOptions",
"org.apache.flink.configuration.CheckpointingOptions",
"org.apache.flink.configuration.JMXServerOptions",
"org.apache.flink.configuration.HeartbeatManagerOptions",
"org.apache.flink.configuration.OptimizerOptions",
"org.apache.flink.configuration.AkkaOptions",
"org.apache.flink.configuration.AlgorithmOptions",
"org.apache.flink.configuration.BlobServerOptions",
"org.apache.flink.configuration.ExecutionOptions",
"org.apache.flink.configuration.ExternalResourceOptions",
"org.apache.flink.configuration.ResourceManagerOptions",
"org.apache.flink.configuration.HistoryServerOptions",
"org.apache.flink.configuration.MetricOptions",
"org.apache.flink.configuration.NettyShuffleEnvironmentOptions",
"org.apache.flink.configuration.RestartStrategyOptions",
"org.apache.flink.yarn.configuration.YarnConfigOptions",
"org.apache.flink.kubernetes.configuration.KubernetesConfigOptions",
"org.dinky.constant.CustomerConfigureOptions"
};
List<CascaderVO> dataList = new ArrayList<>();
Arrays.stream(nameList).map(CascaderOptionsUtils::buildCascadeOptions).forEach(dataList::addAll);

return Result.succeed(dataList);
return Result.succeed(flinkService.loadConfigOptions());
}
}
13 changes: 13 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,17 @@ public class CascaderVO implements Serializable {
notes = "List of child CascaderVO objects",
dataType = "List<CascaderVO>")
private List<CascaderVO> children;

public CascaderVO() {}

public CascaderVO(String label) {
this.label = label;
this.value = label;
}

public CascaderVO(String label, List<CascaderVO> children) {
this.label = label;
this.value = label;
this.children = children;
}
}
29 changes: 29 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/FlinkService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.service;

import org.dinky.data.vo.CascaderVO;

import java.util.List;

public interface FlinkService {

List<CascaderVO> loadConfigOptions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.service.impl;

import org.dinky.data.flink.config.FlinkConfigOption;
import org.dinky.data.model.Document;
import org.dinky.data.vo.CascaderVO;
import org.dinky.service.FlinkService;
import org.dinky.utils.FlinkConfigOptionsUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Service
@RequiredArgsConstructor
@Slf4j
public class FlinkServiceImpl implements FlinkService {

private final DocumentServiceImpl documentService;

@Override
public List<CascaderVO> loadConfigOptions() {
List<CascaderVO> dataList = new ArrayList<>();

for (String name : FlinkConfigOptionsUtils.getConfigOptionsClass()) {
List<FlinkConfigOption> flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name);
String binlogGroup = FlinkConfigOptionsUtils.parsedBinlogGroup(name);
List<CascaderVO> child = flinkConfigOptions.stream()
.map(conf -> new CascaderVO(conf.getKey()))
.collect(Collectors.toList());
CascaderVO cascaderVO = new CascaderVO(binlogGroup, child);
dataList.add(cascaderVO);
}

List<CascaderVO> voList = documentService.lambdaQuery().eq(Document::getCategory, "Variable").list().stream()
.map(d -> new CascaderVO(d.getName().replace("set ", "")))
.collect(Collectors.toList());

CascaderVO cascaderVO = new CascaderVO();
cascaderVO.setLabel("Custom Doc");
cascaderVO.setChildren(voList);

dataList.add(cascaderVO);
return dataList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@

package org.dinky.service.impl;

import org.dinky.data.flink.config.FlinkConfigOption;
import org.dinky.data.model.Document;
import org.dinky.data.model.FragmentVariable;
import org.dinky.data.vo.suggestion.SuggestionLabelVO;
import org.dinky.data.vo.suggestion.SuggestionVO;
import org.dinky.service.DocumentService;
import org.dinky.service.FragmentVariableService;
import org.dinky.service.SuggestionService;
import org.dinky.utils.FlinkConfigOptionsUtils;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.stereotype.Service;

import cn.hutool.core.text.StrFormatter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -65,7 +69,8 @@ public Set<SuggestionVO> getSuggestions(boolean enableSchemaSuggestion) {
}
// 4. 自定义关键词提示
buildCustomSuggestions(new HashSet<>(), suggestionVOS);

// flink config提示
buildFlinkConfSuggestions(suggestionVOS);
return suggestionVOS;
}

Expand Down Expand Up @@ -144,6 +149,25 @@ private static void buildDocumentSuggestions(Set<Document> documentList, Set<Sug
.forEach(suggestionVOS::add);
}

private static void buildFlinkConfSuggestions(Set<SuggestionVO> suggestionVOS) {
for (String name : FlinkConfigOptionsUtils.getConfigOptionsClass()) {
List<FlinkConfigOption> flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name);
flinkConfigOptions.stream()
.map(conf -> {
SuggestionLabelVO suggestionLabelVO = SuggestionLabelVO.builder()
.label("set " + conf.getKey())
.build();
return SuggestionVO.builder()
.key(conf.getKey() + "_flink_conf")
.insertText(StrFormatter.format("set '{}'='{}'", conf.getKey(), conf.getDefaultValue()))
.kind(4)
.label(suggestionLabelVO)
.build();
})
.forEach(suggestionVOS::add);
}
}

/**
* by keyword get suggestions list
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.dinky.utils;

import org.dinky.data.vo.CascaderVO;
import org.dinky.data.flink.config.FlinkConfigOption;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
Expand All @@ -29,26 +29,28 @@
import java.util.Map;
import java.util.logging.Logger;

import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.ClassLoaderUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;

public class CascaderOptionsUtils {
private static final Logger logger = Logger.getLogger(CascaderOptionsUtils.class.getName());
public class FlinkConfigOptionsUtils {
private static final Logger logger = Logger.getLogger(FlinkConfigOptionsUtils.class.getName());

private static final String FLINK_CONFIG_REPLACE_SUFFIX = "Options";

private static final Map<String, List<CascaderVO>> cache = new HashMap<>();
private static final Map<String, List<FlinkConfigOption>> cache = new HashMap<>();
/**
* build flink config cascade options
* * @param name
*/
public static List<CascaderVO> buildCascadeOptions(String name) {
public static List<FlinkConfigOption> loadOptionsByClassName(String name) {

if (cache.containsKey(name) && cache.get(name) != null) {
return cache.get(name);
}

List<CascaderVO> dataList = new ArrayList<>();
List<FlinkConfigOption> configList = new ArrayList<>();
try {
Class<?> loadClass = ClassLoaderUtil.getContextClassLoader().loadClass(name);
Field[] fields = ReflectUtil.getFields(loadClass, f -> {
Expand All @@ -59,31 +61,26 @@ public static List<CascaderVO> buildCascadeOptions(String name) {
return false;
}
});
List<CascaderVO> configList = new ArrayList<>();
for (Field field : fields) {
CascaderVO config = new CascaderVO();
FlinkConfigOption config = new FlinkConfigOption();
Object fieldValue = ReflectUtil.getStaticFieldValue(field);
String key = ReflectUtil.invoke(fieldValue, "key");
config.setValue(key);
config.setLabel(key);
Object defaultValue = ReflectUtil.invoke(fieldValue, "defaultValue");
config.setKey(key);
if (ObjectUtil.isBasicType(defaultValue)) {
config.setDefaultValue(String.valueOf(defaultValue));
} else {
config.setDefaultValue("");
}
configList.add(config);
}
CascaderVO cascaderVO = new CascaderVO();
cascaderVO.setChildren(configList);
// parsed binlog group
String parsedBinlogGroup = parsedBinlogGroup(name);

cascaderVO.setLabel(parsedBinlogGroup);
cascaderVO.setValue(parsedBinlogGroup);
dataList.add(cascaderVO);
cache.put(name, dataList);
} catch (ClassNotFoundException ignored) {
logger.warning("get config option error, class not found: " + name);
}
return dataList;
return configList;
}

private static String parsedBinlogGroup(String name) {
public static String parsedBinlogGroup(String name) {
String[] names = name.split("\\.");
// delete Option suffix and return
String lastName = names[names.length - 1];
Expand All @@ -92,4 +89,8 @@ private static String parsedBinlogGroup(String name) {
}
return lastName;
}

public static String[] getConfigOptionsClass() {
return ResourceUtil.readUtf8Str("FlinkConfClass").replace("\r", "").split("\n");
}
}
30 changes: 30 additions & 0 deletions dinky-admin/src/main/resources/FlinkConfClass
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
org.apache.flink.configuration.CoreOptions
org.apache.flink.configuration.RestOptions
org.apache.flink.configuration.PipelineOptions
org.apache.flink.configuration.SecurityOptions
org.apache.flink.configuration.YarnConfigOptions
org.apache.flink.configuration.WebOptions
org.apache.flink.configuration.JobManagerOptions
org.apache.flink.configuration.TaskManagerOptions
org.apache.flink.configuration.HighAvailabilityOptions
org.apache.flink.configuration.KubernetesConfigOptions
org.apache.flink.configuration.ClusterOptions
org.apache.flink.configuration.StateBackendOptions
org.apache.flink.configuration.QueryableStateOptions
org.apache.flink.configuration.CheckpointingOptions
org.apache.flink.configuration.JMXServerOptions
org.apache.flink.configuration.HeartbeatManagerOptions
org.apache.flink.configuration.OptimizerOptions
org.apache.flink.configuration.AkkaOptions
org.apache.flink.configuration.AlgorithmOptions
org.apache.flink.configuration.BlobServerOptions
org.apache.flink.configuration.ExecutionOptions
org.apache.flink.configuration.ExternalResourceOptions
org.apache.flink.configuration.ResourceManagerOptions
org.apache.flink.configuration.HistoryServerOptions
org.apache.flink.configuration.MetricOptions
org.apache.flink.configuration.NettyShuffleEnvironmentOptions
org.apache.flink.configuration.RestartStrategyOptions
org.apache.flink.yarn.configuration.YarnConfigOptions
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
org.dinky.constant.CustomerConfigureOptions
1 change: 1 addition & 0 deletions dinky-assembly/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<include>**/*.yaml</include>
<include>**/log4j2.xml</include>
<include>**/DinkyFlinkDockerfile</include>
<include>**/FlinkConfClass</include>
</includes>
</fileSet>
<fileSet>
Expand Down
Loading

0 comments on commit 149345f

Please sign in to comment.