diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index a06608afe5..791ea505f6 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -19,8 +19,6 @@ package org.dinky.init; -import static org.apache.hadoop.fs.FileSystem.getDefaultUri; - import org.dinky.assertion.Asserts; import org.dinky.context.TenantContextHolder; import org.dinky.daemon.constant.FlinkTaskConstant; @@ -35,14 +33,12 @@ import org.dinky.data.model.Task; import org.dinky.data.model.job.JobInstance; import org.dinky.data.model.rbac.Tenant; -import org.dinky.data.properties.OssProperties; import org.dinky.function.constant.PathConstant; import org.dinky.function.pool.UdfCodePool; import org.dinky.job.ClearJobHistoryTask; import org.dinky.job.DynamicResizeFlinkJobPoolTask; import org.dinky.job.FlinkJobTask; import org.dinky.job.SystemMetricsTask; -import org.dinky.oss.OssTemplate; import org.dinky.scheduler.client.ProjectClient; import org.dinky.scheduler.exception.SchedulerException; import org.dinky.scheduler.model.Project; @@ -51,14 +47,12 @@ import org.dinky.service.SysConfigService; import org.dinky.service.TaskService; import org.dinky.service.TenantService; -import org.dinky.service.resource.impl.HdfsResourceManager; -import org.dinky.service.resource.impl.OssResourceManager; +import org.dinky.service.resource.BaseResourceManager; import org.dinky.url.RsURLStreamHandlerFactory; import org.dinky.utils.JsonUtils; import org.dinky.utils.UDFUtils; import org.apache.catalina.webresources.TomcatURLStreamHandlerFactory; -import org.apache.hadoop.fs.FileSystem; import java.util.List; import java.util.concurrent.TimeUnit; @@ -77,7 +71,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.io.FileUtil; -import cn.hutool.core.lang.Singleton; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -140,49 +133,7 @@ private void initResources() { .forEach(x -> x.addParameterCheck(y -> { if (Boolean.TRUE.equals( systemConfiguration.getResourcesEnable().getValue())) { - switch (systemConfiguration.getResourcesModel().getValue()) { - case OSS: - OssProperties ossProperties = new OssProperties(); - ossProperties.setAccessKey(systemConfiguration - .getResourcesOssAccessKey() - .getValue()); - ossProperties.setSecretKey(systemConfiguration - .getResourcesOssSecretKey() - .getValue()); - ossProperties.setEndpoint(systemConfiguration - .getResourcesOssEndpoint() - .getValue()); - ossProperties.setBucketName(systemConfiguration - .getResourcesOssBucketName() - .getValue()); - ossProperties.setRegion(systemConfiguration - .getResourcesOssRegion() - .getValue()); - ossProperties.setPathStyleAccess(systemConfiguration - .getResourcesPathStyleAccess() - .getValue()); - Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties)); - break; - case HDFS: - final org.apache.hadoop.conf.Configuration configuration = - new org.apache.hadoop.conf.Configuration(); - configuration.set( - "fs.defaultFS", - systemConfiguration - .getResourcesHdfsDefaultFS() - .getValue()); - try { - FileSystem fileSystem = FileSystem.get( - getDefaultUri(configuration), - configuration, - systemConfiguration - .getResourcesHdfsUser() - .getValue()); - Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem); - } catch (Exception e) { - throw new DinkyException(e); - } - } + BaseResourceManager.initResourceManager(); } })); } diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java b/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java index a73c4fd888..6111cc2eb1 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java +++ b/dinky-admin/src/main/java/org/dinky/service/resource/BaseResourceManager.java @@ -19,10 +19,16 @@ package org.dinky.service.resource; +import org.dinky.data.exception.DinkyException; import org.dinky.data.model.SystemConfiguration; +import org.dinky.oss.OssTemplate; import org.dinky.service.resource.impl.HdfsResourceManager; +import org.dinky.service.resource.impl.LocalResourceManager; import org.dinky.service.resource.impl.OssResourceManager; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + import java.io.File; import java.io.InputStream; @@ -52,11 +58,37 @@ static BaseResourceManager getInstance() { return Singleton.get(HdfsResourceManager.class); case OSS: return Singleton.get(OssResourceManager.class); + case LOCAL: + return Singleton.get(LocalResourceManager.class); default: return null; } } + static void initResourceManager() { + switch (instances.getResourcesModel().getValue()) { + case LOCAL: + Singleton.get(LocalResourceManager.class); + case OSS: + OssTemplate template = new OssTemplate(instances.getOssProperties()); + Singleton.get(OssResourceManager.class).setOssTemplate(template); + break; + case HDFS: + final Configuration configuration = new Configuration(); + configuration.set( + "fs.defaultFS", instances.getResourcesHdfsDefaultFS().getValue()); + try { + FileSystem fileSystem = FileSystem.get( + FileSystem.getDefaultUri(configuration), + configuration, + instances.getResourcesHdfsUser().getValue()); + Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem); + } catch (Exception e) { + throw new DinkyException(e); + } + } + } + default String getFilePath(String path) { return FileUtil.normalize( FileUtil.file(instances.getResourcesUploadBasePath().getValue(), path) diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java b/dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java new file mode 100644 index 0000000000..12f559a1f2 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/service/resource/impl/LocalResourceManager.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.service.resource.impl; + +import org.dinky.data.exception.BusException; +import org.dinky.service.resource.BaseResourceManager; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +import org.springframework.web.multipart.MultipartFile; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IORuntimeException; +import cn.hutool.core.io.IoUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LocalResourceManager implements BaseResourceManager { + @Override + public void remove(String path) { + try { + boolean isSuccess = FileUtil.del(getFilePath(path)); + if (!isSuccess) { + throw new BusException("remove file failed,reason unknown"); + } + } catch (IORuntimeException e) { + log.error("remove file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void rename(String path, String newPath) { + try { + String newName = FileUtil.getName(newPath); + FileUtil.rename(new File(getFilePath(path)), newName, true); + } catch (Exception e) { + log.error("rename file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void putFile(String path, MultipartFile file) { + try { + FileUtil.writeFromStream(file.getInputStream(), getFilePath(path)); + } catch (IOException e) { + log.error("putFile file failed", e); + throw new BusException(e.getMessage()); + } + } + + @Override + public void putFile(String path, File file) { + BufferedInputStream inputStream = FileUtil.getInputStream(file); + FileUtil.writeFromStream(inputStream, getFilePath(path)); + } + + @Override + public String getFileContent(String path) { + return IoUtil.readUtf8(readFile(path)); + } + + @Override + public InputStream readFile(String path) { + return FileUtil.getInputStream(getFilePath(path)); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java index 17f0bde9f6..daa0d04512 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java @@ -276,13 +276,11 @@ public boolean remove(Integer id) { () -> new BusException(Status.ROOT_DIR_NOT_ALLOW_DELETE)); try { if (id < 1) { - getBaseResourceManager().remove("/"); // todo 删除主目录,实际是清空 remove(new LambdaQueryWrapper().ne(Resources::getId, 0)); } Resources byId = getById(id); if (isExistsChildren(id)) { - getBaseResourceManager().remove(byId.getFullName()); if (byId.getIsDirectory()) { List resourceByPidToChildren = getResourceByPidToChildren(new ArrayList<>(), byId.getId()); @@ -291,7 +289,6 @@ public boolean remove(Integer id) { List resourceByPidToParent = getResourceByPidToParent(new ArrayList<>(), byId.getPid()); resourceByPidToParent.forEach(x -> x.setSize(x.getSize() - byId.getSize())); updateBatchById(resourceByPidToParent); - getBaseResourceManager().remove(byId.getFullName()); return removeById(id); } return removeById(id); diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 8f082972fe..70544bca03 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -19,13 +19,10 @@ package org.dinky.app.flinksql; -import static org.apache.hadoop.fs.FileSystem.getDefaultUri; - import org.dinky.app.db.DBUtil; import org.dinky.app.model.StatementParam; import org.dinky.app.model.SysConfig; -import org.dinky.app.resource.impl.HdfsResourceManager; -import org.dinky.app.resource.impl.OssResourceManager; +import org.dinky.app.resource.BaseResourceManager; import org.dinky.app.url.RsURLStreamHandlerFactory; import org.dinky.assertion.Asserts; import org.dinky.classloader.DinkyClassLoader; @@ -33,14 +30,11 @@ import org.dinky.constant.FlinkSQLConstant; import org.dinky.data.app.AppParamConfig; import org.dinky.data.app.AppTask; -import org.dinky.data.exception.DinkyException; import org.dinky.data.model.SystemConfiguration; -import org.dinky.data.properties.OssProperties; import org.dinky.executor.Executor; import org.dinky.executor.ExecutorConfig; import org.dinky.executor.ExecutorFactory; import org.dinky.interceptor.FlinkInterceptor; -import org.dinky.oss.OssTemplate; import org.dinky.parser.SqlType; import org.dinky.trans.Operations; import org.dinky.trans.dml.ExecuteJarOperation; @@ -53,8 +47,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.python.PythonOptions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import java.io.File; import java.io.FileOutputStream; @@ -80,7 +72,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.io.FileUtil; -import cn.hutool.core.lang.Singleton; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.URLUtil; import lombok.SneakyThrows; @@ -102,43 +93,9 @@ private static void initSystemConfiguration() throws SQLException { systemConfiguration.initSetConfiguration(configMap); } - private static void initResource() throws SQLException { - SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); - switch (systemConfiguration.getResourcesModel().getValue()) { - case OSS: - OssProperties ossProperties = new OssProperties(); - ossProperties.setAccessKey( - systemConfiguration.getResourcesOssAccessKey().getValue()); - ossProperties.setSecretKey( - systemConfiguration.getResourcesOssSecretKey().getValue()); - ossProperties.setEndpoint( - systemConfiguration.getResourcesOssEndpoint().getValue()); - ossProperties.setBucketName( - systemConfiguration.getResourcesOssBucketName().getValue()); - ossProperties.setRegion( - systemConfiguration.getResourcesOssRegion().getValue()); - Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties)); - break; - case HDFS: - final Configuration configuration = new Configuration(); - configuration.set( - "fs.defaultFS", - systemConfiguration.getResourcesHdfsDefaultFS().getValue()); - try { - FileSystem fileSystem = FileSystem.get( - getDefaultUri(configuration), - configuration, - systemConfiguration.getResourcesHdfsUser().getValue()); - Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem); - } catch (Exception e) { - throw new DinkyException(e); - } - } - } - public static void submit(AppParamConfig config) throws SQLException { initSystemConfiguration(); - initResource(); + BaseResourceManager.initResourceManager(); URL.setURLStreamHandlerFactory(new RsURLStreamHandlerFactory()); log.info("{} Start Submit Job:{}", LocalDateTime.now(), config.getTaskId()); diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java index b144bf9820..8e6721e1eb 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/BaseResourceManager.java @@ -20,8 +20,14 @@ package org.dinky.app.resource; import org.dinky.app.resource.impl.HdfsResourceManager; +import org.dinky.app.resource.impl.LocalResourceManager; import org.dinky.app.resource.impl.OssResourceManager; +import org.dinky.data.exception.DinkyException; import org.dinky.data.model.SystemConfiguration; +import org.dinky.oss.OssTemplate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import java.io.InputStream; @@ -39,11 +45,37 @@ static BaseResourceManager getInstance() { return Singleton.get(HdfsResourceManager.class); case OSS: return Singleton.get(OssResourceManager.class); + case LOCAL: + return Singleton.get(LocalResourceManager.class); default: return null; } } + static void initResourceManager() { + switch (instances.getResourcesModel().getValue()) { + case LOCAL: + Singleton.get(LocalResourceManager.class); + case OSS: + OssTemplate template = new OssTemplate(instances.getOssProperties()); + Singleton.get(OssResourceManager.class).setOssTemplate(template); + break; + case HDFS: + final Configuration configuration = new Configuration(); + configuration.set( + "fs.defaultFS", instances.getResourcesHdfsDefaultFS().getValue()); + try { + FileSystem fileSystem = FileSystem.get( + FileSystem.getDefaultUri(configuration), + configuration, + instances.getResourcesHdfsUser().getValue()); + Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem); + } catch (Exception e) { + throw new DinkyException(e); + } + } + } + default String getFilePath(String path) { return FileUtil.normalize( FileUtil.file(instances.getResourcesUploadBasePath().getValue(), path) diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java index bb53b27951..d85a9c54dc 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/HdfsResourceManager.java @@ -21,29 +21,19 @@ import org.dinky.app.resource.BaseResourceManager; import org.dinky.data.exception.BusException; -import org.dinky.data.model.SystemConfiguration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.InputStream; -import java.net.URL; - -import cn.hutool.core.util.URLUtil; public class HdfsResourceManager implements BaseResourceManager { FileSystem hdfs; - SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); @Override public InputStream readFile(String path) { try { - if (systemConfiguration.getResourcesHdfsDefaultFS().getValue().contains("file:/")) { - return new URL("http://" + systemConfiguration.getDinkyAddr().getValue() - + "/download/downloadFromRs?path=" + URLUtil.encode(path)) - .openStream(); - } return getHdfs().open(new Path(getFilePath(path))); } catch (IOException e) { throw BusException.valueOf("file.read.failed", e); diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java new file mode 100644 index 0000000000..229020ef91 --- /dev/null +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/resource/impl/LocalResourceManager.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.app.resource.impl; + +import org.dinky.app.resource.BaseResourceManager; +import org.dinky.data.model.SystemConfiguration; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +import cn.hutool.core.util.URLUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LocalResourceManager implements BaseResourceManager { + SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); + + @Override + public InputStream readFile(String path) { + try { + return new URL("http://" + systemConfiguration.getDinkyAddr().getValue() + "/download/downloadFromRs?path=" + + URLUtil.encode(path)) + .openStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dinky-common/src/main/java/org/dinky/data/model/ResourcesModelEnum.java b/dinky-common/src/main/java/org/dinky/data/model/ResourcesModelEnum.java index b0ca4d5862..1b0fde37f9 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/ResourcesModelEnum.java +++ b/dinky-common/src/main/java/org/dinky/data/model/ResourcesModelEnum.java @@ -20,6 +20,7 @@ package org.dinky.data.model; public enum ResourcesModelEnum { + LOCAL, HDFS, OSS } diff --git a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java index 56aa8bdf4c..8daaddd98b 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java +++ b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java @@ -20,6 +20,7 @@ package org.dinky.data.model; import org.dinky.data.enums.Status; +import org.dinky.data.properties.OssProperties; import java.util.ArrayList; import java.util.Arrays; @@ -211,14 +212,15 @@ public static Configuration.OptionBuilder key(Status status) { .defaultValue(true) .note(Status.SYS_RESOURCE_SETTINGS_ENABLE_NOTE); + private final Configuration resourcesModel = key(Status.SYS_RESOURCE_SETTINGS_MODEL) + .enumType(ResourcesModelEnum.class) + .defaultValue(ResourcesModelEnum.LOCAL) + .note(Status.SYS_RESOURCE_SETTINGS_MODEL_NOTE); + private final Configuration resourcesUploadBasePath = key(Status.SYS_RESOURCE_SETTINGS_UPLOAD_BASE_PATH) .stringType() .defaultValue("/dinky") .note(Status.SYS_RESOURCE_SETTINGS_UPLOAD_BASE_PATH_NOTE); - private final Configuration resourcesModel = key(Status.SYS_RESOURCE_SETTINGS_MODEL) - .enumType(ResourcesModelEnum.class) - .defaultValue(ResourcesModelEnum.HDFS) - .note(Status.SYS_RESOURCE_SETTINGS_MODEL_NOTE); private final Configuration resourcesOssEndpoint = key(Status.SYS_RESOURCE_SETTINGS_OSS_ENDPOINT) .stringType() @@ -343,4 +345,16 @@ public String getMavenRepositoryPassword() { public String getPythonHome() { return pythonHome.getValue(); } + + public OssProperties getOssProperties() { + return OssProperties.builder() + .enable(true) + .endpoint(resourcesOssEndpoint.getValue()) + .accessKey(resourcesOssAccessKey.getValue()) + .secretKey(resourcesOssSecretKey.getValue()) + .bucketName(resourcesOssBucketName.getValue()) + .region(resourcesOssRegion.getValue()) + .pathStyleAccess(resourcesPathStyleAccess.getValue()) + .build(); + } } diff --git a/dinky-common/src/main/java/org/dinky/data/properties/OssProperties.java b/dinky-common/src/main/java/org/dinky/data/properties/OssProperties.java index 9940f037c6..54d641d8d3 100644 --- a/dinky-common/src/main/java/org/dinky/data/properties/OssProperties.java +++ b/dinky-common/src/main/java/org/dinky/data/properties/OssProperties.java @@ -21,12 +21,14 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.Builder; import lombok.Getter; import lombok.Setter; @Getter @Setter @ApiModel(value = "OssProperties", description = "Configuration Properties for Object Storage Service (OSS)") +@Builder public class OssProperties { @ApiModelProperty( @@ -34,7 +36,7 @@ public class OssProperties { dataType = "boolean", notes = "Whether to enable OSS (Object Storage Service)", example = "true") - private boolean enable = true; + private boolean enable; @ApiModelProperty( value = "OSS Endpoint", @@ -43,19 +45,12 @@ public class OssProperties { example = "https://example.oss-cn-hangzhou.aliyuncs.com") private String endpoint; - @ApiModelProperty( - value = "Custom Domain", - dataType = "String", - notes = "Custom domain for OSS", - example = "https://custom-domain.com") - private String customDomain; - @ApiModelProperty( value = "Path Style Access", dataType = "Boolean", notes = "Path style access configuration (true for path-style, false for virtual-hosted-style)", example = "true") - private Boolean pathStyleAccess = true; + private Boolean pathStyleAccess; @ApiModelProperty(value = "Region", dataType = "String", notes = "Region for OSS", example = "oss-cn-hangzhou") private String region;