Skip to content

Commit

Permalink
Add local resource (DataLinkDC#2717)
Browse files Browse the repository at this point in the history
* fix log error

* add local resource manager

* fix app

* format code
  • Loading branch information
gaoyan1998 authored Dec 22, 2023
1 parent 3e1cbf4 commit e9c59e2
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 122 deletions.
53 changes: 2 additions & 51 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.dinky.init;

import static org.apache.hadoop.fs.FileSystem.getDefaultUri;

import org.dinky.assertion.Asserts;
import org.dinky.context.TenantContextHolder;
import org.dinky.daemon.constant.FlinkTaskConstant;
Expand All @@ -35,14 +33,12 @@
import org.dinky.data.model.Task;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.model.rbac.Tenant;
import org.dinky.data.properties.OssProperties;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.ClearJobHistoryTask;
import org.dinky.job.DynamicResizeFlinkJobPoolTask;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.SystemMetricsTask;
import org.dinky.oss.OssTemplate;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.Project;
Expand All @@ -51,14 +47,12 @@
import org.dinky.service.SysConfigService;
import org.dinky.service.TaskService;
import org.dinky.service.TenantService;
import org.dinky.service.resource.impl.HdfsResourceManager;
import org.dinky.service.resource.impl.OssResourceManager;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.url.RsURLStreamHandlerFactory;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.UDFUtils;

import org.apache.catalina.webresources.TomcatURLStreamHandlerFactory;
import org.apache.hadoop.fs.FileSystem;

import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -77,7 +71,6 @@
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Singleton;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -140,49 +133,7 @@ private void initResources() {
.forEach(x -> x.addParameterCheck(y -> {
if (Boolean.TRUE.equals(
systemConfiguration.getResourcesEnable().getValue())) {
switch (systemConfiguration.getResourcesModel().getValue()) {
case OSS:
OssProperties ossProperties = new OssProperties();
ossProperties.setAccessKey(systemConfiguration
.getResourcesOssAccessKey()
.getValue());
ossProperties.setSecretKey(systemConfiguration
.getResourcesOssSecretKey()
.getValue());
ossProperties.setEndpoint(systemConfiguration
.getResourcesOssEndpoint()
.getValue());
ossProperties.setBucketName(systemConfiguration
.getResourcesOssBucketName()
.getValue());
ossProperties.setRegion(systemConfiguration
.getResourcesOssRegion()
.getValue());
ossProperties.setPathStyleAccess(systemConfiguration
.getResourcesPathStyleAccess()
.getValue());
Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties));
break;
case HDFS:
final org.apache.hadoop.conf.Configuration configuration =
new org.apache.hadoop.conf.Configuration();
configuration.set(
"fs.defaultFS",
systemConfiguration
.getResourcesHdfsDefaultFS()
.getValue());
try {
FileSystem fileSystem = FileSystem.get(
getDefaultUri(configuration),
configuration,
systemConfiguration
.getResourcesHdfsUser()
.getValue());
Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem);
} catch (Exception e) {
throw new DinkyException(e);
}
}
BaseResourceManager.initResourceManager();
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@

package org.dinky.service.resource;

import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.oss.OssTemplate;
import org.dinky.service.resource.impl.HdfsResourceManager;
import org.dinky.service.resource.impl.LocalResourceManager;
import org.dinky.service.resource.impl.OssResourceManager;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.File;
import java.io.InputStream;

Expand Down Expand Up @@ -52,11 +58,37 @@ static BaseResourceManager getInstance() {
return Singleton.get(HdfsResourceManager.class);
case OSS:
return Singleton.get(OssResourceManager.class);
case LOCAL:
return Singleton.get(LocalResourceManager.class);
default:
return null;
}
}

/**
 * Eagerly initialises the singleton resource manager matching the configured
 * resources model (LOCAL / OSS / HDFS), wiring in whatever external client the
 * backend needs. Must be called before {@code getInstance()} is used.
 *
 * @throws DinkyException if the HDFS {@link FileSystem} cannot be created
 */
static void initResourceManager() {
    switch (instances.getResourcesModel().getValue()) {
        case LOCAL:
            // Local storage needs no external client; just warm the singleton cache.
            Singleton.get(LocalResourceManager.class);
            // FIX: this case previously fell through into the OSS branch, building
            // an OssTemplate from (likely unset) OSS properties even in LOCAL mode.
            break;
        case OSS:
            OssTemplate template = new OssTemplate(instances.getOssProperties());
            Singleton.get(OssResourceManager.class).setOssTemplate(template);
            break;
        case HDFS:
            final Configuration configuration = new Configuration();
            configuration.set(
                    "fs.defaultFS", instances.getResourcesHdfsDefaultFS().getValue());
            try {
                FileSystem fileSystem = FileSystem.get(
                        FileSystem.getDefaultUri(configuration),
                        configuration,
                        instances.getResourcesHdfsUser().getValue());
                Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem);
            } catch (Exception e) {
                // Preserve the original failure as the cause.
                throw new DinkyException(e);
            }
            break;
        default:
            // Unknown model: leave uninitialised (getInstance() returns null for it).
            break;
    }
}

default String getFilePath(String path) {
return FileUtil.normalize(
FileUtil.file(instances.getResourcesUploadBasePath().getValue(), path)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.service.resource.impl;

import org.dinky.data.exception.BusException;
import org.dinky.service.resource.BaseResourceManager;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.springframework.web.multipart.MultipartFile;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.IoUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LocalResourceManager implements BaseResourceManager {

    /**
     * Deletes the file or directory addressed by {@code path} (resolved against the
     * configured resources base path via {@code getFilePath}).
     *
     * @throws BusException when deletion reports failure or the I/O call errors out
     */
    @Override
    public void remove(String path) {
        try {
            if (!FileUtil.del(getFilePath(path))) {
                throw new BusException("remove file failed,reason unknown");
            }
        } catch (IORuntimeException e) {
            log.error("remove file failed", e);
            // NOTE(review): only the message is propagated; pass the cause too if
            // BusException has a (String, Throwable) constructor — TODO confirm.
            throw new BusException(e.getMessage());
        }
    }

    /**
     * Renames the entry at {@code path} in place: the directory is kept and only the
     * final name segment of {@code newPath} is applied, overwriting any existing target.
     */
    @Override
    public void rename(String path, String newPath) {
        try {
            File current = new File(getFilePath(path));
            FileUtil.rename(current, FileUtil.getName(newPath), true);
        } catch (Exception e) {
            log.error("rename file failed", e);
            throw new BusException(e.getMessage());
        }
    }

    /**
     * Stores an uploaded multipart file at {@code path} under the resources base path.
     *
     * @throws BusException if the upload stream cannot be opened
     */
    @Override
    public void putFile(String path, MultipartFile file) {
        try {
            InputStream upload = file.getInputStream();
            FileUtil.writeFromStream(upload, getFilePath(path));
        } catch (IOException e) {
            log.error("putFile file failed", e);
            throw new BusException(e.getMessage());
        }
    }

    /** Copies a local {@link File} to {@code path} under the resources base path. */
    @Override
    public void putFile(String path, File file) {
        FileUtil.writeFromStream(FileUtil.getInputStream(file), getFilePath(path));
    }

    /** Reads the file at {@code path} and returns its contents decoded as UTF-8. */
    @Override
    public String getFileContent(String path) {
        InputStream in = readFile(path);
        return IoUtil.readUtf8(in);
    }

    /** Opens a buffered input stream over the file at {@code path}; caller must close it. */
    @Override
    public InputStream readFile(String path) {
        String absolute = getFilePath(path);
        return FileUtil.getInputStream(absolute);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,11 @@ public boolean remove(Integer id) {
() -> new BusException(Status.ROOT_DIR_NOT_ALLOW_DELETE));
try {
if (id < 1) {
getBaseResourceManager().remove("/");
// todo 删除主目录,实际是清空
remove(new LambdaQueryWrapper<Resources>().ne(Resources::getId, 0));
}
Resources byId = getById(id);
if (isExistsChildren(id)) {
getBaseResourceManager().remove(byId.getFullName());
if (byId.getIsDirectory()) {
List<Resources> resourceByPidToChildren =
getResourceByPidToChildren(new ArrayList<>(), byId.getId());
Expand All @@ -291,7 +289,6 @@ public boolean remove(Integer id) {
List<Resources> resourceByPidToParent = getResourceByPidToParent(new ArrayList<>(), byId.getPid());
resourceByPidToParent.forEach(x -> x.setSize(x.getSize() - byId.getSize()));
updateBatchById(resourceByPidToParent);
getBaseResourceManager().remove(byId.getFullName());
return removeById(id);
}
return removeById(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@

package org.dinky.app.flinksql;

import static org.apache.hadoop.fs.FileSystem.getDefaultUri;

import org.dinky.app.db.DBUtil;
import org.dinky.app.model.StatementParam;
import org.dinky.app.model.SysConfig;
import org.dinky.app.resource.impl.HdfsResourceManager;
import org.dinky.app.resource.impl.OssResourceManager;
import org.dinky.app.resource.BaseResourceManager;
import org.dinky.app.url.RsURLStreamHandlerFactory;
import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.config.Dialect;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.app.AppTask;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.properties.OssProperties;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.oss.OssTemplate;
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
Expand All @@ -53,8 +47,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.python.PythonOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -80,7 +72,6 @@

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Singleton;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.URLUtil;
import lombok.SneakyThrows;
Expand All @@ -102,43 +93,9 @@ private static void initSystemConfiguration() throws SQLException {
systemConfiguration.initSetConfiguration(configMap);
}

private static void initResource() throws SQLException {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
switch (systemConfiguration.getResourcesModel().getValue()) {
case OSS:
OssProperties ossProperties = new OssProperties();
ossProperties.setAccessKey(
systemConfiguration.getResourcesOssAccessKey().getValue());
ossProperties.setSecretKey(
systemConfiguration.getResourcesOssSecretKey().getValue());
ossProperties.setEndpoint(
systemConfiguration.getResourcesOssEndpoint().getValue());
ossProperties.setBucketName(
systemConfiguration.getResourcesOssBucketName().getValue());
ossProperties.setRegion(
systemConfiguration.getResourcesOssRegion().getValue());
Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties));
break;
case HDFS:
final Configuration configuration = new Configuration();
configuration.set(
"fs.defaultFS",
systemConfiguration.getResourcesHdfsDefaultFS().getValue());
try {
FileSystem fileSystem = FileSystem.get(
getDefaultUri(configuration),
configuration,
systemConfiguration.getResourcesHdfsUser().getValue());
Singleton.get(HdfsResourceManager.class).setHdfs(fileSystem);
} catch (Exception e) {
throw new DinkyException(e);
}
}
}

public static void submit(AppParamConfig config) throws SQLException {
initSystemConfiguration();
initResource();
BaseResourceManager.initResourceManager();
URL.setURLStreamHandlerFactory(new RsURLStreamHandlerFactory());
log.info("{} Start Submit Job:{}", LocalDateTime.now(), config.getTaskId());

Expand Down
Loading

0 comments on commit e9c59e2

Please sign in to comment.