From 0ae0c021302d2fcd7263644a6c7d7d93ec4e489b Mon Sep 17 00:00:00 2001 From: Houliang Qi Date: Tue, 24 Dec 2024 13:45:06 +0800 Subject: [PATCH] [core] Support read external path in DataFileMeta (#4761) --- .../paimon/append/AppendOnlyWriter.java | 4 +- .../org/apache/paimon/io/DataFileMeta.java | 18 ++--- .../io/DataFileMeta10LegacySerializer.java | 2 +- .../apache/paimon/io/DataFilePathFactory.java | 23 +++++++ .../apache/paimon/io/FileIndexEvaluator.java | 2 +- .../paimon/io/KeyValueDataFileWriter.java | 3 +- .../paimon/io/KeyValueFileReaderFactory.java | 23 +++++-- .../paimon/io/KeyValueFileWriterFactory.java | 10 +-- .../apache/paimon/io/RowDataFileWriter.java | 3 +- .../paimon/manifest/ExpireFileEntry.java | 18 ++++- .../org/apache/paimon/manifest/FileEntry.java | 15 +++- .../apache/paimon/manifest/ManifestEntry.java | 8 ++- .../paimon/manifest/SimpleFileEntry.java | 24 +++++-- .../manifest/SimpleFileEntrySerializer.java | 68 ------------------- .../paimon/mergetree/MergeTreeWriter.java | 14 ++-- .../apache/paimon/migrate/FileMetaUtils.java | 1 + .../paimon/operation/FileStoreCommitImpl.java | 2 +- .../paimon/operation/RawFileSplitRead.java | 5 +- .../paimon/table/query/LocalTableQuery.java | 7 +- .../apache/paimon/table/source/DataSplit.java | 4 +- .../paimon/table/system/FilesTable.java | 11 ++- .../paimon/append/AppendOnlyWriterTest.java | 7 +- .../paimon/io/KeyValueFileReadWriteTest.java | 5 +- .../paimon/mergetree/ContainsLevelsTest.java | 6 +- .../paimon/mergetree/LookupLevelsTest.java | 6 +- .../paimon/mergetree/MergeTreeTestBase.java | 2 +- .../table/AppendOnlyFileStoreTableTest.java | 2 +- .../apache/paimon/table/source/SplitTest.java | 1 + .../paimon/flink/clone/PickFilesUtil.java | 2 +- .../changelog/ChangelogCompactTask.java | 11 +-- .../flink/sink/RewriteFileIndexSink.java | 12 ++-- .../apache/paimon/spark/ScanHelperTest.scala | 2 + 32 files changed, 176 insertions(+), 145 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index a3087e362864..4c313dd655c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -244,7 +244,7 @@ public void close() throws Exception { for (DataFileMeta file : compactAfter) { // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we // can directly delete the file in compactAfter. - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } sinkWriter.close(); @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception { } finally { // remove small files for (DataFileMeta file : files) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 3be09ea6c229..459cd788de53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -135,7 +135,8 @@ public static DataFileMeta forAppend( List extraFiles, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String externalPath) { return new DataFileMeta( fileName, fileSize, @@ -154,7 +155,7 @@ public static DataFileMeta forAppend( embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -173,7 +174,8 @@ public DataFileMeta( @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String externalPath) { this( fileName, fileSize, @@ -192,7 +194,7 @@ public DataFileMeta( embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -403,7 +405,7 @@ public DataFileMeta upgrade(int newLevel) { externalPath); } - public DataFileMeta rename(String newFileName) { + public DataFileMeta rename(String newExternalPath, String newFileName) { return new DataFileMeta( newFileName, fileSize, @@ -422,7 +424,7 @@ public DataFileMeta rename(String newFileName) { embeddedIndex, fileSource, valueStatsCols, - externalPath); + newExternalPath); } public DataFileMeta copyWithoutStats() { @@ -449,8 +451,8 @@ public DataFileMeta copyWithoutStats() { public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); - paths.add(pathFactory.toPath(fileName)); - extraFiles.forEach(f -> paths.add(pathFactory.toPath(f))); + paths.add(pathFactory.toPath(this)); + extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java index 68ccba6ea31c..518db7c65834 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -46,7 +46,7 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType; import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; -/** Serializer for {@link DataFileMeta} with 0.9 version. */ +/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */ public class DataFileMeta10LegacySerializer implements Serializable { private static final long serialVersionUID = 1L; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index b632d44c9420..daeb9f52eada 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -78,10 +78,33 @@ private Path newPath(String prefix) { return new Path(parent, name); } + @VisibleForTesting public Path toPath(String fileName) { return new Path(parent + "/" + fileName); } + /** + * for read purpose. + * + * @param fileName the file name + * @param externalPath the external path, if null, it will use the parent path + * @return the file's path + */ + public Path toPath(String fileName, String externalPath) { + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + } + + public Path toPath(DataFileMeta dataFileMeta) { + String externalPath = dataFileMeta.externalPath(); + String fileName = dataFileMeta.fileName(); + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + } + + public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { + String externalPath = dataFileMeta.externalPath(); + return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile); + } + @VisibleForTesting public String uuid() { return uuid; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 530b87165322..9055097d3718 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public static FileIndexResult evaluate( // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toPath(indexFiles.get(0)), + dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 651c6a6f7b56..f78d7556487f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -195,7 +195,8 @@ public DataFileMeta result() throws IOException { deleteRecordCount, indexResult.embeddedIndexBytes(), fileSource, - valueStatsPair.getKey()); + valueStatsPair.getKey(), + null); } abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7e272fc97c65..9d65a5411364 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -97,16 +98,25 @@ private KeyValueFileReaderFactory( @Override public RecordReader createRecordReader(DataFileMeta file) throws IOException { - return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level()); + return createRecordReader( + file.schemaId(), + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()); } + @VisibleForTesting public RecordReader createRecordReader( - long schemaId, String fileName, long fileSize, int level) throws IOException { + long schemaId, String fileName, long fileSize, int level, String externalPath) + throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); + () -> + createRecordReader( + schemaId, fileName, level, false, 2, fileSize, externalPath)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize); + return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath); } private FileRecordReader createRecordReader( @@ -115,7 +125,8 @@ private FileRecordReader createRecordReader( int level, boolean reuseFormat, @Nullable Integer orcPoolSize, - long fileSize) + long fileSize, + String externalPath) throws IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -132,7 +143,7 @@ private FileRecordReader createRecordReader( new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - Path filePath = pathFactory.toPath(fileName); + Path filePath = pathFactory.toPath(fileName, externalPath); FileRecordReader fileRecordReader = new DataFileRecordReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a6aae3985bd4..500320c24947 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -142,14 +142,14 @@ private KeyValueDataFileWriter createDataFileWriter( fileIndexOptions); } - public void deleteFile(String filename, int level) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); + public void deleteFile(DataFileMeta meta, int level) { + fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); } - public void copyFile(String sourceFileName, String targetFileName, int level) + public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level) throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); - Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); + Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta); + Path targetPath = formatContext.pathFactory(level).toPath(targetMeta); fileIO.copyFile(sourcePath, targetPath, true); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 8c2e8ec9498c..cd46d67e3b60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -124,6 +124,7 @@ public DataFileMeta result() throws IOException { : Collections.singletonList(indexResult.independentIndexFile()), indexResult.embeddedIndexBytes(), fileSource, - statsPair.getKey()); + statsPair.getKey(), + null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java index 060360623cd0..5d6d68144e1c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -41,8 +41,19 @@ public ExpireFileEntry( @Nullable byte[] embeddedIndex, BinaryRow minKey, BinaryRow maxKey, - @Nullable FileSource fileSource) { - super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey); + @Nullable FileSource fileSource, + @Nullable String externalPath) { + super( + kind, + partition, + bucket, + level, + fileName, + extraFiles, + embeddedIndex, + minKey, + maxKey, + externalPath); this.fileSource = fileSource; } @@ -61,7 +72,8 @@ public static ExpireFileEntry from(ManifestEntry entry) { entry.file().embeddedIndex(), entry.minKey(), entry.maxKey(), - entry.file().fileSource().orElse(null)); + entry.file().fileSource().orElse(null), + entry.externalPath()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index a2569beac61c..738776438be7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -54,6 +54,8 @@ public interface FileEntry { String fileName(); + String externalPath(); + Identifier identifier(); BinaryRow minKey(); @@ -73,6 +75,7 @@ class Identifier { public final String fileName; public final List extraFiles; @Nullable private final byte[] embeddedIndex; + @Nullable public final String externalPath; /* Cache the hash code for the string */ private Integer hash; @@ -83,13 +86,15 @@ public Identifier( int level, String fileName, List extraFiles, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable String externalPath) { this.partition = partition; this.bucket = bucket; this.level = level; this.fileName = fileName; this.extraFiles = extraFiles; this.embeddedIndex = embeddedIndex; + this.externalPath = externalPath; } @Override @@ -106,7 +111,8 @@ public boolean equals(Object o) { && Objects.equals(partition, that.partition) && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) - && Objects.deepEquals(embeddedIndex, that.embeddedIndex); + && Objects.deepEquals(embeddedIndex, that.embeddedIndex) + && Objects.deepEquals(externalPath, that.externalPath); } @Override @@ -119,7 +125,8 @@ public int hashCode() { level, fileName, extraFiles, - Arrays.hashCode(embeddedIndex)); + Arrays.hashCode(embeddedIndex), + externalPath); } return hash; } @@ -138,6 +145,8 @@ public String toString() { + extraFiles + ", embeddedIndex=" + Arrays.toString(embeddedIndex) + + ", externalPath=" + + externalPath + '}'; } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 626e0a5d468f..d4748451d8ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -92,6 +92,11 @@ public String fileName() { return file.fileName(); } + @Override + public String externalPath() { + return file.externalPath(); + } + @Override public BinaryRow minKey() { return file.minKey(); @@ -123,7 +128,8 @@ public Identifier identifier() { file.level(), file.fileName(), file.extraFiles(), - file.embeddedIndex()); + file.embeddedIndex(), + file.externalPath()); } public ManifestEntry copyWithoutStats() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index fdaed2b85aaf..f86bded52d46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -38,6 +38,7 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final byte[] embeddedIndex; private final BinaryRow minKey; private final BinaryRow maxKey; + @Nullable private final String externalPath; public SimpleFileEntry( FileKind kind, @@ -48,7 +49,8 @@ public SimpleFileEntry( List extraFiles, @Nullable byte[] embeddedIndex, BinaryRow minKey, - BinaryRow maxKey) { + BinaryRow maxKey, + @Nullable String externalPath) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -58,6 +60,7 @@ public SimpleFileEntry( this.embeddedIndex = embeddedIndex; this.minKey = minKey; this.maxKey = maxKey; + this.externalPath = externalPath; } public static SimpleFileEntry from(ManifestEntry entry) { @@ -70,7 +73,8 @@ public static SimpleFileEntry from(ManifestEntry entry) { entry.file().extraFiles(), entry.file().embeddedIndex(), entry.minKey(), - entry.maxKey()); + entry.maxKey(), + entry.externalPath()); } public static List from(List entries) { @@ -102,9 +106,15 @@ public String fileName() { return fileName; } + @Override + public String externalPath() { + return externalPath; + } + @Override public Identifier identifier() { - return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex); + return new Identifier( + partition, bucket, level, fileName, extraFiles, embeddedIndex, externalPath); } @Override @@ -138,12 +148,14 @@ public boolean equals(Object o) { && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(minKey, that.minKey) - && Objects.equals(maxKey, that.maxKey); + && Objects.equals(maxKey, that.maxKey) + && Objects.equals(externalPath, that.externalPath); } @Override public int hashCode() { - return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey); + return Objects.hash( + kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey, externalPath); } @Override @@ -165,6 +177,8 @@ public String toString() { + minKey + ", maxKey=" + maxKey + + ", externalPath=" + + externalPath + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java deleted file mode 100644 index bdc89b8d4c3d..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.utils.VersionedObjectSerializer; - -import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; -import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; - -/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */ -public class SimpleFileEntrySerializer extends VersionedObjectSerializer { - - private static final long serialVersionUID = 1L; - - private final int version; - - public SimpleFileEntrySerializer() { - super(ManifestEntry.SCHEMA); - this.version = new ManifestEntrySerializer().getVersion(); - } - - @Override - public int getVersion() { - return version; - } - - @Override - public InternalRow convertTo(SimpleFileEntry meta) { - throw new UnsupportedOperationException("Only supports convert from row."); - } - - @Override - public SimpleFileEntry convertFrom(int version, InternalRow row) { - if (this.version != version) { - throw new IllegalArgumentException("Unsupported version: " + version); - } - - InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount()); - return new SimpleFileEntry( - FileKind.fromByteValue(row.getByte(0)), - deserializeBinaryRow(row.getBinary(1)), - row.getInt(2), - file.getInt(10), - file.getString(0).toString(), - fromStringArrayData(file.getArray(11)), - file.isNullAt(14) ? null : file.getBinary(14), - deserializeBinaryRow(file.getBinary(3)), - deserializeBinaryRow(file.getBinary(4))); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f2a964bae16a..df4855922360 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -27,6 +27,7 @@ import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -241,9 +242,10 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { List changelogMetas = new ArrayList<>(); for (DataFileMeta dataMeta : dataMetas) { + Path newPath = writerFactory.newChangelogPath(0); DataFileMeta changelogMeta = - dataMeta.rename(writerFactory.newChangelogPath(0).getName()); - writerFactory.copyFile(dataMeta.fileName(), changelogMeta.fileName(), 0); + dataMeta.rename(newPath.getParent().getName(), newPath.getName()); + writerFactory.copyFile(dataMeta, changelogMeta, 0); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); @@ -341,7 +343,7 @@ private void updateCompactResult(CompactResult result) { // 2. This file is not the input of upgraded. if (!compactBefore.containsKey(file.fileName()) && !afterFiles.contains(file.fileName())) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } } else { compactBefore.put(file.fileName(), file); @@ -375,7 +377,7 @@ public void close() throws Exception { deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } newFilesChangelog.clear(); @@ -390,12 +392,12 @@ public void close() throws Exception { compactAfter.clear(); for (DataFileMeta file : compactChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } compactChangelog.clear(); for (DataFileMeta file : delete) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } if (compactDeletionFile != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 391c5f9bb615..51a0b5e2a92e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -169,6 +169,7 @@ private static DataFileMeta constructFileMeta( Collections.emptyList(), null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 001132e1671c..0b4783b16579 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -593,7 +593,7 @@ public void abort(List commitMessages) { toDelete.addAll(commitMessage.compactIncrement().changelogFiles()); for (DataFileMeta file : toDelete) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 4fda82f4e88f..d0f3275b5afc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -210,10 +210,7 @@ private FileRecordReader createFileReader( FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file.fileName()), - file.fileSize(), - fileIndexResult); + fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 8ff5ce7a6580..d474d4e2d023 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -161,12 +161,7 @@ private void newLookupLevels(BinaryRow partition, int bucket, List readerFactoryBuilder.keyType(), new LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()), file -> { - RecordReader reader = - factory.createRecordReader( - file.schemaId(), - file.fileName(), - file.fileSize(), - file.level()); + RecordReader reader = factory.createRecordReader(file); if (cacheRowFilter != null) { reader = reader.filter( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index bf60234214fa..9178d25a912e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,8 +180,10 @@ public Optional> convertToRawFiles() { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { + String path = file.externalPath() != null ? file.externalPath() : bucketPath; + path += "/" + file.fileName(); return new RawFile( - bucketPath + "/" + file.fileName(), + path, file.fileSize(), 0, file.fileSize(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 6dcbb322d6d0..3107ebe150e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -385,8 +385,15 @@ private LazyGenericRow toRow( dataSplit.partition()))), dataSplit::bucket, () -> - BinaryString.fromString( - dataSplit.bucketPath() + "/" + dataFileMeta.fileName()), + dataFileMeta.externalPath() == null + ? BinaryString.fromString( + dataSplit.bucketPath() + + "/" + + dataFileMeta.fileName()) + : BinaryString.fromString( + dataFileMeta.externalPath() + + "/" + + dataFileMeta.fileName()), () -> BinaryString.fromString( DataFilePathFactory.formatIdentifier( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index a9012ed89b34..3f752be13e73 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -125,7 +125,7 @@ public void testSingleWrite() throws Exception { DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); assertThat(meta).isNotNull(); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1L); @@ -186,7 +186,7 @@ public void testMultipleCommits() throws Exception { assertThat(inc.newFilesIncrement().newFiles().size()).isEqualTo(1); DataFileMeta meta = inc.newFilesIncrement().newFiles().get(0); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(100L); @@ -227,7 +227,7 @@ public void testRollingWrite() throws Exception { int id = 0; for (DataFileMeta meta : firstInc.newFilesIncrement().newFiles()) { - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1000L); @@ -680,6 +680,7 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I Collections.emptyList(), null, FileSource.APPEND, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e43cd898dbc2..e81756268980 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -78,7 +78,7 @@ public class KeyValueFileReadWriteTest { public void testReadNonExistentFile() { KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", null, null); - assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0)) + assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null)) .hasMessageContaining( "you can configure 'snapshot.time-retained' option with a larger value."); } @@ -312,7 +312,8 @@ private void assertData( meta.schemaId(), meta.fileName(), meta.fileSize(), - meta.level())); + meta.level(), + meta.externalPath())); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index be49311427a0..fa96765a4278 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -198,7 +198,11 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index a678534042eb..56c45cfdc442 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -275,7 +275,11 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f2a9c44dd7ce..47d12ce47cf4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -592,7 +592,7 @@ private void mergeCompacted( assertThat(remove).isTrue(); // See MergeTreeWriter.updateCompactResult if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { - compactWriterFactory.deleteFile(file.fileName(), file.level()); + compactWriterFactory.deleteFile(file, file.level()); } } compactedFiles.addAll(increment.compactIncrement().compactAfter()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 01d4e89af95d..471b60d3cfa5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -112,7 +112,7 @@ public void testReadDeletedFiles() throws Exception { table.store() .pathFactory() .createDataFilePathFactory(split.partition(), split.bucket()) - .toPath(split.dataFiles().get(0).fileName()); + .toPath(split.dataFiles().get(0)); table.fileIO().deleteQuietly(path); // read diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index a4e581b701ea..a088f40dab21 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -447,6 +447,7 @@ private DataFileMeta newDataFile(long rowCount) { Collections.emptyList(), null, null, + null, null); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index f83b5cf8f9e3..b7e1e608789b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -108,7 +108,7 @@ private static List getUsedFilesInternal( pathFactory .createDataFilePathFactory( simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry.fileName()); + .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath()); dataFiles.add(dataFilePath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 6b95e369074b..39860e418c18 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -96,7 +96,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta), bucket, false, meta); @@ -111,7 +111,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta), bucket, true, meta); @@ -167,7 +167,8 @@ private List produceNewCommittables( table.fileIO() .rename( changelogTempPath, - dataFilePathFactory.toPath( + dataFilePathFactory.toExtraFilePath( + baseResult.meta, realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( @@ -193,9 +194,9 @@ private List produceNewCommittables( + CompactedChangelogReadOnlyFormat.getIdentifier( result.meta.fileFormat()); if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(name)); + compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } else { - newFilesChangelog.add(result.meta.rename(name)); + newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index d9f863c6b919..d35cb09cb78a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -198,16 +198,18 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi String indexFile = indexFiles.get(0); try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( - fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)), + fileIO.newInputStream( + dataFilePathFactory.toExtraFilePath( + dataFileMeta, indexFile)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } - newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile)); + newIndexPath = + createNewFileIndexFilePath( + dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile)); } else { maintainers = new HashMap<>(); - newIndexPath = - dataFileToFileIndexPath( - dataFilePathFactory.toPath(dataFileMeta.fileName())); + newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); } // remove unnecessary diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index a3223446f644..4997f65eaf0c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) } @@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) ).asJava