From 6bc388f84d53990e19497948852c21c1a790fb57 Mon Sep 17 00:00:00 2001 From: LiuGuoHua <129264181+sjgllgh@users.noreply.github.com> Date: Wed, 4 Dec 2024 19:35:10 +0800 Subject: [PATCH] Enhance the functionality of s3filesystem (#5208) * 1. Enhance the functionality of s3filesystem to support multipart uploads. 2. Support the use of s3 storage for BML materials and workspaces. * format code --- .../linkis/storage/fs/impl/S3FileSystem.java | 190 ++++++++------- .../storage/fs/stream/S3OutputStream.java | 216 ++++++++++++++++++ .../storage/utils/FileSystemUtils.scala | 4 +- .../common/utils/GovernanceUtils.scala | 2 +- .../entrance/utils/CommonLogPathUtils.scala | 5 +- .../bml/common/ResourceHelperFactory.java | 10 +- .../linkis/bml/common/S3ResourceHelper.java | 183 +++++++++++++++ .../bml/conf/BmlServerConfiguration.scala | 29 +++ .../filesystem/restful/api/FsRestfulApi.java | 4 + 9 files changed, 558 insertions(+), 85 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java create mode 100644 linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java index 2aff3da7f5..a9f00b60d5 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java @@ -21,13 +21,17 @@ import org.apache.linkis.storage.domain.FsPathListWithError; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.fs.stream.S3OutputStream; import org.apache.linkis.storage.utils.StorageConfiguration; import org.apache.linkis.storage.utils.StorageUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import java.io.*; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -99,6 +103,7 @@ public String rootUserName() { public FsPath get(String dest) throws IOException { FsPath ret = new FsPath(dest); if (exists(ret)) { + ret.setIsdir(isDir(buildKey(ret.getPath()))); return ret; } else { logger.warn("File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); @@ -111,7 +116,7 @@ public FsPath get(String dest) throws IOException { @Override public InputStream read(FsPath dest) throws IOException { try { - return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent(); + return s3Client.getObject(bucket, buildKey(dest.getPath())).getObjectContent(); } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + dest.getPath()); } @@ -119,13 +124,23 @@ public InputStream read(FsPath dest) throws IOException { @Override public OutputStream write(FsPath dest, boolean overwrite) throws IOException { - try (InputStream inputStream = read(dest); - OutputStream outputStream = - new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) { + InputStream inputStream = null; + try { + if (!exists(dest)) { + create(dest.getPath()); + } + + OutputStream outputStream = new S3OutputStream(s3Client, bucket, buildKey(dest.getPath())); + if (!overwrite) { + inputStream = read(dest); IOUtils.copy(inputStream, outputStream); } return outputStream; + } catch (IOException e) { + throw new IOException("You have not permission to access path " + dest.getPath()); + } finally { + IOUtils.closeQuietly(inputStream); } } @@ -134,7 +149,7 @@ public boolean create(String dest) throws IOException { if (exists(new FsPath(dest))) { return false; } - s3Client.putObject(bucket, dest, ""); + s3Client.putObject(bucket, buildKey(dest), ""); return true; } @@ -142,16 +157,31 @@ public boolean create(String dest) throws IOException { public List list(FsPath path) throws IOException { try { if (!StringUtils.isEmpty(path.getPath())) { - ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath()); - List s3ObjectSummaries = listObjectsV2Result.getObjectSummaries(); - return s3ObjectSummaries.stream() - .filter(summary -> !isInitFile(summary)) - .map( - summary -> { - FsPath newPath = new FsPath(buildPath(summary.getKey())); - return fillStorageFile(newPath, summary); - }) - .collect(Collectors.toList()); + ListObjectsV2Request listObjectsV2Request = + new ListObjectsV2Request() + .withBucketName(bucket) + .withPrefix(buildKey(path.getPath()) + "/") + .withDelimiter("/"); + ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request); + List s3ObjectSummaries = dirResult.getObjectSummaries(); + List commonPrefixes = dirResult.getCommonPrefixes(); + List fsPaths = + s3ObjectSummaries.stream() + .filter(summary -> !isInitFile(summary)) + .map( + summary -> { + FsPath newPath = new FsPath(buildPath(summary.getKey())); + return fillStorageFile(newPath, summary); + }) + .collect(Collectors.toList()); + if (commonPrefixes != null) { + for (String dir : commonPrefixes) { + FsPath newPath = new FsPath(buildPath(dir)); + newPath.setIsdir(true); + fsPaths.add(newPath); + } + } + return fsPaths; } } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + path.getPath()); @@ -173,7 +203,7 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request() .withBucketName(bucket) - .withPrefix(buildPrefix(path.getPath())) + .withPrefix(buildKey(path.getPath()) + "/") .withDelimiter("/"); ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request); List s3ObjectSummaries = dirResult.getObjectSummaries(); @@ -204,25 +234,15 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile @Override public boolean exists(FsPath dest) throws IOException { try { - if (new File(dest.getPath()).getName().contains(".")) { - return existsFile(dest); + if (dest == null) { + return false; } ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); listObjectsV2Request .withBucketName(bucket) - .withPrefix(buildPrefix(dest.getPath())) - .withDelimiter("/"); - return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size() - + s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size() - > 0; - } catch (AmazonS3Exception e) { - return false; - } - } - - public boolean existsFile(FsPath dest) { - try { - return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false)); + .withPrefix(buildKey(dest.getPath())) + .withMaxKeys(1); + return !s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty(); } catch (AmazonS3Exception e) { return false; } @@ -231,25 +251,41 @@ public boolean existsFile(FsPath dest) { @Override public boolean delete(FsPath dest) throws IOException { try { - ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); - listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false)); - ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); - String[] keyList = - result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new); - DeleteObjectsRequest deleteObjectsRequest = - new DeleteObjectsRequest("test").withKeys(keyList); - s3Client.deleteObjects(deleteObjectsRequest); + List deleteKeys = new ArrayList<>(); + delete(dest, deleteKeys); + if (!deleteKeys.isEmpty()) { + DeleteObjectsRequest deleteObjectsRequest = + new DeleteObjectsRequest(bucket).withKeys(deleteKeys.toArray(new String[0])); + s3Client.deleteObjects(deleteObjectsRequest); + } return true; } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + dest.getPath()); } } + public void delete(FsPath dest, List keys) throws IOException { + if (isDir(buildKey(dest.getPath()))) { + FsPathListWithError fsPathListWithError = listPathWithError(dest, false); + List fsPaths = fsPathListWithError.getFsPaths(); + fsPaths.forEach( + fsPath -> { + try { + delete(fsPath, keys); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } else { + keys.add(buildKey(dest.getPath())); + } + } + @Override public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { try { - String newOriginPath = buildPrefix(oldDest.getPath(), false); - String newDestPath = buildPrefix(newDest.getPath(), false); + String newOriginPath = buildKey(oldDest.getPath()); + String newDestPath = buildKey(newDest.getPath()); ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath); ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); @@ -281,8 +317,8 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { @Override public boolean copy(String origin, String dest) throws IOException { try { - String newOrigin = buildPrefix(origin, false); - String newDest = buildPrefix(dest, false); + String newOrigin = buildKey(origin); + String newDest = buildKey(dest); ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin); ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); @@ -305,8 +341,16 @@ public boolean copy(String origin, String dest) throws IOException { } } - private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) { - return s3ObjectSummary.getKey().substring(prefix.length()).contains("/"); + private boolean isDir(String key) { + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request + .withBucketName(bucket) + .withPrefix(key + "/") + .withDelimiter("/") + .withMaxKeys(1); + + return !(s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().isEmpty() + && s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty()); } private boolean isInitFile(S3ObjectSummary s3ObjectSummary) { @@ -318,6 +362,13 @@ public String listRoot() { return "/"; } + /** + * s3没有目录概念,无法直接创建目录 S3 lacks the concept of directories and cannot create directories directly. + * + * @param dest + * @return + * @throws IOException + */ @Override public boolean mkdir(FsPath dest) throws IOException { String path = new File(dest.getPath(), INIT_FILE_NAME).getPath(); @@ -339,7 +390,7 @@ private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) { fsPath.setOwner(owner.getDisplayName()); } try { - fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath())); + fsPath.setIsdir(isDir(s3ObjectSummary.getKey())); } catch (Throwable e) { logger.warn("Failed to fill storage file:" + fsPath.getPath(), e); } @@ -359,7 +410,7 @@ public boolean canRead(FsPath dest) { @Override public boolean canRead(FsPath dest, String user) throws IOException { - return false; + return true; } @Override @@ -384,7 +435,10 @@ public long getUsableSpace(FsPath dest) { @Override public long getLength(FsPath dest) throws IOException { - return 0; + return s3Client + .getObject(bucket, buildKey(dest.getPath())) + .getObjectMetadata() + .getContentLength(); } @Override @@ -418,7 +472,9 @@ public boolean setPermission(FsPath dest, String permission) { } @Override - public void close() throws IOException {} + public void close() throws IOException { + s3Client.shutdown(); + } public String getLabel() { return label; @@ -429,46 +485,22 @@ public void setLabel(String label) { } public String buildPath(String path) { - if (path == null || "".equals(path)) return ""; + if (path == null || path.isEmpty()) return ""; if (path.startsWith("/")) { return StorageUtils.S3_SCHEMA() + path; } return StorageUtils.S3_SCHEMA() + "/" + path; } - public String buildPrefix(String path, boolean addTail) { + public String buildKey(String path) { String res = path; - if (path == null || "".equals(path)) return ""; + if (path == null || path.isEmpty()) return ""; if (path.startsWith("/")) { res = path.replaceFirst("/", ""); } - if (!path.endsWith("/") && addTail) { - res = res + "/"; + if (path.endsWith("/") && !res.isEmpty()) { + res = res.substring(0, res.length() - 1); } return res; } - - public String buildPrefix(String path) { - return buildPrefix(path, true); - } -} - -class S3OutputStream extends ByteArrayOutputStream { - private AmazonS3 s3Client; - private String bucket; - private String path; - - public S3OutputStream(AmazonS3 s3Client, String bucket, String path) { - this.s3Client = s3Client; - this.bucket = bucket; - this.path = path; - } - - @Override - public void close() throws IOException { - byte[] buffer = this.toByteArray(); - try (InputStream in = new ByteArrayInputStream(buffer)) { - s3Client.putObject(bucket, path, in, new ObjectMetadata()); - } - } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java new file mode 100644 index 0000000000..b5b39f6d5b --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.fs.stream; + +import java.io.ByteArrayInputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3OutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class); + + /** The bucket-name on Amazon S3 */ + private final String bucket; + + /** The path (key) name within the bucket */ + private final String path; + + int BUFFER_SIZE = 5 * 1024 * 1024; + + private final byte[] buf = new byte[BUFFER_SIZE];; + + private byte[] flashBuffer; + + /** The position in the buffer */ + private int position = 0; + + /** Amazon S3 client. */ + private final AmazonS3 s3Client; + + /** The unique id for this upload */ + private String uploadId; + + /** Collection of the etags for the parts that have been uploaded */ + private final List etags = new ArrayList<>(); + + /** + * Creates a new S3 OutputStream + * + * @param s3Client the AmazonS3 client + * @param bucket name of the bucket + * @param path path within the bucket + */ + public S3OutputStream(AmazonS3 s3Client, String bucket, String path) { + if (s3Client == null) { + throw new IllegalArgumentException("The s3Client cannot be null."); + } + if (bucket == null || bucket.isEmpty()) { + throw new IllegalArgumentException("The bucket cannot be null or an empty string."); + } + if (path == null || path.isEmpty()) { + throw new IllegalArgumentException("The path cannot be null or an empty string."); + } + this.s3Client = s3Client; + this.bucket = bucket; + this.path = path; + } + + /** + * Write an array to the S3 output stream. + * + * @param b the byte-array to append + */ + @Override + public void write(byte[] b) { + write(b, 0, b.length); + } + + /** + * Writes an array to the S3 Output Stream + * + * @param byteArray the array to write + * @param o the offset into the array + * @param l the number of bytes to write + */ + @Override + public void write(final byte[] byteArray, final int o, final int l) { + int ofs = o, len = l; + int size; + while (len > (size = this.buf.length - position)) { + System.arraycopy(byteArray, ofs, this.buf, this.position, size); + this.position += size; + flushBufferAndRewind(); + ofs += size; + len -= size; + } + System.arraycopy(byteArray, ofs, this.buf, this.position, len); + this.position += len; + } + + /** Flushes the buffer by uploading a part to S3. */ + @Override + public synchronized void flush() {} + + protected void flushBufferAndRewind() { + if (uploadId == null) { + LOG.info("Starting a multipart upload for {}/{}", this.bucket, this.path); + try { + final InitiateMultipartUploadRequest request = + new InitiateMultipartUploadRequest(this.bucket, this.path) + .withCannedACL(CannedAccessControlList.BucketOwnerFullControl); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request); + this.uploadId = initResponse.getUploadId(); + } catch (AmazonS3Exception e) { + LOG.error("Failed to start multipart upload: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + } + try { + uploadPart(); + } catch (AmazonS3Exception e) { + LOG.error("Failed to upload part: {}", e.getMessage(), e); + this.s3Client.abortMultipartUpload( + new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId)); + throw new RuntimeException(e); + } + this.position = 0; + } + + protected void uploadPart() { + LOG.debug("Uploading part {}", this.etags.size()); + try { + UploadPartResult uploadResult = + s3Client.uploadPart( + new UploadPartRequest() + .withBucketName(this.bucket) + .withKey(this.path) + .withUploadId(this.uploadId) + .withInputStream(new ByteArrayInputStream(buf, 0, this.position)) + .withPartNumber(this.etags.size() + 1) + .withPartSize(this.position)); + this.etags.add(uploadResult.getPartETag()); + } catch (AmazonS3Exception e) { + LOG.error("Failed to upload part: {}", e.getMessage(), e); + this.s3Client.abortMultipartUpload( + new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId)); + throw new RuntimeException(e); + } + } + + @Override + public void close() { + if (this.uploadId != null) { + if (this.position > 0) { + uploadPart(); + } + LOG.debug("Completing multipart"); + try { + this.s3Client.completeMultipartUpload( + new CompleteMultipartUploadRequest(bucket, path, uploadId, etags)); + } catch (AmazonS3Exception e) { + LOG.error("Failed to complete multipart upload: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + } else { + LOG.debug("Uploading object at once to {}/{}", this.bucket, this.path); + try { + final ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(this.position); + final PutObjectRequest request = + new PutObjectRequest( + this.bucket, + this.path, + new ByteArrayInputStream(this.buf, 0, this.position), + metadata) + .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl); + this.s3Client.putObject(request); + } catch (AmazonS3Exception e) { + LOG.error("Failed to upload object: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + } + } + + public void cancel() { + if (this.uploadId != null) { + try { + LOG.debug("Aborting multipart upload"); + this.s3Client.abortMultipartUpload( + new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId)); + } catch (AmazonS3Exception e) { + LOG.error("Failed to abort multipart upload: {}", e.getMessage(), e); + } + } + } + + @Override + public void write(int b) { + + if (position >= this.buf.length) { + flushBufferAndRewind(); + } + this.buf[position++] = (byte) b; + } +} diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala index 9c344fa802..0e382128ca 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala @@ -122,7 +122,7 @@ object FileSystemUtils extends Logging { var parentPath = dest.getParent val dirsToMake = new util.Stack[FsPath]() dirsToMake.push(dest) - while (!fileSystem.exists(parentPath)) { + while (parentPath != null && !fileSystem.exists(parentPath)) { dirsToMake.push(parentPath) parentPath = parentPath.getParent } @@ -153,7 +153,7 @@ object FileSystemUtils extends Logging { var parentPath = dest.getParent val dirsToMake = new util.Stack[FsPath]() dirsToMake.push(dest) - while (!fileSystem.exists(parentPath)) { + while (parentPath != null && !fileSystem.exists(parentPath)) { dirsToMake.push(parentPath) parentPath = parentPath.getParent } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala index ae83749ecb..63c963d2d3 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala @@ -132,7 +132,7 @@ object GovernanceUtils extends Logging { * @return */ def getResultParentPath(creator: String): String = { - val resPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX + val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue val resStb = new StringBuilder() if (resStb.endsWith("/")) { resStb.append(resPrefix) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala index 746774633e..3430c1809b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala @@ -20,6 +20,7 @@ package org.apache.linkis.entrance.utils import org.apache.linkis.common.io.FsPath import org.apache.linkis.common.utils.Utils import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.entity.job.JobRequest import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.storage.FSFactory @@ -51,6 +52,8 @@ object CommonLogPathUtils { val fsPath = new FsPath(commonPath) if (StorageUtils.HDFS.equals(fsPath.getFsType)) { FSFactory.getFs(StorageUtils.HDFS).asInstanceOf[FileSystem] + } else if (StorageUtils.S3.equals(fsPath.getFsType)) { + FSFactory.getFs(StorageUtils.S3).asInstanceOf[FileSystem] } else { FSFactory .getFs(StorageUtils.FILE, StorageConfiguration.LOCAL_ROOT_USER.getValue) @@ -58,7 +61,7 @@ object CommonLogPathUtils { } } - private val resPrefix = EntranceConfiguration.DEFAULT_LOGPATH_PREFIX.getValue + private val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue /** * get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath + diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java index 83acf03a07..a6138fe510 100644 --- a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java +++ b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java @@ -26,16 +26,22 @@ public class ResourceHelperFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ResourceHelperFactory.class); - private static final boolean IS_HDFS = (Boolean) BmlServerConfiguration.BML_IS_HDFS().getValue(); + private static final String FILESYSTEM_TYPE = + BmlServerConfiguration.BML_FILESYSTEM_TYPE().getValue(); private static final ResourceHelper HDFS_RESOURCE_HELPER = new HdfsResourceHelper(); private static final ResourceHelper LOCAL_RESOURCE_HELPER = new LocalResourceHelper(); + private static final ResourceHelper S3_RESOURCE_HELPER = new S3ResourceHelper(); + public static ResourceHelper getResourceHelper() { - if (IS_HDFS) { + if (FILESYSTEM_TYPE.equals("hdfs")) { LOGGER.info("will store resource in hdfs"); return HDFS_RESOURCE_HELPER; + } else if (FILESYSTEM_TYPE.equals("s3")) { + LOGGER.info("will store resource in s3"); + return S3_RESOURCE_HELPER; } else { LOGGER.info("will store resource in local"); return LOCAL_RESOURCE_HELPER; diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java new file mode 100644 index 0000000000..5370da9351 --- /dev/null +++ b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.bml.common; + +import org.apache.linkis.bml.conf.BmlServerConfiguration; +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.storage.FSFactory; +import org.apache.linkis.storage.utils.FileSystemUtils; + +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.MessageDigest; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3ResourceHelper implements ResourceHelper { + + private static final Logger logger = LoggerFactory.getLogger(S3ResourceHelper.class); + + private static final String SCHEMA = "s3://"; + + @Override + public long upload( + String path, + String user, + InputStream inputStream, + StringBuilder stringBuilder, + boolean overwrite) + throws UploadResourceException { + OutputStream outputStream = null; + InputStream is0 = null; + InputStream is1 = null; + long size = 0; + Fs fileSystem = null; + try { + FsPath fsPath = new FsPath(path); + fileSystem = FSFactory.getFsByProxyUser(fsPath, user); + fileSystem.init(null); + if (!fileSystem.exists(fsPath)) { + FileSystemUtils.createNewFile(fsPath, user, true); + } + byte[] buffer = new byte[1024]; + long beforeSize = -1; + is0 = fileSystem.read(fsPath); + int ch0 = 0; + while ((ch0 = is0.read(buffer)) != -1) { + beforeSize += ch0; + } + outputStream = fileSystem.write(fsPath, overwrite); + int ch = 0; + MessageDigest md5Digest = DigestUtils.getMd5Digest(); + while ((ch = inputStream.read(buffer)) != -1) { + md5Digest.update(buffer, 0, ch); + outputStream.write(buffer, 0, ch); + size += ch; + } + if (stringBuilder != null) { + stringBuilder.append(Hex.encodeHexString(md5Digest.digest())); + } + // 通过文件名获取的文件所有的字节,这样就避免了错误更新后的更新都是错的 + long afterSize = -1; + is1 = fileSystem.read(fsPath); + int ch1 = 0; + while ((ch1 = is1.read(buffer)) != -1) { + afterSize += ch1; + } + size = Math.max(size, afterSize - beforeSize); + } catch (final IOException e) { + logger.error("{} write to {} failed, reason is, IOException:", user, path, e); + UploadResourceException uploadResourceException = new UploadResourceException(); + uploadResourceException.initCause(e); + throw uploadResourceException; + } catch (final Throwable t) { + logger.error("{} write to {} failed, reason is", user, path, t); + UploadResourceException uploadResourceException = new UploadResourceException(); + uploadResourceException.initCause(t); + throw uploadResourceException; + } finally { + IOUtils.closeQuietly(outputStream); + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(is0); + IOUtils.closeQuietly(is1); + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (Exception e) { + logger.error("close filesystem failed", e); + } + } + } + return size; + } + + @Override + public void update(String path) {} + + @Override + public void getResource(String path, int start, int end) {} + + @Override + public String getSchema() { + return SCHEMA; + } + + /** + * Motivation to modify this path: This path is under /apps-data/hadoop/user on hdfs, which is + * mixed with the result set file. Bml files cannot be synchronized separately. When the user + * verifies the workflow, it is necessary to synchronize the full amount of personal and full hdfs + * data each time, which is very inconvenient. + */ + @Override + public String generatePath(String user, String fileName, Map properties) { + String resourceHeader = (String) properties.get("resourceHeader"); + SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd"); + String dateStr = format.format(new Date()); + if (StringUtils.isNotEmpty(resourceHeader)) { + return getSchema() + + BmlServerConfiguration.BML_PREFIX().getValue() + + "/" + + user + + "/bml" + + "/" + + dateStr + + "/" + + resourceHeader + + "/" + + fileName; + } else { + return getSchema() + + BmlServerConfiguration.BML_PREFIX().getValue() + + "/" + + user + + "/bml" + + "/" + + dateStr + + "/" + + fileName; + } + } + + @Override + public boolean checkIfExists(String path, String user) throws IOException { + Fs fileSystem = FSFactory.getFsByProxyUser(new FsPath(path), user); + fileSystem.init(null); + try { + return fileSystem.exists(new FsPath(path)); + } finally { + fileSystem.close(); + } + } + + @Override + public boolean checkBmlResourceStoragePrefixPathIfChanged(String path) { + String prefixPath = getSchema() + BmlServerConfiguration.BML_PREFIX().getValue(); + return !path.startsWith(prefixPath); + } +} diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala index 7fde694ef6..3b2a0d112a 100644 --- a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala +++ b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala @@ -29,6 +29,35 @@ object BmlServerConfiguration { val BML_IS_HDFS: CommonVars[Boolean] = CommonVars[Boolean]("wds.linkis.bml.is.hdfs", true) + /** + * BML_FILESYSTEM_TYPE: 用于区分BML的文件系统类型,目前支持hdfs、file、s3三种类型 默认值通过BML_IS_HDFS判断,以便版本向下兼容 + * BML_FILESYSTEM_TYPE: Used to distinguish the type of file system for BML. Currently, it + * supports three types: hdfs, file, and s3. The default value is determined by BML_IS_HDFS to + * ensure backward compatibility with previous versions. + */ + val BML_FILESYSTEM_TYPE = CommonVars( + "linkis.bml.filesystem.type", + if (BML_IS_HDFS.getValue) { + "hdfs" + } else { + "file" + } + ) + + /** + * BML_PREFIX: BML的文件系统前缀 默认值通过BML_IS_HDFS判断,以便版本向下兼容 BML_PREFIX: The file system prefix for BML. + * The default value is determined based on BML_IS_HDFS to ensure backward compatibility with + * previous versions. + */ + val BML_PREFIX = CommonVars( + "linkis.bml.prefix", + if (BML_IS_HDFS.getValue) { + BML_HDFS_PREFIX.getValue + } else { + BML_LOCAL_PREFIX.getValue + } + ) + val BML_CLEAN_EXPIRED_TIME: CommonVars[Int] = CommonVars[Int]("wds.linkis.bml.cleanExpired.time", 100) diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 28dc65dcc5..1e3ce7599c 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -148,6 +148,9 @@ public Message getUserRootPath( if (StorageUtils.HDFS().equalsIgnoreCase(pathType)) { path = hdfsUserRootPathPrefix + userName + hdfsUserRootPathSuffix; returnType = StorageUtils.HDFS().toUpperCase(); + } else if (StorageUtils.S3().equalsIgnoreCase(pathType)) { + path = localUserRootPath + userName; + returnType = StorageUtils.S3().toUpperCase(); } else { path = localUserRootPath + userName; returnType = LOCAL_RETURN_TYPE; @@ -1490,6 +1493,7 @@ public Message pythonUpload( // 返回成功消息并包含文件地址 return Message.ok().data("filePath", newPath); } + /** * * *