diff --git a/dinky-admin/pom.xml b/dinky-admin/pom.xml index c0db8d5a2d..fb1ce6c011 100644 --- a/dinky-admin/pom.xml +++ b/dinky-admin/pom.xml @@ -404,6 +404,7 @@ /application*.yml /mybatis*.xml /DinkyFlinkDockerfile + /FlinkConfClass diff --git a/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java b/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java index 0b432a84b7..df2c545bc7 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java @@ -23,10 +23,8 @@ import org.dinky.data.result.Result; import org.dinky.data.vo.CascaderVO; import org.dinky.flink.checkpoint.CheckpointRead; -import org.dinky.utils.CascaderOptionsUtils; +import org.dinky.service.FlinkService; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -36,14 +34,18 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @RestController @Slf4j @Api(tags = "Flink Conf Controller", hidden = true) @RequestMapping("/api/flinkConf") +@RequiredArgsConstructor public class FlinkController { + protected static final CheckpointRead INSTANCE = new CheckpointRead(); + private final FlinkService flinkService; @GetMapping("/readCheckPoint") @ApiOperation("Read Checkpoint") @@ -54,41 +56,6 @@ public Result>> readCheckPoint(Stri @GetMapping("/configOptions") @ApiOperation("Query Flink Configuration Options") public Result> loadDataByGroup() { - final String[] nameList = { - "org.apache.flink.configuration.CoreOptions", - "org.apache.flink.configuration.RestOptions", - "org.apache.flink.configuration.PipelineOptions", - "org.apache.flink.configuration.SecurityOptions", - "org.apache.flink.configuration.YarnConfigOptions", - "org.apache.flink.configuration.WebOptions", - "org.apache.flink.configuration.JobManagerOptions", - "org.apache.flink.configuration.TaskManagerOptions", - "org.apache.flink.configuration.HighAvailabilityOptions", - "org.apache.flink.configuration.KubernetesConfigOptions", - "org.apache.flink.configuration.ClusterOptions", - "org.apache.flink.configuration.StateBackendOptions", - "org.apache.flink.configuration.QueryableStateOptions", - "org.apache.flink.configuration.CheckpointingOptions", - "org.apache.flink.configuration.JMXServerOptions", - "org.apache.flink.configuration.HeartbeatManagerOptions", - "org.apache.flink.configuration.OptimizerOptions", - "org.apache.flink.configuration.AkkaOptions", - "org.apache.flink.configuration.AlgorithmOptions", - "org.apache.flink.configuration.BlobServerOptions", - "org.apache.flink.configuration.ExecutionOptions", - "org.apache.flink.configuration.ExternalResourceOptions", - "org.apache.flink.configuration.ResourceManagerOptions", - "org.apache.flink.configuration.HistoryServerOptions", - "org.apache.flink.configuration.MetricOptions", - "org.apache.flink.configuration.NettyShuffleEnvironmentOptions", - "org.apache.flink.configuration.RestartStrategyOptions", - "org.apache.flink.yarn.configuration.YarnConfigOptions", - "org.apache.flink.kubernetes.configuration.KubernetesConfigOptions", - "org.dinky.constant.CustomerConfigureOptions" - }; - List dataList = new ArrayList<>(); - Arrays.stream(nameList).map(CascaderOptionsUtils::buildCascadeOptions).forEach(dataList::addAll); - - return Result.succeed(dataList); + return Result.succeed(flinkService.loadConfigOptions()); } } diff --git a/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java b/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java index ff48920af7..8298ab1f2b 100644 --- a/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java +++ b/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java @@ -47,4 +47,17 @@ public class CascaderVO implements Serializable { notes = "List of child CascaderVO objects", dataType = "List") private List children; + + public CascaderVO() {} + + public CascaderVO(String label) { + this.label = label; + this.value = label; + } + + public CascaderVO(String label, List children) { + this.label = label; + this.value = label; + this.children = children; + } } diff --git a/dinky-admin/src/main/java/org/dinky/service/FlinkService.java b/dinky-admin/src/main/java/org/dinky/service/FlinkService.java new file mode 100644 index 0000000000..eb4da64c06 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/service/FlinkService.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.service; + +import org.dinky.data.vo.CascaderVO; + +import java.util.List; + +public interface FlinkService { + + List loadConfigOptions(); +} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java new file mode 100644 index 0000000000..33a82f6ddb --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.service.impl; + +import org.dinky.data.flink.config.FlinkConfigOption; +import org.dinky.data.model.Document; +import org.dinky.data.vo.CascaderVO; +import org.dinky.service.FlinkService; +import org.dinky.utils.FlinkConfigOptionsUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Service +@RequiredArgsConstructor +@Slf4j +public class FlinkServiceImpl implements FlinkService { + + private final DocumentServiceImpl documentService; + + @Override + public List loadConfigOptions() { + List dataList = new ArrayList<>(); + + for (String name : FlinkConfigOptionsUtils.getConfigOptionsClass()) { + List flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name); + String binlogGroup = FlinkConfigOptionsUtils.parsedBinlogGroup(name); + List child = flinkConfigOptions.stream() + .map(conf -> new CascaderVO(conf.getKey())) + .collect(Collectors.toList()); + CascaderVO cascaderVO = new CascaderVO(binlogGroup, child); + dataList.add(cascaderVO); + } + + List voList = documentService.lambdaQuery().eq(Document::getCategory, "Variable").list().stream() + .map(d -> new CascaderVO(d.getName().replace("set ", ""))) + .collect(Collectors.toList()); + + CascaderVO cascaderVO = new CascaderVO(); + cascaderVO.setLabel("Custom Doc"); + cascaderVO.setChildren(voList); + + dataList.add(cascaderVO); + return dataList; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/SuggestionServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/SuggestionServiceImpl.java index 6805524487..56587a6fbf 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/SuggestionServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/SuggestionServiceImpl.java @@ -19,6 +19,7 @@ package org.dinky.service.impl; +import org.dinky.data.flink.config.FlinkConfigOption; import org.dinky.data.model.Document; import org.dinky.data.model.FragmentVariable; import org.dinky.data.vo.suggestion.SuggestionLabelVO; @@ -26,13 +27,16 @@ import org.dinky.service.DocumentService; import org.dinky.service.FragmentVariableService; import org.dinky.service.SuggestionService; +import org.dinky.utils.FlinkConfigOptionsUtils; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.springframework.stereotype.Service; +import cn.hutool.core.text.StrFormatter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -65,7 +69,8 @@ public Set getSuggestions(boolean enableSchemaSuggestion) { } // 4. 自定义关键词提示 buildCustomSuggestions(new HashSet<>(), suggestionVOS); - + // flink config提示 + buildFlinkConfSuggestions(suggestionVOS); return suggestionVOS; } @@ -144,6 +149,25 @@ private static void buildDocumentSuggestions(Set documentList, Set suggestionVOS) { + for (String name : FlinkConfigOptionsUtils.getConfigOptionsClass()) { + List flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name); + flinkConfigOptions.stream() + .map(conf -> { + SuggestionLabelVO suggestionLabelVO = SuggestionLabelVO.builder() + .label("set " + conf.getKey()) + .build(); + return SuggestionVO.builder() + .key(conf.getKey() + "_flink_conf") + .insertText(StrFormatter.format("set '{}'='{}'", conf.getKey(), conf.getDefaultValue())) + .kind(4) + .label(suggestionLabelVO) + .build(); + }) + .forEach(suggestionVOS::add); + } + } + /** * by keyword get suggestions list * diff --git a/dinky-admin/src/main/java/org/dinky/utils/CascaderOptionsUtils.java b/dinky-admin/src/main/java/org/dinky/utils/FlinkConfigOptionsUtils.java similarity index 69% rename from dinky-admin/src/main/java/org/dinky/utils/CascaderOptionsUtils.java rename to dinky-admin/src/main/java/org/dinky/utils/FlinkConfigOptionsUtils.java index 92d6459e3b..9007b79fc7 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/CascaderOptionsUtils.java +++ b/dinky-admin/src/main/java/org/dinky/utils/FlinkConfigOptionsUtils.java @@ -19,7 +19,7 @@ package org.dinky.utils; -import org.dinky.data.vo.CascaderVO; +import org.dinky.data.flink.config.FlinkConfigOption; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -29,26 +29,28 @@ import java.util.Map; import java.util.logging.Logger; +import cn.hutool.core.io.resource.ResourceUtil; import cn.hutool.core.util.ClassLoaderUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ReflectUtil; -public class CascaderOptionsUtils { - private static final Logger logger = Logger.getLogger(CascaderOptionsUtils.class.getName()); +public class FlinkConfigOptionsUtils { + private static final Logger logger = Logger.getLogger(FlinkConfigOptionsUtils.class.getName()); private static final String FLINK_CONFIG_REPLACE_SUFFIX = "Options"; - private static final Map> cache = new HashMap<>(); + private static final Map> cache = new HashMap<>(); /** * build flink config cascade options * * @param name */ - public static List buildCascadeOptions(String name) { + public static List loadOptionsByClassName(String name) { if (cache.containsKey(name) && cache.get(name) != null) { return cache.get(name); } - List dataList = new ArrayList<>(); + List configList = new ArrayList<>(); try { Class loadClass = ClassLoaderUtil.getContextClassLoader().loadClass(name); Field[] fields = ReflectUtil.getFields(loadClass, f -> { @@ -59,31 +61,26 @@ public static List buildCascadeOptions(String name) { return false; } }); - List configList = new ArrayList<>(); for (Field field : fields) { - CascaderVO config = new CascaderVO(); + FlinkConfigOption config = new FlinkConfigOption(); Object fieldValue = ReflectUtil.getStaticFieldValue(field); String key = ReflectUtil.invoke(fieldValue, "key"); - config.setValue(key); - config.setLabel(key); + Object defaultValue = ReflectUtil.invoke(fieldValue, "defaultValue"); + config.setKey(key); + if (ObjectUtil.isBasicType(defaultValue)) { + config.setDefaultValue(String.valueOf(defaultValue)); + } else { + config.setDefaultValue(""); + } configList.add(config); } - CascaderVO cascaderVO = new CascaderVO(); - cascaderVO.setChildren(configList); - // parsed binlog group - String parsedBinlogGroup = parsedBinlogGroup(name); - - cascaderVO.setLabel(parsedBinlogGroup); - cascaderVO.setValue(parsedBinlogGroup); - dataList.add(cascaderVO); - cache.put(name, dataList); } catch (ClassNotFoundException ignored) { logger.warning("get config option error, class not found: " + name); } - return dataList; + return configList; } - private static String parsedBinlogGroup(String name) { + public static String parsedBinlogGroup(String name) { String[] names = name.split("\\."); // delete Option suffix and return String lastName = names[names.length - 1]; @@ -92,4 +89,8 @@ private static String parsedBinlogGroup(String name) { } return lastName; } + + public static String[] getConfigOptionsClass() { + return ResourceUtil.readUtf8Str("FlinkConfClass").replace("\r", "").split("\n"); + } } diff --git a/dinky-admin/src/main/resources/FlinkConfClass b/dinky-admin/src/main/resources/FlinkConfClass new file mode 100644 index 0000000000..f613e94441 --- /dev/null +++ b/dinky-admin/src/main/resources/FlinkConfClass @@ -0,0 +1,30 @@ +org.apache.flink.configuration.CoreOptions +org.apache.flink.configuration.RestOptions +org.apache.flink.configuration.PipelineOptions +org.apache.flink.configuration.SecurityOptions +org.apache.flink.configuration.YarnConfigOptions +org.apache.flink.configuration.WebOptions +org.apache.flink.configuration.JobManagerOptions +org.apache.flink.configuration.TaskManagerOptions +org.apache.flink.configuration.HighAvailabilityOptions +org.apache.flink.configuration.KubernetesConfigOptions +org.apache.flink.configuration.ClusterOptions +org.apache.flink.configuration.StateBackendOptions +org.apache.flink.configuration.QueryableStateOptions +org.apache.flink.configuration.CheckpointingOptions +org.apache.flink.configuration.JMXServerOptions +org.apache.flink.configuration.HeartbeatManagerOptions +org.apache.flink.configuration.OptimizerOptions +org.apache.flink.configuration.AkkaOptions +org.apache.flink.configuration.AlgorithmOptions +org.apache.flink.configuration.BlobServerOptions +org.apache.flink.configuration.ExecutionOptions +org.apache.flink.configuration.ExternalResourceOptions +org.apache.flink.configuration.ResourceManagerOptions +org.apache.flink.configuration.HistoryServerOptions +org.apache.flink.configuration.MetricOptions +org.apache.flink.configuration.NettyShuffleEnvironmentOptions +org.apache.flink.configuration.RestartStrategyOptions +org.apache.flink.yarn.configuration.YarnConfigOptions +org.apache.flink.kubernetes.configuration.KubernetesConfigOptions +org.dinky.constant.CustomerConfigureOptions \ No newline at end of file diff --git a/dinky-assembly/src/main/assembly/package.xml b/dinky-assembly/src/main/assembly/package.xml index 3f8d2c1588..3f291bbcff 100644 --- a/dinky-assembly/src/main/assembly/package.xml +++ b/dinky-assembly/src/main/assembly/package.xml @@ -41,6 +41,7 @@ **/*.yaml **/log4j2.xml **/DinkyFlinkDockerfile + **/FlinkConfClass diff --git a/dinky-common/src/main/java/org/dinky/data/flink/config/FlinkConfigOption.java b/dinky-common/src/main/java/org/dinky/data/flink/config/FlinkConfigOption.java new file mode 100644 index 0000000000..5eccfbb05c --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/flink/config/FlinkConfigOption.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.flink.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Data +public class FlinkConfigOption { + /** The current key for that config option. */ + private String key; + + /** The default value for this option. */ + private String defaultValue; + + /** The description for this option. */ + private String description; +}