diff --git a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java index 31da1d9ad4..1f09b3bb90 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java @@ -26,7 +26,7 @@ import org.dinky.data.model.FlinkUdfManifest; import org.dinky.function.constant.PathConstant; import org.dinky.function.util.ZipWriter; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.resource.BaseResourceManager; import java.io.File; import java.io.InputStream; diff --git a/dinky-admin/src/main/java/org/dinky/data/model/Resources.java b/dinky-admin/src/main/java/org/dinky/data/model/Resources.java index 998785cfef..5a00c3032e 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/Resources.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/Resources.java @@ -35,6 +35,7 @@ import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; +import cn.hutool.core.bean.BeanUtil; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -146,4 +147,8 @@ public class Resources extends Model { @TableField(fill = FieldFill.INSERT_UPDATE) @ApiModelProperty(value = "Updater", required = true, dataType = "Integer", example = "updater") private Integer updater; + + public static Resources of(ResourcesVO resourcesVO) { + return BeanUtil.toBean(resourcesVO, Resources.class); + } } diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index a3e8520c9e..1b64c6cabe 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -36,6 +36,7 @@ import org.dinky.job.ClearJobHistoryTask; import org.dinky.job.FlinkJobTask; import org.dinky.job.SystemMetricsTask; +import org.dinky.resource.BaseResourceManager; import org.dinky.scheduler.client.ProjectClient; import org.dinky.scheduler.exception.SchedulerException; import org.dinky.scheduler.model.Project; @@ -44,7 +45,6 @@ import org.dinky.service.SysConfigService; import org.dinky.service.TaskService; import org.dinky.service.TenantService; -import org.dinky.service.resource.BaseResourceManager; import org.dinky.url.RsURLStreamHandlerFactory; import org.dinky.utils.JsonUtils; import org.dinky.utils.UDFUtils; diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java index cc4c2f67a6..6e920b90e6 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java @@ -26,11 +26,12 @@ import org.dinky.data.model.Resources; import org.dinky.data.result.Result; import org.dinky.mapper.ResourcesMapper; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.resource.BaseResourceManager; import org.dinky.service.resource.ResourcesService; import org.dinky.utils.URLUtils; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -72,6 +73,7 @@ public boolean syncRemoteDirectoryStructure() { List resourcesList = getBaseResourceManager().getFullDirectoryStructure(rootResource.getId()).stream() .filter(x -> x.getPid() != -1) + .map(Resources::of) .peek(x -> { // Restore the existing information. If the remotmap is not available, // it means that the configuration is out of sync and no processing will be done. @@ -294,7 +296,19 @@ public void uploadFile(Integer pid, String desc, MultipartFile file) { } long size = file.getSize(); String fileName = file.getOriginalFilename(); - upload(pid, desc, (fullName) -> getBaseResourceManager().putFile(fullName, file), fileName, pResource, size); + upload( + pid, + desc, + (fullName) -> { + try { + getBaseResourceManager().putFile(fullName, file.getInputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + fileName, + pResource, + size); } @Transactional(rollbackFor = Exception.class) diff --git a/dinky-admin/src/main/java/org/dinky/url/RsURLConnection.java b/dinky-admin/src/main/java/org/dinky/url/RsURLConnection.java index 77af19f38c..007a5b0f73 100644 --- a/dinky-admin/src/main/java/org/dinky/url/RsURLConnection.java +++ b/dinky-admin/src/main/java/org/dinky/url/RsURLConnection.java @@ -20,7 +20,7 @@ package org.dinky.url; import org.dinky.data.exception.BusException; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.resource.BaseResourceManager; import java.io.InputStream; import java.net.URL; diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 5197b3b5ab..6769929508 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -22,8 +22,6 @@ import org.dinky.app.db.DBUtil; import org.dinky.app.model.StatementParam; import org.dinky.app.model.SysConfig; -import org.dinky.app.resource.BaseResourceManager; -import org.dinky.app.url.RsURLStreamHandlerFactory; import org.dinky.app.util.FlinkAppUtil; import org.dinky.assertion.Asserts; import org.dinky.classloader.DinkyClassLoader; @@ -37,10 +35,12 @@ import org.dinky.executor.ExecutorFactory; import org.dinky.interceptor.FlinkInterceptor; import org.dinky.parser.SqlType; +import org.dinky.resource.BaseResourceManager; import org.dinky.trans.Operations; import org.dinky.trans.dml.ExecuteJarOperation; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.trans.parse.ExecuteJarParseStrategy; +import org.dinky.url.RsURLStreamHandlerFactory; import org.dinky.utils.SqlUtil; import org.dinky.utils.ZipUtils; diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java deleted file mode 100644 index f6363be051..0000000000 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.app.resource; - -import org.dinky.app.resource.impl.HdfsResourceManager; -import org.dinky.app.resource.impl.LocalResourceManager; -import org.dinky.app.resource.impl.OssResourceManager; -import org.dinky.data.exception.DinkyException; -import org.dinky.data.model.SystemConfiguration; -import org.dinky.oss.OssTemplate; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; - -import java.io.InputStream; -import java.nio.charset.Charset; - -import cn.hutool.core.io.FileUtil; -import cn.hutool.core.io.IoUtil; -import cn.hutool.core.lang.Opt; -import cn.hutool.core.lang.Singleton; -import cn.hutool.core.util.StrUtil; - -public interface BaseResourceManager { - SystemConfiguration instances = SystemConfiguration.getInstances(); - - InputStream readFile(String path); - - static BaseResourceManager getInstance() { - switch (SystemConfiguration.getInstances().getResourcesModel().getValue()) { - case HDFS: - return Singleton.get(HdfsResourceManager.class); - case OSS: - return Singleton.get(OssResourceManager.class); - case LOCAL: - return Singleton.get(LocalResourceManager.class); - default: - return null; - } - } - - static void initResourceManager() { - switch (instances.getResourcesModel().getValue()) { - case LOCAL: - Singleton.get(LocalResourceManager.class); - case OSS: - OssTemplate template = new OssTemplate(instances.getOssProperties()); - Singleton.get(OssResourceManager.class).setOssTemplate(template); - break; - case HDFS: - final Configuration configuration = new Configuration(); - Charset charset = Charset.defaultCharset(); - String coreSite = instances.getResourcesHdfsCoreSite().getValue(); - Opt.ofBlankAble(coreSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset), true)); - String hdfsSite = instances.getResourcesHdfsHdfsSite().getValue(); - Opt.ofBlankAble(hdfsSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset), true)); - if (StrUtil.isEmpty(coreSite)) { - configuration.set( - "fs.defaultFS", - instances.getResourcesHdfsDefaultFS().getValue()); - } - try { - FileSystem fileSystem = FileSystem.get( - FileSystem.getDefaultUri(configuration), - configuration, - instances.getResourcesHdfsUser().getValue()); - Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem); - } catch (Exception e) { - throw new DinkyException(e); - } - } - } - - default String getFilePath(String path) { - return FileUtil.normalize( - FileUtil.file(instances.getResourcesUploadBasePath().getValue(), path) - .toString()); - } -} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java deleted file mode 100644 index d85a9c54dc..0000000000 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.app.resource.impl; - -import org.dinky.app.resource.BaseResourceManager; -import org.dinky.data.exception.BusException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.io.InputStream; - -public class HdfsResourceManager implements BaseResourceManager { - FileSystem hdfs; - - @Override - public InputStream readFile(String path) { - try { - return getHdfs().open(new Path(getFilePath(path))); - } catch (IOException e) { - throw BusException.valueOf("file.read.failed", e); - } - } - - public FileSystem getHdfs() { - if (hdfs == null && instances.getResourcesEnable().getValue()) { - throw BusException.valueOf("Resource configuration error, HDFS is not enabled"); - } - return hdfs; - } - - public synchronized void setHdfs(FileSystem hdfs) { - this.hdfs = hdfs; - } -} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/OssResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/OssResourceManager.java deleted file mode 100644 index 554cc47992..0000000000 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/OssResourceManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.app.resource.impl; - -import org.dinky.app.resource.BaseResourceManager; -import org.dinky.data.exception.BusException; -import org.dinky.oss.OssTemplate; - -import java.io.InputStream; - -public class OssResourceManager implements BaseResourceManager { - OssTemplate ossTemplate; - - @Override - public InputStream readFile(String path) { - return getOssTemplate() - .getObject(getOssTemplate().getBucketName(), getFilePath(path)) - .getObjectContent(); - } - - public OssTemplate getOssTemplate() { - if (ossTemplate == null && instances.getResourcesEnable().getValue()) { - throw BusException.valueOf("Resource configuration error, OSS is not enabled"); - } - return ossTemplate; - } - - public void setOssTemplate(OssTemplate ossTemplate) { - this.ossTemplate = ossTemplate; - } -} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java new file mode 100644 index 0000000000..a762dd2b7c --- /dev/null +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.resource.impl; + +import org.dinky.data.exception.BusException; +import org.dinky.data.model.ResourcesVO; +import org.dinky.data.model.SystemConfiguration; +import org.dinky.resource.BaseResourceManager; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IORuntimeException; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.URLUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LocalResourceManager implements BaseResourceManager { + SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); + + @Override + public void remove(String path) { + try { + boolean isSuccess = FileUtil.del(getFilePath(path)); + if (!isSuccess) { + throw new BusException("remove file failed,reason unknown"); + } + } catch (IORuntimeException e) { + log.error("remove file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void rename(String path, String newPath) { + try { + String newName = FileUtil.getName(newPath); + FileUtil.rename(new File(getFilePath(path)), newName, true); + } catch (Exception e) { + log.error("rename file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void putFile(String path, InputStream fileStream) { + try { + FileUtil.writeFromStream(fileStream, getFilePath(path)); + } catch (Exception e) { + log.error("putFile file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void putFile(String path, File file) { + BufferedInputStream inputStream = FileUtil.getInputStream(file); + FileUtil.writeFromStream(inputStream, getFilePath(path)); + } + + @Override + public String getFileContent(String path) { + return IoUtil.readUtf8(readFile(path)); + } + + @Override + public List getFullDirectoryStructure(int rootId) { + String basePath = getBasePath(); + try (Stream paths = Files.walk(Paths.get(basePath))) { + return paths.map(path -> { + if (path.compareTo(Paths.get(basePath)) == 0) { + // 跳过根目录 | skip root directory + return null; + } + Path parentPath = path.getParent(); + String parent = ""; + if (parentPath != null) { + parent = parentPath.toString().replace(basePath, ""); + } + String self = path.toString().replace(basePath, ""); + int pid = parent.isEmpty() ? rootId : parent.hashCode(); + File file = new File(path.toString()); + return ResourcesVO.builder() + .id(self.hashCode()) + .pid(pid) + .fullName(self) + .fileName(file.getName()) + .isDirectory(file.isDirectory()) + .type(0) + .size(file.length()) + .build(); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InputStream readFile(String path) { + try (HttpResponse exec = HttpUtil.createGet( + systemConfiguration.getDinkyAddr().getValue() + "/download/downloadFromRs?path=" + + URLUtil.encode(path)) + .execute()) { + return exec.bodyStream(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java new file mode 100644 index 0000000000..67b8a28836 --- /dev/null +++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import static org.apache.flink.util.Preconditions.checkState; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +/** Utility class for {@link PackagedProgram} related operations. */ +public enum PackagedProgramUtils { + ; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; + + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + /** + * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + @Nullable JobID jobID, + boolean suppressOutput) + throws ProgramInvocationException { + final Pipeline pipeline = + getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); + final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader( + packagedProgram.getUserCodeClassLoader(), pipeline, configuration, defaultParallelism); + if (jobID != null) { + jobGraph.setJobID(jobID); + } + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); + jobGraph.setClasspaths(packagedProgram.getClasspaths()); + jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); + + return jobGraph; + } + + /** + * Creates a {@link JobGraph} with a random {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation. + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + boolean suppressOutput) + throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput); + } + + public static Pipeline getPipelineFromProgram( + PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) + throws CompilerException, ProgramInvocationException { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + + final PrintStream originalOut = System.out; + final PrintStream originalErr = System.err; + final ByteArrayOutputStream stdOutBuffer; + final ByteArrayOutputStream stdErrBuffer; + + if (suppressOutput) { + // temporarily write STDERR and STDOUT to a byte array. + stdOutBuffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(stdOutBuffer)); + stdErrBuffer = new ByteArrayOutputStream(); + System.setErr(new PrintStream(stdErrBuffer)); + } else { + stdOutBuffer = null; + stdErrBuffer = null; + } + + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment benv = + new OptimizerPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + benv.setAsContext(); + StreamPlanEnvironment senv = + new StreamPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + senv.setAsContext(); + + try { + program.invokeInteractiveModeForExecution(); + } catch (Throwable t) { + if (benv.getPipeline() != null) { + return benv.getPipeline(); + } + + if (senv.getPipeline() != null) { + return senv.getPipeline(); + } + + if (t instanceof ProgramInvocationException) { + throw t; + } + + throw generateException(program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer); + } finally { + benv.unsetAsContext(); + senv.unsetAsContext(); + if (suppressOutput) { + System.setOut(originalOut); + System.setErr(originalErr); + } + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + throw generateException( + program, + "The program plan could not be fetched - the program aborted pre-maturely.", + null, + stdOutBuffer, + stdErrBuffer); + } + + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) + && (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) + || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + + public static boolean isPython(String[] programArguments) { + return CollectionUtils.containsAny( + Arrays.asList(programArguments), Arrays.asList("-py", "-pym", "--python", "--pyModule")); + } + + public static URL getPythonJar() { + String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); + final List pythonJarPath = new ArrayList<>(); + try { + + if (flinkOptPath == null) { + Class pyClass = Class.forName( + getPythonDriverClassName(), + false, + Thread.currentThread().getContextClassLoader()); + return pyClass.getProtectionDomain().getCodeSource().getLocation(); + } else { + Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + if (file.getFileName().toString().startsWith("flink-python")) { + pythonJarPath.add(file); + } + return result; + } + }); + } + } catch (Throwable e) { + throw new RuntimeException( + "Exception encountered during finding the flink-python jar. This should not happen.", e); + } + + if (pythonJarPath.size() != 1) { + throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); + } + + try { + return pythonJarPath.get(0).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + public static String getPythonDriverClassName() { + return PYTHON_DRIVER_CLASS_NAME; + } + + public static URI resolveURI(String path) throws URISyntaxException { + final URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + private static ProgramInvocationException generateException( + PackagedProgram program, + String msg, + @Nullable Throwable cause, + @Nullable ByteArrayOutputStream stdoutBuffer, + @Nullable ByteArrayOutputStream stderrBuffer) { + checkState( + (stdoutBuffer != null) == (stderrBuffer != null), + "Stderr/Stdout should either both be set or both be null."); + + final String stdout = (stdoutBuffer != null) ? stdoutBuffer.toString() : ""; + final String stderr = (stderrBuffer != null) ? stderrBuffer.toString() : ""; + + return new ProgramInvocationException( + String.format( + "%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s", + msg, + program.getJobJarAndDependencies(), + stdout.length() == 0 ? "(none)" : stdout, + stderr.length() == 0 ? "(none)" : stderr), + cause); + } +} diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java new file mode 100644 index 0000000000..a2b8f9a7d5 --- /dev/null +++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import static org.apache.flink.util.Preconditions.checkState; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +/** Utility class for {@link PackagedProgram} related operations. */ +public enum PackagedProgramUtils { + ; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; + + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + /** + * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + @Nullable JobID jobID, + boolean suppressOutput) + throws ProgramInvocationException { + final Pipeline pipeline = + getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); + final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader( + packagedProgram.getUserCodeClassLoader(), pipeline, configuration, defaultParallelism); + if (jobID != null) { + jobGraph.setJobID(jobID); + } + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); + jobGraph.setClasspaths(packagedProgram.getClasspaths()); + jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); + + return jobGraph; + } + + /** + * Creates a {@link JobGraph} with a random {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation. + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + boolean suppressOutput) + throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput); + } + + public static Pipeline getPipelineFromProgram( + PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) + throws CompilerException, ProgramInvocationException { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + + final PrintStream originalOut = System.out; + final PrintStream originalErr = System.err; + final ByteArrayOutputStream stdOutBuffer; + final ByteArrayOutputStream stdErrBuffer; + + if (suppressOutput) { + // temporarily write STDERR and STDOUT to a byte array. + stdOutBuffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(stdOutBuffer)); + stdErrBuffer = new ByteArrayOutputStream(); + System.setErr(new PrintStream(stdErrBuffer)); + } else { + stdOutBuffer = null; + stdErrBuffer = null; + } + + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment benv = + new OptimizerPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + benv.setAsContext(); + StreamPlanEnvironment senv = + new StreamPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + senv.setAsContext(); + + try { + program.invokeInteractiveModeForExecution(); + } catch (Throwable t) { + if (benv.getPipeline() != null) { + return benv.getPipeline(); + } + + if (senv.getPipeline() != null) { + return senv.getPipeline(); + } + + if (t instanceof ProgramInvocationException) { + throw t; + } + + throw generateException(program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer); + } finally { + benv.unsetAsContext(); + senv.unsetAsContext(); + if (suppressOutput) { + System.setOut(originalOut); + System.setErr(originalErr); + } + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + throw generateException( + program, + "The program plan could not be fetched - the program aborted pre-maturely.", + null, + stdOutBuffer, + stdErrBuffer); + } + + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) + && (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) + || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + + public static boolean isPython(String[] programArguments) { + return CollectionUtils.containsAny( + Arrays.asList(programArguments), Arrays.asList("-py", "-pym", "--python", "--pyModule")); + } + + // override + public static URL getPythonJar() { + String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); + final List pythonJarPath = new ArrayList<>(); + try { + + if (flinkOptPath == null) { + Class pyClass = Class.forName( + getPythonDriverClassName(), + false, + Thread.currentThread().getContextClassLoader()); + return pyClass.getProtectionDomain().getCodeSource().getLocation(); + } else { + Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + if (file.getFileName().toString().startsWith("flink-python")) { + pythonJarPath.add(file); + } + return result; + } + }); + } + } catch (Throwable e) { + throw new RuntimeException( + "Exception encountered during finding the flink-python jar. This should not happen.", e); + } + + if (pythonJarPath.size() != 1) { + throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); + } + + try { + return pythonJarPath.get(0).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + public static String getPythonDriverClassName() { + return PYTHON_DRIVER_CLASS_NAME; + } + + public static URI resolveURI(String path) throws URISyntaxException { + final URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + private static ProgramInvocationException generateException( + PackagedProgram program, + String msg, + @Nullable Throwable cause, + @Nullable ByteArrayOutputStream stdoutBuffer, + @Nullable ByteArrayOutputStream stderrBuffer) { + checkState( + (stdoutBuffer != null) == (stderrBuffer != null), + "Stderr/Stdout should either both be set or both be null."); + + final String stdout = (stdoutBuffer != null) ? stdoutBuffer.toString() : ""; + final String stderr = (stderrBuffer != null) ? stderrBuffer.toString() : ""; + + return new ProgramInvocationException( + String.format( + "%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s", + msg, + program.getJobJarAndDependencies(), + stdout.length() == 0 ? "(none)" : stdout, + stderr.length() == 0 ? "(none)" : stderr), + cause); + } +} diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java new file mode 100644 index 0000000000..67b8a28836 --- /dev/null +++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import static org.apache.flink.util.Preconditions.checkState; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +/** Utility class for {@link PackagedProgram} related operations. */ +public enum PackagedProgramUtils { + ; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; + + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + /** + * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + @Nullable JobID jobID, + boolean suppressOutput) + throws ProgramInvocationException { + final Pipeline pipeline = + getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); + final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader( + packagedProgram.getUserCodeClassLoader(), pipeline, configuration, defaultParallelism); + if (jobID != null) { + jobGraph.setJobID(jobID); + } + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); + jobGraph.setClasspaths(packagedProgram.getClasspaths()); + jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); + + return jobGraph; + } + + /** + * Creates a {@link JobGraph} with a random {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation. + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + boolean suppressOutput) + throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput); + } + + public static Pipeline getPipelineFromProgram( + PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) + throws CompilerException, ProgramInvocationException { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + + final PrintStream originalOut = System.out; + final PrintStream originalErr = System.err; + final ByteArrayOutputStream stdOutBuffer; + final ByteArrayOutputStream stdErrBuffer; + + if (suppressOutput) { + // temporarily write STDERR and STDOUT to a byte array. + stdOutBuffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(stdOutBuffer)); + stdErrBuffer = new ByteArrayOutputStream(); + System.setErr(new PrintStream(stdErrBuffer)); + } else { + stdOutBuffer = null; + stdErrBuffer = null; + } + + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment benv = + new OptimizerPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + benv.setAsContext(); + StreamPlanEnvironment senv = + new StreamPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + senv.setAsContext(); + + try { + program.invokeInteractiveModeForExecution(); + } catch (Throwable t) { + if (benv.getPipeline() != null) { + return benv.getPipeline(); + } + + if (senv.getPipeline() != null) { + return senv.getPipeline(); + } + + if (t instanceof ProgramInvocationException) { + throw t; + } + + throw generateException(program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer); + } finally { + benv.unsetAsContext(); + senv.unsetAsContext(); + if (suppressOutput) { + System.setOut(originalOut); + System.setErr(originalErr); + } + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + throw generateException( + program, + "The program plan could not be fetched - the program aborted pre-maturely.", + null, + stdOutBuffer, + stdErrBuffer); + } + + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) + && (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) + || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + + public static boolean isPython(String[] programArguments) { + return CollectionUtils.containsAny( + Arrays.asList(programArguments), Arrays.asList("-py", "-pym", "--python", "--pyModule")); + } + + public static URL getPythonJar() { + String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); + final List pythonJarPath = new ArrayList<>(); + try { + + if (flinkOptPath == null) { + Class pyClass = Class.forName( + getPythonDriverClassName(), + false, + Thread.currentThread().getContextClassLoader()); + return pyClass.getProtectionDomain().getCodeSource().getLocation(); + } else { + Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + if (file.getFileName().toString().startsWith("flink-python")) { + pythonJarPath.add(file); + } + return result; + } + }); + } + } catch (Throwable e) { + throw new RuntimeException( + "Exception encountered during finding the flink-python jar. This should not happen.", e); + } + + if (pythonJarPath.size() != 1) { + throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); + } + + try { + return pythonJarPath.get(0).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + public static String getPythonDriverClassName() { + return PYTHON_DRIVER_CLASS_NAME; + } + + public static URI resolveURI(String path) throws URISyntaxException { + final URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + private static ProgramInvocationException generateException( + PackagedProgram program, + String msg, + @Nullable Throwable cause, + @Nullable ByteArrayOutputStream stdoutBuffer, + @Nullable ByteArrayOutputStream stderrBuffer) { + checkState( + (stdoutBuffer != null) == (stderrBuffer != null), + "Stderr/Stdout should either both be set or both be null."); + + final String stdout = (stdoutBuffer != null) ? stdoutBuffer.toString() : ""; + final String stderr = (stderrBuffer != null) ? stderrBuffer.toString() : ""; + + return new ProgramInvocationException( + String.format( + "%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s", + msg, + program.getJobJarAndDependencies(), + stdout.length() == 0 ? "(none)" : stdout, + stderr.length() == 0 ? "(none)" : stderr), + cause); + } +} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java new file mode 100644 index 0000000000..67b8a28836 --- /dev/null +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import static org.apache.flink.util.Preconditions.checkState; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +/** Utility class for {@link PackagedProgram} related operations. */ +public enum PackagedProgramUtils { + ; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; + + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + /** + * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + @Nullable JobID jobID, + boolean suppressOutput) + throws ProgramInvocationException { + final Pipeline pipeline = + getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); + final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader( + packagedProgram.getUserCodeClassLoader(), pipeline, configuration, defaultParallelism); + if (jobID != null) { + jobGraph.setJobID(jobID); + } + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); + jobGraph.setClasspaths(packagedProgram.getClasspaths()); + jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); + + return jobGraph; + } + + /** + * Creates a {@link JobGraph} with a random {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation. + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + boolean suppressOutput) + throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput); + } + + public static Pipeline getPipelineFromProgram( + PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) + throws CompilerException, ProgramInvocationException { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + + final PrintStream originalOut = System.out; + final PrintStream originalErr = System.err; + final ByteArrayOutputStream stdOutBuffer; + final ByteArrayOutputStream stdErrBuffer; + + if (suppressOutput) { + // temporarily write STDERR and STDOUT to a byte array. + stdOutBuffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(stdOutBuffer)); + stdErrBuffer = new ByteArrayOutputStream(); + System.setErr(new PrintStream(stdErrBuffer)); + } else { + stdOutBuffer = null; + stdErrBuffer = null; + } + + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment benv = + new OptimizerPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + benv.setAsContext(); + StreamPlanEnvironment senv = + new StreamPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + senv.setAsContext(); + + try { + program.invokeInteractiveModeForExecution(); + } catch (Throwable t) { + if (benv.getPipeline() != null) { + return benv.getPipeline(); + } + + if (senv.getPipeline() != null) { + return senv.getPipeline(); + } + + if (t instanceof ProgramInvocationException) { + throw t; + } + + throw generateException(program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer); + } finally { + benv.unsetAsContext(); + senv.unsetAsContext(); + if (suppressOutput) { + System.setOut(originalOut); + System.setErr(originalErr); + } + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + throw generateException( + program, + "The program plan could not be fetched - the program aborted pre-maturely.", + null, + stdOutBuffer, + stdErrBuffer); + } + + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) + && (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) + || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + + public static boolean isPython(String[] programArguments) { + return CollectionUtils.containsAny( + Arrays.asList(programArguments), Arrays.asList("-py", "-pym", "--python", "--pyModule")); + } + + public static URL getPythonJar() { + String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); + final List pythonJarPath = new ArrayList<>(); + try { + + if (flinkOptPath == null) { + Class pyClass = Class.forName( + getPythonDriverClassName(), + false, + Thread.currentThread().getContextClassLoader()); + return pyClass.getProtectionDomain().getCodeSource().getLocation(); + } else { + Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + if (file.getFileName().toString().startsWith("flink-python")) { + pythonJarPath.add(file); + } + return result; + } + }); + } + } catch (Throwable e) { + throw new RuntimeException( + "Exception encountered during finding the flink-python jar. This should not happen.", e); + } + + if (pythonJarPath.size() != 1) { + throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); + } + + try { + return pythonJarPath.get(0).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + public static String getPythonDriverClassName() { + return PYTHON_DRIVER_CLASS_NAME; + } + + public static URI resolveURI(String path) throws URISyntaxException { + final URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + private static ProgramInvocationException generateException( + PackagedProgram program, + String msg, + @Nullable Throwable cause, + @Nullable ByteArrayOutputStream stdoutBuffer, + @Nullable ByteArrayOutputStream stderrBuffer) { + checkState( + (stdoutBuffer != null) == (stderrBuffer != null), + "Stderr/Stdout should either both be set or both be null."); + + final String stdout = (stdoutBuffer != null) ? stdoutBuffer.toString() : ""; + final String stderr = (stderrBuffer != null) ? stderrBuffer.toString() : ""; + + return new ProgramInvocationException( + String.format( + "%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s", + msg, + program.getJobJarAndDependencies(), + stdout.length() == 0 ? "(none)" : stdout, + stderr.length() == 0 ? "(none)" : stderr), + cause); + } +} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java new file mode 100644 index 0000000000..a2b8f9a7d5 --- /dev/null +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import static org.apache.flink.util.Preconditions.checkState; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +/** Utility class for {@link PackagedProgram} related operations. */ +public enum PackagedProgramUtils { + ; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; + + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + /** + * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + @Nullable JobID jobID, + boolean suppressOutput) + throws ProgramInvocationException { + final Pipeline pipeline = + getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); + final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader( + packagedProgram.getUserCodeClassLoader(), pipeline, configuration, defaultParallelism); + if (jobID != null) { + jobGraph.setJobID(jobID); + } + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); + jobGraph.setClasspaths(packagedProgram.getClasspaths()); + jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); + + return jobGraph; + } + + /** + * Creates a {@link JobGraph} with a random {@link JobID} from the given {@link + * PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation. + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism, + boolean suppressOutput) + throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput); + } + + public static Pipeline getPipelineFromProgram( + PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) + throws CompilerException, ProgramInvocationException { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + + final PrintStream originalOut = System.out; + final PrintStream originalErr = System.err; + final ByteArrayOutputStream stdOutBuffer; + final ByteArrayOutputStream stdErrBuffer; + + if (suppressOutput) { + // temporarily write STDERR and STDOUT to a byte array. + stdOutBuffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(stdOutBuffer)); + stdErrBuffer = new ByteArrayOutputStream(); + System.setErr(new PrintStream(stdErrBuffer)); + } else { + stdOutBuffer = null; + stdErrBuffer = null; + } + + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment benv = + new OptimizerPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + benv.setAsContext(); + StreamPlanEnvironment senv = + new StreamPlanEnvironment(configuration, program.getUserCodeClassLoader(), parallelism); + senv.setAsContext(); + + try { + program.invokeInteractiveModeForExecution(); + } catch (Throwable t) { + if (benv.getPipeline() != null) { + return benv.getPipeline(); + } + + if (senv.getPipeline() != null) { + return senv.getPipeline(); + } + + if (t instanceof ProgramInvocationException) { + throw t; + } + + throw generateException(program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer); + } finally { + benv.unsetAsContext(); + senv.unsetAsContext(); + if (suppressOutput) { + System.setOut(originalOut); + System.setErr(originalErr); + } + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + throw generateException( + program, + "The program plan could not be fetched - the program aborted pre-maturely.", + null, + stdOutBuffer, + stdErrBuffer); + } + + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) + && (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) + || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + + public static boolean isPython(String[] programArguments) { + return CollectionUtils.containsAny( + Arrays.asList(programArguments), Arrays.asList("-py", "-pym", "--python", "--pyModule")); + } + + // override + public static URL getPythonJar() { + String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); + final List pythonJarPath = new ArrayList<>(); + try { + + if (flinkOptPath == null) { + Class pyClass = Class.forName( + getPythonDriverClassName(), + false, + Thread.currentThread().getContextClassLoader()); + return pyClass.getProtectionDomain().getCodeSource().getLocation(); + } else { + Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + if (file.getFileName().toString().startsWith("flink-python")) { + pythonJarPath.add(file); + } + return result; + } + }); + } + } catch (Throwable e) { + throw new RuntimeException( + "Exception encountered during finding the flink-python jar. This should not happen.", e); + } + + if (pythonJarPath.size() != 1) { + throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); + } + + try { + return pythonJarPath.get(0).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + public static String getPythonDriverClassName() { + return PYTHON_DRIVER_CLASS_NAME; + } + + public static URI resolveURI(String path) throws URISyntaxException { + final URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + private static ProgramInvocationException generateException( + PackagedProgram program, + String msg, + @Nullable Throwable cause, + @Nullable ByteArrayOutputStream stdoutBuffer, + @Nullable ByteArrayOutputStream stderrBuffer) { + checkState( + (stdoutBuffer != null) == (stderrBuffer != null), + "Stderr/Stdout should either both be set or both be null."); + + final String stdout = (stdoutBuffer != null) ? stdoutBuffer.toString() : ""; + final String stderr = (stderrBuffer != null) ? stderrBuffer.toString() : ""; + + return new ProgramInvocationException( + String.format( + "%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s", + msg, + program.getJobJarAndDependencies(), + stdout.length() == 0 ? "(none)" : stdout, + stderr.length() == 0 ? "(none)" : stderr), + cause); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java similarity index 88% rename from dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java index c4dde4bb38..de6c733a3a 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java @@ -17,15 +17,15 @@ * */ -package org.dinky.service.resource; +package org.dinky.resource; import org.dinky.data.exception.DinkyException; -import org.dinky.data.model.Resources; +import org.dinky.data.model.ResourcesVO; import org.dinky.data.model.SystemConfiguration; import org.dinky.oss.OssTemplate; -import org.dinky.service.resource.impl.HdfsResourceManager; -import org.dinky.service.resource.impl.LocalResourceManager; -import org.dinky.service.resource.impl.OssResourceManager; +import org.dinky.resource.impl.HdfsResourceManager; +import org.dinky.resource.impl.LocalResourceManager; +import org.dinky.resource.impl.OssResourceManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,8 +35,6 @@ import java.nio.charset.Charset; import java.util.List; -import org.springframework.web.multipart.MultipartFile; - import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.core.lang.Opt; @@ -50,13 +48,13 @@ public interface BaseResourceManager { void rename(String path, String newPath); - void putFile(String path, MultipartFile file); + void putFile(String path, InputStream fileStream); void putFile(String path, File file); String getFileContent(String path); - List getFullDirectoryStructure(int rootId); + List getFullDirectoryStructure(int rootId); InputStream readFile(String path); @@ -85,9 +83,10 @@ static void initResourceManager() { final Configuration configuration = new Configuration(); Charset charset = Charset.defaultCharset(); String coreSite = instances.getResourcesHdfsCoreSite().getValue(); - Opt.ofBlankAble(coreSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset), true)); + Opt.ofBlankAble(coreSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset))); String hdfsSite = instances.getResourcesHdfsHdfsSite().getValue(); - Opt.ofBlankAble(hdfsSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset), true)); + Opt.ofBlankAble(hdfsSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset))); + configuration.reloadConfiguration(); if (StrUtil.isEmpty(coreSite)) { configuration.set( "fs.defaultFS", diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/HdfsResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java similarity index 90% rename from dinky-admin/src/main/java/org/dinky/service/resource/impl/HdfsResourceManager.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java index 637e9cb5fa..9d8bfcc385 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/HdfsResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/HdfsResourceManager.java @@ -17,11 +17,11 @@ * */ -package org.dinky.service.resource.impl; +package org.dinky.resource.impl; import org.dinky.data.exception.BusException; -import org.dinky.data.model.Resources; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.data.model.ResourcesVO; +import org.dinky.resource.BaseResourceManager; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -32,8 +32,6 @@ import java.io.InputStream; import java.util.List; -import org.springframework.web.multipart.MultipartFile; - import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; @@ -59,10 +57,10 @@ public void rename(String path, String newPath) { } @Override - public void putFile(String path, MultipartFile file) { + public void putFile(String path, InputStream fileStream) { try { FSDataOutputStream stream = getHdfs().create(new Path(getFilePath(path)), true); - stream.write(file.getBytes()); + stream.write(IoUtil.readBytes(fileStream)); stream.flush(); stream.close(); } catch (IOException e) { @@ -88,7 +86,7 @@ public String getFileContent(String path) { } @Override - public List getFullDirectoryStructure(int rootId) { + public List getFullDirectoryStructure(int rootId) { throw new RuntimeException("Sync HDFS Not implemented!"); } diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java similarity index 89% rename from dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java index 0303e1c57c..421ef369f5 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/LocalResourceManager.java @@ -17,11 +17,11 @@ * */ -package org.dinky.service.resource.impl; +package org.dinky.resource.impl; import org.dinky.data.exception.BusException; -import org.dinky.data.model.Resources; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.data.model.ResourcesVO; +import org.dinky.resource.BaseResourceManager; import java.io.BufferedInputStream; import java.io.File; @@ -35,8 +35,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.springframework.web.multipart.MultipartFile; - import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IORuntimeException; import cn.hutool.core.io.IoUtil; @@ -69,10 +67,10 @@ public void rename(String path, String newPath) { } @Override - public void putFile(String path, MultipartFile file) { + public void putFile(String path, InputStream fileStream) { try { - FileUtil.writeFromStream(file.getInputStream(), getFilePath(path)); - } catch (IOException e) { + FileUtil.writeFromStream(fileStream, getFilePath(path)); + } catch (Exception e) { log.error("putFile file failed", e); throw new BusException(e.getMessage()); } @@ -90,7 +88,7 @@ public String getFileContent(String path) { } @Override - public List getFullDirectoryStructure(int rootId) { + public List getFullDirectoryStructure(int rootId) { String basePath = getBasePath(); try (Stream paths = Files.walk(Paths.get(basePath))) { return paths.map(path -> { @@ -106,7 +104,7 @@ public List getFullDirectoryStructure(int rootId) { String self = path.toString().replace(basePath, ""); int pid = parent.isEmpty() ? rootId : parent.hashCode(); File file = new File(path.toString()); - return Resources.builder() + return ResourcesVO.builder() .id(self.hashCode()) .pid(pid) .fullName(self) diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/OssResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java similarity index 90% rename from dinky-admin/src/main/java/org/dinky/service/resource/impl/OssResourceManager.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java index f8c9c75e59..e41c958e77 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/OssResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/resource/impl/OssResourceManager.java @@ -17,13 +17,13 @@ * */ -package org.dinky.service.resource.impl; +package org.dinky.resource.impl; import org.dinky.data.exception.BusException; import org.dinky.data.exception.DinkyException; -import org.dinky.data.model.Resources; +import org.dinky.data.model.ResourcesVO; import org.dinky.oss.OssTemplate; -import org.dinky.service.resource.BaseResourceManager; +import org.dinky.resource.BaseResourceManager; import java.io.File; import java.io.InputStream; @@ -32,8 +32,6 @@ import java.util.List; import java.util.Map; -import org.springframework.web.multipart.MultipartFile; - import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; @@ -63,9 +61,9 @@ public void rename(String path, String newPath) { } @Override - public void putFile(String path, MultipartFile file) { + public void putFile(String path, InputStream fileStream) { try { - getOssTemplate().putObject(getOssTemplate().getBucketName(), getFilePath(path), file.getInputStream()); + getOssTemplate().putObject(getOssTemplate().getBucketName(), getFilePath(path), fileStream); } catch (Exception e) { throw new DinkyException(e); } @@ -87,12 +85,12 @@ public String getFileContent(String path) { } @Override - public List getFullDirectoryStructure(int rootId) { + public List getFullDirectoryStructure(int rootId) { String basePath = getBasePath(); List listBucketObjects = getOssTemplate().listBucketObjects(getOssTemplate().getBucketName(), basePath); - Map resourcesMap = new HashMap<>(); + Map resourcesMap = new HashMap<>(); for (S3ObjectSummary obj : listBucketObjects) { obj.setKey(obj.getKey().replace(basePath, "")); @@ -105,7 +103,7 @@ public List getFullDirectoryStructure(int rootId) { String s = split[i]; int pid = parent.isEmpty() ? rootId : parent.hashCode(); parent = parent + "/" + s; - Resources.ResourcesBuilder builder = Resources.builder() + ResourcesVO.ResourcesVOBuilder builder = ResourcesVO.builder() .id(parent.hashCode()) .pid(pid) .fullName(parent) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index e8bf2ddcad..2815d71b52 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -39,6 +39,7 @@ import java.util.Optional; import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.Opt; import cn.hutool.core.util.StrUtil; import lombok.Getter; import lombok.Setter; @@ -73,7 +74,16 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable PackagedProgram program; try { Configuration configuration = tEnv.getConfig().getConfiguration(); - File file = URLUtils.toFile(submitParam.getUri()); + File file = + Opt.ofBlankAble(submitParam.getUri()).map(URLUtils::toFile).orElse(null); + if (!PackagedProgramUtils.isPython(submitParam.getMainClass())) { + tEnv.addJar(file); + } else { + // python submit + submitParam.setArgs("--python " + file.getAbsolutePath() + " " + + Opt.ofBlankAble(submitParam.getArgs()).orElse("")); + file = null; + } program = PackagedProgram.newBuilder() .setJarFile(file) .setEntryPointClassName(submitParam.getMainClass()) @@ -81,7 +91,6 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable .setSavepointRestoreSettings(savepointRestoreSettings) .setArguments(RunTimeUtil.handleCmds(submitParam.getArgs())) .build(); - tEnv.addJar(file); Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, 1, true); Assert.isTrue(pipeline instanceof StreamGraph, "can not translate"); return (StreamGraph) pipeline; diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java new file mode 100644 index 0000000000..04aac35d88 --- /dev/null +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystem.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.url; + +import org.dinky.resource.BaseResourceManager; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalDataInputStream; +import org.apache.flink.core.fs.local.LocalFileStatus; + +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ResourceFileSystem extends FileSystem { + private static final BaseResourceManager BASE_RESOURCE_MANAGER = BaseResourceManager.getInstance(); + + public static final URI URI_SCHEMA = URI.create("rs:/"); + private static final ResourceFileSystem INSTANCE = new ResourceFileSystem(); + + @Override + public Path getWorkingDirectory() { + return null; + } + + @Override + public Path getHomeDirectory() { + return null; + } + + @Override + public URI getUri() { + return URI_SCHEMA; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return new LocalFileStatus(getFile(f), this); + } + + protected File getFile(Path f) { + return new File(BASE_RESOURCE_MANAGER.getFilePath(f.getPath())); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + return new BlockLocation[0]; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return open(f); + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + return new LocalDataInputStream(getFile(f)); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return new FileStatus[0]; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + try { + BASE_RESOURCE_MANAGER.remove(f.getPath()); + return true; + } catch (Exception e) { + log.error("delete file failed, path: {}", f.getPath(), e); + } + return false; + } + + @Override + public boolean mkdirs(Path f) throws IOException { + return false; + } + + @Override + public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException { + return null; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + try { + BASE_RESOURCE_MANAGER.rename(src.getPath(), dst.getPath()); + return true; + } catch (Exception e) { + log.error("rename file failed, src: {}, dst: {}", src.getPath(), dst.getPath(), e); + } + return false; + } + + @Override + public boolean isDistributedFS() { + return true; + } + + @Override + public FileSystemKind getKind() { + return FileSystemKind.OBJECT_STORE; + } + + public static ResourceFileSystem getSharedInstance() { + return INSTANCE; + } +} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystemFactory.java similarity index 53% rename from dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystemFactory.java index 229020ef91..c87cfd3bf8 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/ResourceFileSystemFactory.java @@ -17,30 +17,25 @@ * */ -package org.dinky.app.resource.impl; +package org.dinky.url; -import org.dinky.app.resource.BaseResourceManager; -import org.dinky.data.model.SystemConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; import java.io.IOException; -import java.io.InputStream; -import java.net.URL; +import java.net.URI; -import cn.hutool.core.util.URLUtil; -import lombok.extern.slf4j.Slf4j; +import com.google.auto.service.AutoService; -@Slf4j -public class LocalResourceManager implements BaseResourceManager { - SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); +@AutoService(FileSystemFactory.class) +public class ResourceFileSystemFactory implements FileSystemFactory { + @Override + public String getScheme() { + return ResourceFileSystem.URI_SCHEMA.getScheme(); + } @Override - public InputStream readFile(String path) { - try { - return new URL("http://" + systemConfiguration.getDinkyAddr().getValue() + "/download/downloadFromRs?path=" - + URLUtil.encode(path)) - .openStream(); - } catch (IOException e) { - throw new RuntimeException(e); - } + public FileSystem create(URI fsUri) throws IOException { + return ResourceFileSystem.getSharedInstance(); } } diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLConnection.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLConnection.java similarity index 95% rename from dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLConnection.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLConnection.java index 2ed8ebb983..007a5b0f73 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLConnection.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLConnection.java @@ -17,10 +17,10 @@ * */ -package org.dinky.app.url; +package org.dinky.url; -import org.dinky.app.resource.BaseResourceManager; import org.dinky.data.exception.BusException; +import org.dinky.resource.BaseResourceManager; import java.io.InputStream; import java.net.URL; diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandler.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandler.java similarity index 97% rename from dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandler.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandler.java index 5523cf62c4..71bb1a7f18 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandler.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandler.java @@ -17,7 +17,7 @@ * */ -package org.dinky.app.url; +package org.dinky.url; import java.net.URL; import java.net.URLConnection; diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandlerFactory.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java similarity index 93% rename from dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandlerFactory.java rename to dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java index 6f6874b2d7..b452a583cb 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/url/RsURLStreamHandlerFactory.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java @@ -17,7 +17,7 @@ * */ -package org.dinky.app.url; +package org.dinky.url; import java.net.URLStreamHandler; import java.net.URLStreamHandlerFactory; @@ -27,12 +27,12 @@ public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory { @Override public URLStreamHandler createURLStreamHandler(String protocol) { - if ("rs".equals(protocol)) { + if (ResourceFileSystem.URI_SCHEMA.getScheme().equals(protocol)) { return new RsURLStreamHandler(); } try { Class.forName("org.apache.hadoop.fs.FsUrlStreamHandlerFactory"); - } catch (Exception e) { + } catch (Throwable e) { return null; } String name = PREFIX + "." + protocol + ".Handler"; diff --git a/dinky-common/pom.xml b/dinky-common/pom.xml index fd785853db..46b10609c8 100644 --- a/dinky-common/pom.xml +++ b/dinky-common/pom.xml @@ -31,6 +31,11 @@ Dinky : Common + + com.google.auto.service + auto-service + 1.1.1 + com.amazonaws aws-java-sdk-s3 diff --git a/dinky-common/src/main/java/org/dinky/data/model/ResourcesVO.java b/dinky-common/src/main/java/org/dinky/data/model/ResourcesVO.java new file mode 100644 index 0000000000..e7644f600f --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/model/ResourcesVO.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.model; + +import java.time.LocalDateTime; + +import com.fasterxml.jackson.annotation.JsonFormat; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@ApiModel(value = "Resources", description = "Resource Information") +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ResourcesVO { + + @ApiModelProperty(value = "ID", dataType = "Integer", example = "1", notes = "Unique identifier for the resource") + private Integer id; + + @ApiModelProperty(value = "File Name", dataType = "String", example = "example.txt", notes = "Name of the file") + private String fileName; + + @ApiModelProperty(value = "Description", dataType = "String", notes = "Description or details about the resource") + private String description; + + @ApiModelProperty( + value = "User ID", + dataType = "Integer", + example = "1001", + notes = "ID of the user who owns the resource") + private Integer userId; + + @ApiModelProperty( + value = "Resource Type", + dataType = "Integer", + example = "0", + notes = "Type of the resource (0 for FILE, 1 for UDF)") + private Integer type; + + @ApiModelProperty( + value = "Resource Size", + dataType = "Long", + example = "1024", + notes = "Size of the resource in bytes") + private Long size; + + @ApiModelProperty( + value = "Parent ID", + dataType = "Integer", + example = "0", + notes = "ID of the parent resource (if applicable)") + private Integer pid; + + @ApiModelProperty( + value = "Full Name", + dataType = "String", + example = "path/to/example.txt", + notes = "Full name or path of the resource") + private String fullName; + + @ApiModelProperty( + value = "Is Directory", + dataType = "Boolean", + notes = "Flag indicating if the resource is a directory") + private Boolean isDirectory; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @ApiModelProperty( + value = "Create Time", + dataType = "String", + notes = "Timestamp indicating the creation time of the resource") + private LocalDateTime createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @ApiModelProperty( + value = "Update Time", + dataType = "String", + notes = "Timestamp indicating the last update time of the resource") + private LocalDateTime updateTime; + + private static final long serialVersionUID = 1L; + + @ApiModelProperty( + value = "Is Leaf", + dataType = "boolean", + example = "false", + notes = "Indicates whether the tree node is a leaf node (true/false)") + private boolean isLeaf; + + @ApiModelProperty(value = "Creator", required = true, dataType = "Integer", example = "creator") + private Integer creator; + + @ApiModelProperty(value = "Updater", required = true, dataType = "Integer", example = "updater") + private Integer updater; +} diff --git a/docs/docs/extend/expand_statements/execute_jar.md b/docs/docs/extend/expand_statements/execute_jar.md index 12cbfc92c6..91fc879a7a 100644 --- a/docs/docs/extend/expand_statements/execute_jar.md +++ b/docs/docs/extend/expand_statements/execute_jar.md @@ -29,14 +29,14 @@ EXECUTE JAR WITH ( ``` -## Demo: +## Jar包任务提交 Demo: ```sql EXECUTE JAR WITH ( 'uri'='rs:/jar/flink/demo/SocketWindowWordCount.jar', 'main-class'='org.apache.flink.streaming.examples.socket', 'args'=' --hostname localhost ', -'parallelism'='', +'parallelism'='' ); ``` :::warning 注意 @@ -44,4 +44,17 @@ EXECUTE JAR WITH ( 1. 以上示例中, uri 的值为 rs:/jar/flink/demo/SocketWindowWordCount.jar, 该值为资源中心中的资源路径, 请确保资源中心中存在该资源,请忽略资源中心 Root 节点(该节点为虚拟节点) 2. 如果要读取S3,HDFS,LCOAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式 -::: \ No newline at end of file +::: + +## PyFlink 任务提交: + +```sql +EXECUTE JAR WITH ( +'uri'='rs:/test_flink.py', +'main-class'='org.apache.flink.client.python.PythonDriver', +'args'='' +); +``` +:::warning 注意 +1. 以上示例中, main-class 的值为 `org.apache.flink.client.python.PythonDriver`, 该值为 Python 的必填值和固定值 +:::