From 89a1baebc6ae3eddc92f2bc8d1a820a8ccffaf3e Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Fri, 6 Sep 2024 14:50:00 +0800 Subject: [PATCH] [Feature][Flink] Add built -in Flink History Server to reduce the state of the Unknown, the final information of the Flink task is more accurate (#3780) Signed-off-by: Zzm0809 <934230207@qq.com> Co-authored-by: zackyoungh Co-authored-by: Zzm0809 <934230207@qq.com> --- .../dinky/controller/DownloadController.java | 41 +++++- .../org/dinky/init/FlinkHistoryServer.java | 137 ++++++++++++++++++ .../dinky/job/handler/JobRefreshHandler.java | 25 +++- .../org/dinky/service/JobInstanceService.java | 2 + .../service/impl/JobInstanceServiceImpl.java | 30 ++++ .../dinky/service/impl/TaskServiceImpl.java | 10 ++ .../dinky/resource/impl/HttpFileSystem.java | 107 ++++++++++++++ .../resource/impl/HttpFsDataOutputStream.java | 85 +++++++++++ .../resource/impl/LocalResourceManager.java | 7 + .../flink/yarn/YarnClusterDescriptor.java | 1 + .../webmonitor/history/HistoryServerUtil.java | 59 ++++++++ .../dinky/resource/BaseResourceManager.java | 6 +- .../resource/impl/HdfsResourceManager.java | 7 + .../resource/impl/LocalResourceManager.java | 8 + .../resource/impl/OssResourceManager.java | 28 ++++ .../org/dinky/url/ResourceFileSystem.java | 22 +-- .../java/org/dinky/data/enums/Status.java | 9 ++ .../org/dinky/data/model/Configuration.java | 4 +- .../S3Configuration.java} | 20 ++- .../dinky/data/model/SystemConfiguration.java | 37 +++++ .../main/java/org/dinky/utils/JsonUtils.java | 2 +- .../resources/i18n/messages_en_US.properties | 9 +- .../resources/i18n/messages_zh_CN.properties | 9 +- .../java/org/dinky/function/util/UDFUtil.java | 7 - .../dinky/metadata/config/PaimonConfig.java | 9 +- 25 files changed, 641 insertions(+), 40 deletions(-) create mode 100644 dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java create mode 100644 dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFileSystem.java create mode 100644 dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFsDataOutputStream.java create mode 100644 dinky-client/dinky-client-base/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtil.java rename dinky-common/src/main/java/org/dinky/data/{constant/SseConstant.java => model/S3Configuration.java} (63%) diff --git a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java index 65794a7fb3..65a4672500 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java @@ -23,11 +23,15 @@ import org.dinky.data.constant.DirConstant; import org.dinky.data.exception.BusException; import org.dinky.data.model.FlinkUdfManifest; +import org.dinky.data.model.ResourcesModelEnum; +import org.dinky.data.model.SystemConfiguration; +import org.dinky.data.result.Result; import org.dinky.function.constant.PathConstant; import org.dinky.function.util.ZipWriter; import org.dinky.resource.BaseResourceManager; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.List; @@ -37,9 +41,13 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.multipart.MultipartFile; +import cn.dev33.satoken.annotation.SaIgnore; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.io.FileUtil; @@ -58,7 +66,7 @@ @Api(tags = "UDF & App Jar Controller") @RequestMapping("/download") public class DownloadController { - + // todo: Controller has injection risk @GetMapping("downloadDepJar/{taskId}") @ApiOperation("Download UDF Jar") public void downloadJavaUDF(@PathVariable Integer taskId, HttpServletResponse resp) { @@ -100,9 +108,9 @@ public void downloadJavaUDF(@PathVariable Integer taskId, HttpServletResponse re } /** - * 提供docker通过http下载dinky-app.jar + * Provide Docker to download dinky-app.jar via HTTP request * - * @param version 版本 + * @param version version of dinky-app.jar * @param resp resp */ @GetMapping("downloadAppJar/{version}") @@ -117,8 +125,33 @@ public void downloadAppJar(@PathVariable String version, HttpServletResponse res @GetMapping("downloadFromRs") @ApiOperation("Download From Resource") - public void downloadJavaUDF(String path, HttpServletResponse resp) { + @SaIgnore + public void downloadFromRs(String path, HttpServletResponse resp) { InputStream inputStream = BaseResourceManager.getInstance().readFile(path); ServletUtil.write(resp, inputStream); } + + // todo: There is a risk of injection in this interface + @PostMapping("uploadFromRsByLocal") + @ApiOperation("Upload From Resource By Local") + @SaIgnore + public Result uploadFromRs(String path, @RequestParam("file") MultipartFile file) { + SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); + if (!systemConfiguration.getResourcesEnable().getValue() + || !systemConfiguration.getResourcesModel().getValue().equals(ResourcesModelEnum.LOCAL)) { + return Result.failed("resources model is not local or resources is not enable"); + } + + try { + File dest = new File(path); + if (!dest.getParentFile().exists()) { + dest.getParentFile().mkdirs(); + } + file.transferTo(dest); + return Result.succeed(); + } catch (IOException e) { + log.error("upload file failed", e); + throw new BusException("upload file failed"); + } + } } diff --git a/dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java b/dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java new file mode 100644 index 0000000000..6aaf689bb4 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.init; + +import org.dinky.data.model.ResourcesModelEnum; +import org.dinky.data.model.S3Configuration; +import org.dinky.data.model.SystemConfiguration; +import org.dinky.data.properties.OssProperties; +import org.dinky.service.JobInstanceService; +import org.dinky.service.SysConfigService; + +import org.apache.flink.runtime.webmonitor.history.HistoryServerUtil; + +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.net.NetUtil; +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +@Order(value = 2) +public class FlinkHistoryServer implements ApplicationRunner { + public static final Set HISTORY_JOBID_SET = new LinkedHashSet<>(); + private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + 5, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardOldestPolicy()); + + private final Runnable historyRunnable; + private final SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); + private final SysConfigService sysConfigService; + + public FlinkHistoryServer(JobInstanceService jobInstanceService, SysConfigService sysConfigService) { + this.sysConfigService = sysConfigService; + this.historyRunnable = () -> { + Map flinkHistoryServerConfiguration = + SystemConfiguration.getInstances().getFlinkHistoryServerConfiguration(); + if (systemConfiguration.getResourcesEnable().getValue()) { + if (systemConfiguration.getResourcesModel().getValue().equals(ResourcesModelEnum.OSS)) { + OssProperties ossProperties = systemConfiguration.getOssProperties(); + flinkHistoryServerConfiguration.put(S3Configuration.ENDPOINT, ossProperties.getEndpoint()); + flinkHistoryServerConfiguration.put(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey()); + flinkHistoryServerConfiguration.put(S3Configuration.SECRET_KEY, ossProperties.getSecretKey()); + flinkHistoryServerConfiguration.put( + S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess())); + } + } + + HistoryServerUtil.run( + (jobId) -> { + HISTORY_JOBID_SET.add(jobId); + threadPoolExecutor.execute(() -> { + jobInstanceService.hookJobDoneByHistory(jobId); + }); + }, + flinkHistoryServerConfiguration); + }; + } + + @Override + public void run(ApplicationArguments args) throws Exception { + AtomicReference historyThread = new AtomicReference<>(new Thread(historyRunnable)); + Runnable closeHistory = () -> { + if (historyThread.get().isAlive()) { + historyThread.get().interrupt(); + HISTORY_JOBID_SET.clear(); + } + }; + + // Check if the port is available + Consumer checkAndUpdatePort = (port) -> { + if (!NetUtil.isUsableLocalPort(port)) { + int usableLocalPort = NetUtil.getUsableLocalPort(8000); + sysConfigService.updateSysConfigByKv( + systemConfiguration.getFlinkHistoryServerPort().getKey(), String.valueOf(usableLocalPort)); + } + }; + systemConfiguration.getFlinkHistoryServerPort().addChangeEvent(checkAndUpdatePort); + systemConfiguration.getFlinkHistoryServerPort().addParameterCheck(checkAndUpdatePort); + CollUtil.newArrayList( + systemConfiguration.getUseFlinkHistoryServer(), + systemConfiguration.getFlinkHistoryServerPort(), + systemConfiguration.getFlinkHistoryServerArchiveRefreshInterval()) + .forEach(x -> x.addChangeEvent(d -> { + if (systemConfiguration.getUseFlinkHistoryServer().getValue()) { + closeHistory.run(); + checkAndUpdatePort.accept( + systemConfiguration.getFlinkHistoryServerPort().getValue()); + historyThread + .updateAndGet((t) -> new Thread(historyRunnable)) + .start(); + + } else { + closeHistory.run(); + } + })); + if (systemConfiguration.getUseFlinkHistoryServer().getValue()) { + checkAndUpdatePort.accept( + systemConfiguration.getFlinkHistoryServerPort().getValue()); + try { + if (!historyThread.get().isAlive()) { + historyThread.get().start(); + } + } catch (Exception e) { + log.error("Flink history server start failed: ", e); + } + } + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index 36f15358c5..1990d5731c 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -19,6 +19,8 @@ package org.dinky.job.handler; +import static org.dinky.utils.JsonUtils.objectMapper; + import org.dinky.api.FlinkAPI; import org.dinky.assertion.Asserts; import org.dinky.cluster.FlinkClusterInfo; @@ -36,12 +38,14 @@ import org.dinky.data.flink.job.FlinkJobDetailInfo; import org.dinky.data.flink.watermark.FlinkJobNodeWaterMark; import org.dinky.data.model.ClusterInstance; +import org.dinky.data.model.SystemConfiguration; import org.dinky.data.model.ext.JobInfoDetail; import org.dinky.data.model.job.JobInstance; import org.dinky.gateway.Gateway; import org.dinky.gateway.config.GatewayConfig; import org.dinky.gateway.exception.NotSupportGetStatusException; import org.dinky.gateway.model.FlinkClusterConfig; +import org.dinky.init.FlinkHistoryServer; import org.dinky.job.JobConfig; import org.dinky.service.ClusterInstanceService; import org.dinky.service.HistoryService; @@ -52,6 +56,8 @@ import java.time.Duration; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.springframework.context.annotation.DependsOn; @@ -59,6 +65,7 @@ import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.type.CollectionType; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; @@ -223,6 +230,13 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) * @return {@link org.dinky.data.dto.JobDataDto}. */ public static JobDataDto getJobData(Integer id, String jobManagerHost, String jobId) { + if (FlinkHistoryServer.HISTORY_JOBID_SET.contains(jobId) + && SystemConfiguration.getInstances().getUseFlinkHistoryServer().getValue()) { + jobManagerHost = "127.0.0.1:" + + SystemConfiguration.getInstances() + .getFlinkHistoryServerPort() + .getValue(); + } JobDataDto.JobDataDtoBuilder builder = JobDataDto.builder(); FlinkAPI api = FlinkAPI.build(jobManagerHost); try { @@ -240,8 +254,15 @@ public static JobDataDto getJobData(Integer id, String jobManagerHost, String jo api.getVertices(jobId).forEach(vertex -> { flinkJobDetailInfo.getPlan().getNodes().forEach(planNode -> { if (planNode.getId().equals(vertex)) { - planNode.setWatermark( - JsonUtils.toList(api.getWatermark(jobId, vertex), FlinkJobNodeWaterMark.class)); + try { + CollectionType listType = objectMapper + .getTypeFactory() + .constructCollectionType(ArrayList.class, FlinkJobNodeWaterMark.class); + List watermark = + objectMapper.readValue(api.getWatermark(jobId, vertex), listType); + planNode.setWatermark(watermark); + } catch (Exception ignored) { + } planNode.setBackpressure(JsonUtils.toJavaBean( api.getBackPressure(jobId, vertex), FlinkJobNodeBackPressure.class)); } diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 204ccafc55..1542364bfa 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -94,6 +94,8 @@ public interface JobInstanceService extends ISuperService { */ boolean hookJobDone(String jobId, Integer taskId); + boolean hookJobDoneByHistory(String jobId); + /** * Refresh the job instances for the given task IDs. * diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 7a0a4ad9e7..75454bb37e 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -66,6 +66,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -245,6 +246,35 @@ public boolean hookJobDone(String jobId, Integer taskId) { return isDone; } + @Override + public boolean hookJobDoneByHistory(String jobId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper + .eq(JobInstance::getJid, jobId) + .orderByDesc(JobInstance::getCreateTime) + .last("limit 1"); + JobInstance instance = baseMapper.selectOne(queryWrapper); + + if (instance == null + || !StrUtil.equalsAny( + instance.getStatus(), JobStatus.RECONNECTING.getValue(), JobStatus.UNKNOWN.getValue())) { + // Not having a corresponding jobinstance means that this may not have succeeded in running, + // returning true to prevent retry. + return true; + } + + DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); + DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config); + daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config)); + + boolean isDone = daemonTask.dealTask(); + // If the task is not completed, it is re-queued + if (!isDone) { + FlinkJobThreadPool.getInstance().execute(daemonTask); + } + return isDone; + } + @Override public void refreshJobByTaskIds(Integer... taskIds) { for (Integer taskId : taskIds) { diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 6e0946dbd8..c91a3cef51 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -19,6 +19,8 @@ package org.dinky.service.impl; +import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE; + import org.dinky.assertion.Asserts; import org.dinky.assertion.DinkyAssert; import org.dinky.config.Dialect; @@ -233,6 +235,14 @@ public JobConfig buildJobSubmitConfig(TaskDTO task) { // When disabling checkpoints, delete the checkpoint path config.setSavePointPath(null); } + if (SystemConfiguration.getInstances().getUseFlinkHistoryServer().getValue()) { + config.getConfigJson().compute("jobmanager.archive.fs.dir", (k, v) -> { + if (StringUtils.isNotBlank(v)) { + return v + "," + FLINK_JOB_ARCHIVE; + } + return FLINK_JOB_ARCHIVE; + }); + } if (GatewayType.get(task.getType()).isDeployCluster()) { log.info("Init gateway config, type:{}", task.getType()); FlinkClusterConfig flinkClusterCfg = diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFileSystem.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFileSystem.java new file mode 100644 index 0000000000..d2acd3e07a --- /dev/null +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFileSystem.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.resource.impl; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; +import java.net.URI; + +public class HttpFileSystem extends FileSystem { + public static final HttpFileSystem INSTANCE = new HttpFileSystem(); + + private HttpFileSystem() {} + + @Override + public Path getWorkingDirectory() { + return null; + } + + @Override + public Path getHomeDirectory() { + return null; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return null; + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + return new BlockLocation[0]; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + return null; + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return new FileStatus[0]; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return false; + } + + @Override + public boolean mkdirs(Path f) throws IOException { + return false; + } + + @Override + public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException { + return new HttpFsDataOutputStream(f); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return false; + } + + @Override + public boolean isDistributedFS() { + return false; + } + + @Override + public FileSystemKind getKind() { + return null; + } +} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFsDataOutputStream.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFsDataOutputStream.java new file mode 100644 index 0000000000..821925d5be --- /dev/null +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/HttpFsDataOutputStream.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.resource.impl; + +import org.dinky.data.model.SystemConfiguration; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalDataOutputStream; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; + +public class HttpFsDataOutputStream extends FSDataOutputStream { + private final String uuid = UUID.randomUUID().toString(); + + private final Path path; + final File file = FileUtil.file(FileUtil.getTmpDir(), "/dinky-tmp/" + uuid + ".tmp"); + final LocalDataOutputStream localDataOutputStream; + + public HttpFsDataOutputStream(Path f) throws IOException { + this.path = f; + FileUtil.mkParentDirs(file); + localDataOutputStream = new LocalDataOutputStream(file); + } + + @Override + public void write(int b) throws IOException { + localDataOutputStream.write(b); + } + + @Override + public long getPos() throws IOException { + return localDataOutputStream.getPos(); + } + + @Override + public void flush() throws IOException { + localDataOutputStream.flush(); + sendFile(); + } + + private void sendFile() { + try (HttpResponse httpResponse = HttpUtil.createPost( + SystemConfiguration.getInstances().getDinkyAddr().getValue() + "/download/uploadFromRsByLocal") + .form("file", file) + .form("path", path.toString()) + .execute()) { + httpResponse.body(); + } + } + + @Override + public void sync() throws IOException { + localDataOutputStream.sync(); + } + + @Override + public void close() throws IOException { + localDataOutputStream.close(); + sendFile(); + } +} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java index a762dd2b7c..c01bc2d1e6 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java @@ -24,6 +24,8 @@ import org.dinky.data.model.SystemConfiguration; import org.dinky.resource.BaseResourceManager; +import org.apache.flink.core.fs.FileSystem; + import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; @@ -138,4 +140,9 @@ public InputStream readFile(String path) { throw new RuntimeException(e); } } + + @Override + public FileSystem getFileSystem() { + return HttpFileSystem.INSTANCE; + } } diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 2acf26c18e..05d79728cd 100644 --- a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -156,6 +156,7 @@ /** The descriptor with deployment information for deploying a Flink cluster on Yarn. */ public class YarnClusterDescriptor implements ClusterDescriptor { private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); + public static final String pathSeparator = ":"; @VisibleForTesting static final String IGNORE_UNRECOGNIZED_VM_OPTIONS = "-XX:+IgnoreUnrecognizedVMOptions"; diff --git a/dinky-client/dinky-client-base/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtil.java b/dinky-client/dinky-client-base/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtil.java new file mode 100644 index 0000000000..025bf90b95 --- /dev/null +++ b/dinky-client/dinky-client-base/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtil.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.util.FlinkException; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class HistoryServerUtil { + + public static void run(Consumer jobIdEventListener, Map config) { + log.info("Starting Flink History service...."); + HistoryServer hs; + try { + org.apache.flink.configuration.Configuration configuration = Configuration.fromMap(config); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); + + hs = new HistoryServer(configuration, (event) -> { + if (event.getType() == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) { + Optional.ofNullable(jobIdEventListener).ifPresent(listener -> listener.accept(event.getJobID())); + } + }); + log.info("Flink History service started successfully."); + } catch (IOException | FlinkException e) { + log.error( + "The Flink History service failed to start with the following error message: {}", + e.getMessage(), + e); + throw new RuntimeException(e); + } + hs.run(); + } +} diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java index de6c733a3a..af9b0e7935 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.List; @@ -58,6 +59,8 @@ public interface BaseResourceManager { InputStream readFile(String path); + org.apache.flink.core.fs.FileSystem getFileSystem() throws IOException; + static BaseResourceManager getInstance() { switch (SystemConfiguration.getInstances().getResourcesModel().getValue()) { case HDFS: @@ -65,9 +68,8 @@ static BaseResourceManager getInstance() { case OSS: return Singleton.get(OssResourceManager.class); case LOCAL: - return Singleton.get(LocalResourceManager.class); default: - return null; + return Singleton.get(LocalResourceManager.class); } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java index b413b55f26..bb6178ffec 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java @@ -24,6 +24,7 @@ import org.dinky.data.model.ResourcesVO; import org.dinky.resource.BaseResourceManager; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +38,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.lang.Singleton; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; @@ -140,6 +142,11 @@ public InputStream readFile(String path) { } } + @Override + public org.apache.flink.core.fs.FileSystem getFileSystem() { + return Singleton.get(HadoopFileSystem.class, getHdfs()); + } + public FileSystem getHdfs() { if (hdfs == null && instances.getResourcesEnable().getValue()) { throw new BusException(Status.RESOURCE_HDFS_CONFIGURATION_ERROR); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java index 74390edf1f..e792f4f396 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java @@ -24,6 +24,9 @@ import org.dinky.data.model.ResourcesVO; import org.dinky.resource.BaseResourceManager; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.local.LocalFileSystem; + import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; @@ -127,4 +130,9 @@ public List getFullDirectoryStructure(int rootId) { public InputStream readFile(String path) { return FileUtil.getInputStream(getFilePath(path)); } + + @Override + public FileSystem getFileSystem() { + return LocalFileSystem.getSharedInstance(); + } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java index 6639b53051..e85c40684e 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java @@ -22,10 +22,18 @@ import org.dinky.data.enums.Status; import org.dinky.data.exception.BusException; import org.dinky.data.model.ResourcesVO; +import org.dinky.data.model.S3Configuration; +import org.dinky.data.model.SystemConfiguration; +import org.dinky.data.properties.OssProperties; import org.dinky.oss.OssTemplate; import org.dinky.resource.BaseResourceManager; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.fs.s3presto.S3FileSystemFactory; + import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; @@ -39,6 +47,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.URLUtil; public class OssResourceManager implements BaseResourceManager { OssTemplate ossTemplate; @@ -134,6 +143,25 @@ public InputStream readFile(String path) { .getObjectContent(); } + private FileSystem fileSystem; + + @Override + public FileSystem getFileSystem() throws IOException { + if (fileSystem == null) { + SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); + S3FileSystemFactory s3FileSystemFactory = new S3FileSystemFactory(); + OssProperties ossProperties = systemConfiguration.getOssProperties(); + Configuration config = new Configuration(); + config.setString(S3Configuration.ENDPOINT, ossProperties.getEndpoint()); + config.setString(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey()); + config.setString(S3Configuration.SECRET_KEY, ossProperties.getSecretKey()); + config.setString(S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess())); + s3FileSystemFactory.configure(config); + fileSystem = s3FileSystemFactory.create(URLUtil.toURI("s3://" + ossProperties.getBucketName())); + } + return fileSystem; + } + public OssTemplate getOssTemplate() { if (ossTemplate == null && instances.getResourcesEnable().getValue()) { throw new BusException(Status.RESOURCE_OSS_CONFIGURATION_ERROR); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java index 009ee2d50c..cfd19bcfbc 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java @@ -39,7 +39,6 @@ @Slf4j public class ResourceFileSystem extends FileSystem { - private final BaseResourceManager BASE_RESOURCE_MANAGER; public static final URI URI_SCHEMA = URI.create("rs:/"); private static ResourceFileSystem INSTANCE; @@ -51,8 +50,8 @@ public static synchronized ResourceFileSystem getInstance() { return INSTANCE; } - public ResourceFileSystem() { - this.BASE_RESOURCE_MANAGER = BaseResourceManager.getInstance(); + public BaseResourceManager getBaseResourceManager() { + return BaseResourceManager.getInstance(); } @Override @@ -76,7 +75,7 @@ public FileStatus getFileStatus(Path f) throws IOException { } protected File getFile(Path f) { - return new File(BASE_RESOURCE_MANAGER.getFilePath(f.getPath())); + return new File(getBaseResourceManager().getFilePath(f.getPath())); } @Override @@ -91,18 +90,22 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { @Override public FSDataInputStream open(Path f) throws IOException { - return new InputStreamFSInputWrapper(BASE_RESOURCE_MANAGER.readFile(f.getPath())); + return new InputStreamFSInputWrapper(getBaseResourceManager().readFile(f.getPath())); } @Override public FileStatus[] listStatus(Path f) throws IOException { - return new FileStatus[0]; + Path path = new Path(getBaseResourceManager().getFilePath(f.getPath())); + if (!getBaseResourceManager().getFileSystem().exists(path)) { + return new FileStatus[0]; + } + return getBaseResourceManager().getFileSystem().listStatus(path); } @Override public boolean delete(Path f, boolean recursive) throws IOException { try { - BASE_RESOURCE_MANAGER.remove(f.getPath()); + getBaseResourceManager().remove(f.getPath()); return true; } catch (Exception e) { log.error("delete file failed, path: {}", f.getPath(), e); @@ -117,13 +120,14 @@ public boolean mkdirs(Path f) throws IOException { @Override public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException { - return null; + Path path = new Path(getBaseResourceManager().getFilePath(f.getPath())); + return getBaseResourceManager().getFileSystem().create(path, overwriteMode); } @Override public boolean rename(Path src, Path dst) throws IOException { try { - BASE_RESOURCE_MANAGER.rename(src.getPath(), dst.getPath()); + getBaseResourceManager().rename(src.getPath(), dst.getPath()); return true; } catch (Exception e) { log.error("rename file failed, src: {}, dst: {}", src.getPath(), dst.getPath(), e); diff --git a/dinky-common/src/main/java/org/dinky/data/enums/Status.java b/dinky-common/src/main/java/org/dinky/data/enums/Status.java index ef58f83f59..85ff1a18dd 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/Status.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/Status.java @@ -448,6 +448,15 @@ public enum Status { PROCESS_REGISTER_EXITS(196, "process.register.exits"), PROCESS_CLEAR_LOG_SUCCESS(198, "process.clear.log.success"), PROCESS_CLEAR_LOG_FAILED(199, "process.clear.log.failed"), + + SYS_FLINK_SETTINGS_USE_FLINK_HISTORY_SERVER(200, "sys.flink.settings.useFlinkHistoryServer"), + SYS_FLINK_SETTINGS_USE_FLINK_HISTORY_SERVER_NOTE(201, "sys.flink.settings.useFlinkHistoryServer.note"), + SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_PORT(202, "sys.flink.settings.flinkHistoryServerPort"), + SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_PORT_NOTE(203, "sys.flink.settings.flinkHistoryServerPort.note"), + SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL( + 204, "sys.flink.settings.flinkHistoryServerArchiveRefreshInterval"), + SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL_NOTE( + 205, "sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note"), ; private final int code; private final String key; diff --git a/dinky-common/src/main/java/org/dinky/data/model/Configuration.java b/dinky-common/src/main/java/org/dinky/data/model/Configuration.java index fe3892fb28..d8ed826b9c 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/Configuration.java +++ b/dinky-common/src/main/java/org/dinky/data/model/Configuration.java @@ -39,9 +39,11 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; @Getter @Setter +@Slf4j public class Configuration implements Serializable { private String key; private String name; @@ -190,7 +192,7 @@ public void runChangeEvent() { try { x.accept(getValue()); } catch (Exception e) { - e.printStackTrace(); + log.error("", e); } }); } diff --git a/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java b/dinky-common/src/main/java/org/dinky/data/model/S3Configuration.java similarity index 63% rename from dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java rename to dinky-common/src/main/java/org/dinky/data/model/S3Configuration.java index e6610ee17d..6a0d0c6890 100644 --- a/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java +++ b/dinky-common/src/main/java/org/dinky/data/model/S3Configuration.java @@ -17,13 +17,17 @@ * */ -package org.dinky.data.constant; +package org.dinky.data.model; -public class SseConstant { - /** - * Sse label for front to reconnect session - */ - public static final String SSE_SESSION_INVALID = "SESSION_INVALID"; - - public static final String HEART_TOPIC = "HEART_BEAT"; +/** + * 配置类 S3Configuration 用于存储 S3 配置信息 + * + */ +public class S3Configuration { + public static String ACCESS_KEY = "s3.access-key"; + public static String SECRET_KEY = "s3.secret-key"; + public static String ENDPOINT = "s3.endpoint"; + public static String BUCKET_NAME = "s3.bucket-name"; + public static String PATH_STYLE_ACCESS = "s3.path.style.access"; + public static String REGION = "s3.region"; } diff --git a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java index 62a1af8f17..0c4290a70f 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java +++ b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java @@ -21,6 +21,7 @@ import org.dinky.context.EngineContextHolder; import org.dinky.data.constant.CommonConstant; +import org.dinky.data.constant.DirConstant; import org.dinky.data.enums.Status; import org.dinky.data.enums.TaskOwnerAlertStrategyEnum; import org.dinky.data.enums.TaskOwnerLockStrategyEnum; @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -35,6 +37,7 @@ import java.util.stream.Collectors; import cn.hutool.core.convert.Convert; +import cn.hutool.core.io.FileUtil; import cn.hutool.core.lang.Opt; import cn.hutool.core.util.DesensitizedUtil; import cn.hutool.core.util.ReflectUtil; @@ -76,6 +79,21 @@ public static Configuration.OptionBuilder key(Status status) { .defaultValue(30) .note(Status.SYS_FLINK_SETTINGS_JOBIDWAIT_NOTE); + private final Configuration useFlinkHistoryServer = key(Status.SYS_FLINK_SETTINGS_USE_FLINK_HISTORY_SERVER) + .booleanType() + .defaultValue(true) + .note(Status.SYS_FLINK_SETTINGS_USE_FLINK_HISTORY_SERVER_NOTE); + private final Configuration flinkHistoryServerPort = + key(Status.SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_PORT) + .intType() + .defaultValue(8082) + .note(Status.SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_PORT_NOTE); + private final Configuration flinkHistoryServerArchiveRefreshInterval = + key(Status.SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL) + .intType() + .defaultValue(5000) + .note(Status.SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL_NOTE); + private final Configuration mavenSettings = key(Status.SYS_MAVEN_SETTINGS_SETTINGSFILEPATH) .stringType() .defaultValue("") @@ -404,4 +422,23 @@ public OssProperties getOssProperties() { public TaskOwnerLockStrategyEnum getTaskOwnerLockStrategy() { return taskOwnerLockStrategy.getValue(); } + + public static final String FLINK_JOB_ARCHIVE = "rs:/tmp/flink-job-archive"; + + public Map getFlinkHistoryServerConfiguration() { + Map config = new HashMap<>(); + if (useFlinkHistoryServer.getValue()) { + config.put( + "historyserver.web.port", flinkHistoryServerPort.getValue().toString()); + config.put( + "historyserver.archive.fs.refresh-interval", + flinkHistoryServerArchiveRefreshInterval.getValue().toString()); + config.put( + "historyserver.web.tmpdir", + FileUtil.file(DirConstant.getTempRootDir(), "flink-job-archive") + .getAbsolutePath()); + config.put("historyserver.archive.fs.dir", FLINK_JOB_ARCHIVE); + } + return config; + } } diff --git a/dinky-common/src/main/java/org/dinky/utils/JsonUtils.java b/dinky-common/src/main/java/org/dinky/utils/JsonUtils.java index a22069cb08..9e3bdcaf80 100644 --- a/dinky-common/src/main/java/org/dinky/utils/JsonUtils.java +++ b/dinky-common/src/main/java/org/dinky/utils/JsonUtils.java @@ -68,7 +68,7 @@ public class JsonUtils { private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class); - private static final ObjectMapper objectMapper = new ObjectMapper() + public static final ObjectMapper objectMapper = new ObjectMapper() .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) diff --git a/dinky-common/src/main/resources/i18n/messages_en_US.properties b/dinky-common/src/main/resources/i18n/messages_en_US.properties index d1ce15d0e6..1d24fd9618 100644 --- a/dinky-common/src/main/resources/i18n/messages_en_US.properties +++ b/dinky-common/src/main/resources/i18n/messages_en_US.properties @@ -308,4 +308,11 @@ file.path.visit.failed=File path access failed, please check whether the path is resource.hdfs.configuration.error=Resource configuration error, HDFS is not enabled resource.oss.configuration.error=Resource configuration error, OSS is not enabled resource.root.dir.not.exist=The root directory does not exist, please check the database -resource.folder.exists=The folder already exists \ No newline at end of file +resource.folder.exists=The folder already exists + +sys.flink.settings.useFlinkHistoryServer=Use Flink History Server +sys.flink.settings.useFlinkHistoryServer.note=This feature will have a built-in Flink History Server in Dinky, which can be used to query the history of Flink tasks, so that Flink tasks can reduce the UNKNOWN status and enter the last status information of Flink tasks +sys.flink.settings.flinkHistoryServerPort=Flink History Server Port +sys.flink.settings.flinkHistoryServerPort.note=Flink History Server Port,For example, 8082, make sure that the port is not occupied +sys.flink.settings.flinkHistoryServerArchiveRefreshInterval= Flink History Server refresh Interval +sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note=For example, 10,000 refresh interval of the Flink History Server is refreshed every 10 seconds diff --git a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties index 1bd66abddf..0046e567dd 100644 --- a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties +++ b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties @@ -309,4 +309,11 @@ file.path.visit.failed=文件路径访问失败,请检查路径在对应存储 resource.hdfs.configuration.error=资源配置错误,未启用HDFS resource.oss.configuration.error=资源配置错误,未启用OSS resource.root.dir.not.exist=根目录不存在,请检查数据库 -resource.folder.exists=文件夹已存在 \ No newline at end of file +resource.folder.exists=文件夹已存在 + +sys.flink.settings.useFlinkHistoryServer=使用 Flink History Server +sys.flink.settings.useFlinkHistoryServer.note=此功能会在 Dinky 里面内置一个Flink History Server ,作用于 Flink 任务的历史查询,使 Flink 任务减少 UNKNOWN 状态的情况,并打入 Flink 任务最后的状态信息 +sys.flink.settings.flinkHistoryServerPort=Flink History Server 端口 +sys.flink.settings.flinkHistoryServerPort.note=Flink History Server 端口,例如:8082,确保端口没有被占用 +sys.flink.settings.flinkHistoryServerArchiveRefreshInterval= Flink History Server 刷新间隔 +sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note=Flink History Server 刷新间隔,单位:毫秒,例如:10000,表示每隔10秒刷新一次 diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 9c2decc98a..f19a8c0f8d 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -357,13 +357,6 @@ public static UDF toUDF(String statement, DinkyClassLoader classLoader) { // create FlinkUdfPathContextHolder from UdfCodePool public static FlinkUdfPathContextHolder createFlinkUdfPathContextHolder() { FlinkUdfPathContextHolder udfPathContextHolder = new FlinkUdfPathContextHolder(); - UdfCodePool.getUdfCodePool().values().forEach(udf -> { - if (udf.getFunctionLanguage() == FunctionLanguage.PYTHON) { - udfPathContextHolder.addPyUdfPath(new File(udf.getCode())); - } else { - udfPathContextHolder.addUdfPath(new File(udf.getCode())); - } - }); UdfCodePool.getGitPool().values().forEach(gitPackage -> { if ("jar".equals(FileUtil.getSuffix(gitPackage))) { diff --git a/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/config/PaimonConfig.java b/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/config/PaimonConfig.java index 56d6b86554..6f636b7480 100644 --- a/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/config/PaimonConfig.java +++ b/dinky-metadata/dinky-metadata-paimon/src/main/java/org/dinky/metadata/config/PaimonConfig.java @@ -20,6 +20,7 @@ package org.dinky.metadata.config; import org.dinky.data.model.CustomConfig; +import org.dinky.data.model.S3Configuration; import org.apache.paimon.options.Options; @@ -48,10 +49,10 @@ public Options getOptions() { options.set("warehouse", warehouse); if (Objects.requireNonNull(FileSystemType.fromType(fileSystemType)) == FileSystemType.S3) { if (s3 != null) { - options.set("s3.endpoint", s3.getEndpoint()); - options.set("s3.access-key", s3.getAccessKey()); - options.set("s3.secret-key", s3.getSecretKey()); - options.set("s3.path.style.access", String.valueOf(s3.isPathStyle())); + options.set(S3Configuration.ENDPOINT, s3.getEndpoint()); + options.set(S3Configuration.ACCESS_KEY, s3.getAccessKey()); + options.set(S3Configuration.SECRET_KEY, s3.getSecretKey()); + options.set(S3Configuration.PATH_STYLE_ACCESS, String.valueOf(s3.isPathStyle())); } else { throw new IllegalArgumentException("S3 config is required for S3 file system"); }