diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java index bdc5197fe359..07746d58dd2d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java @@ -18,6 +18,7 @@ package org.apache.paimon.utils; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.types.RowType; @@ -77,4 +78,22 @@ public static Map convertSpecToInternal( } return partValues; } + + public static String paritionToString( + RowType partitionType, BinaryRow partition, String delimiter) { + InternalRow.FieldGetter[] getters = partitionType.fieldGetters(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < getters.length; i++) { + Object part = getters[i].getFieldOrNull(partition); + if (part != null) { + builder.append(part); + } else { + builder.append("null"); + } + if (i != getters.length - 1) { + builder.append(delimiter); + } + } + return builder.toString(); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java new file mode 100644 index 000000000000..fcd79fa4c084 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link InternalRowPartitionComputer}. */ +public class InternalRowPartitionComputerTest { + + @Test + public void testPartitionToString() { + RowType rowType = RowType.of(); + BinaryRow binaryRow = new BinaryRow(0); + BinaryRowWriter writer = new BinaryRowWriter(binaryRow); + assertThat(InternalRowPartitionComputer.paritionToString(rowType, binaryRow, "-")) + .isEqualTo(""); + + rowType = RowType.of(DataTypes.STRING(), DataTypes.INT()); + binaryRow = new BinaryRow(2); + writer = new BinaryRowWriter(binaryRow); + writer.writeString(0, BinaryString.fromString("20240731")); + writer.writeInt(1, 10); + assertThat(InternalRowPartitionComputer.paritionToString(rowType, binaryRow, "-")) + .isEqualTo("20240731-10"); + + rowType = RowType.of(DataTypes.STRING(), DataTypes.INT()); + binaryRow = new BinaryRow(2); + writer = new BinaryRowWriter(binaryRow); + writer.setNullAt(0); + writer.writeInt(1, 10); + assertThat(InternalRowPartitionComputer.paritionToString(rowType, binaryRow, "-")) + .isEqualTo("null-10"); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index c17f1c2527a8..26341d045c35 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -169,6 +169,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma schemaManager, schema, commitUser, + partitionType, keyType, valueType, keyComparatorSupplier, diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java index 1c6a709cf783..7349d456603a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java @@ -26,6 +26,9 @@ public interface FileChannelManager extends AutoCloseable { /** Creates an ID identifying an underlying file channel and returns it. */ FileIOChannel.ID createChannel(); + /** Creates an ID identifying an underlying file channel and returns it. */ + FileIOChannel.ID createChannel(String prefix); + /** Creates an enumerator for channels that logically belong together and returns it. */ FileIOChannel.Enumerator createChannelEnumerator(); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java index d78a361c9e0d..ce175e90bbd1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java @@ -89,6 +89,12 @@ public ID createChannel() { return new ID(paths[num], num, random); } + @Override + public ID createChannel(String prefix) { + int num = (int) (nextPath.getAndIncrement() % paths.length); + return new ID(paths[num], num, prefix, random); + } + @Override public Enumerator createChannelEnumerator() { return new Enumerator(paths, random); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java index 5c9a262df74c..4b72029dddb6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java @@ -102,6 +102,11 @@ public ID(File basePath, int bucketNum, Random random) { this.bucketNum = bucketNum; } + public ID(File basePath, int bucketNum, String prefix, Random random) { + this.path = new File(basePath, prefix + "-" + randomString(random) + ".channel"); + this.bucketNum = bucketNum; + } + /** Returns the path to the underlying temporary file. */ public String getPath() { return path.getAbsolutePath(); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java index 8e6b67f8378b..510492583a64 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java @@ -34,6 +34,8 @@ public interface IOManager extends AutoCloseable { ID createChannel(); + ID createChannel(String prefix); + String[] tempDirs(); Enumerator createChannelEnumerator(); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java index 50b8621ecf17..d39c8efb543e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java @@ -77,6 +77,11 @@ public ID createChannel() { return fileChannelManager.createChannel(); } + @Override + public ID createChannel(String prefix) { + return fileChannelManager.createChannel(prefix); + } + @Override public String[] tempDirs() { return tempDirs; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java index 429ede81039d..4a1b04f52a0f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java @@ -105,4 +105,16 @@ private static void removalCallback(String file, LookupFile lookupFile, RemovalC } } } + + public static String localFilePrefix( + String partition, int bucket, String remoteFileName, int length) { + String identifier; + if (partition.isEmpty()) { + identifier = String.format("%s-%s", bucket, remoteFileName); + } else { + identifier = String.format("%s-%s-%s", partition, bucket, remoteFileName); + } + + return identifier.substring(0, Math.min(identifier.length(), length)); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index 6cb3bc61b20b..da1192a6e5b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -47,7 +47,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.function.Function; -import java.util.function.Supplier; import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE; import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong; @@ -61,7 +60,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final RowCompactedSerializer keySerializer; private final ValueProcessor valueProcessor; private final IOFunction> fileReaderFactory; - private final Supplier localFileFactory; + private final Function localFileFactory; private final LookupStoreFactory lookupStoreFactory; private final Function bfGenerator; @@ -74,7 +73,7 @@ public LookupLevels( RowType keyType, ValueProcessor valueProcessor, IOFunction> fileReaderFactory, - Supplier localFileFactory, + Function localFileFactory, LookupStoreFactory lookupStoreFactory, Function bfGenerator, Cache lookupFileCache) { @@ -145,7 +144,7 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException { } private LookupFile createLookupFile(DataFileMeta file) throws IOException { - File localFile = localFileFactory.get(); + File localFile = localFileFactory.apply(file.fileName()); if (!localFile.createNewFile()) { throw new IOException("Can not create new file: " + localFile); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 85a796d2e45e..545f20f096db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -69,6 +69,7 @@ import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.UserDefinedSeqComparator; @@ -104,6 +105,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final FileIO fileIO; private final RowType keyType; private final RowType valueType; + private final RowType partitionType; @Nullable private final RecordLevelExpire recordLevelExpire; @Nullable private Cache lookupFileCache; @@ -112,6 +114,7 @@ public KeyValueFileStoreWrite( SchemaManager schemaManager, TableSchema schema, String commitUser, + RowType partitionType, RowType keyType, RowType valueType, Supplier> keyComparatorSupplier, @@ -136,6 +139,7 @@ public KeyValueFileStoreWrite( deletionVectorsMaintainerFactory, tableName); this.fileIO = fileIO; + this.partitionType = partitionType; this.keyType = keyType; this.valueType = valueType; this.udsComparatorSupplier = udsComparatorSupplier; @@ -325,7 +329,7 @@ private MergeTreeCompactRewriter createRewriter( return new LookupMergeTreeCompactRewriter( maxLevel, mergeEngine, - createLookupLevels(levels, processor, lookupReaderFactory), + createLookupLevels(partition, bucket, levels, processor, lookupReaderFactory), readerFactory, writerFactory, keyComparator, @@ -347,6 +351,8 @@ private MergeTreeCompactRewriter createRewriter( } private LookupLevels createLookupLevels( + BinaryRow partition, + int bucket, Levels levels, LookupLevels.ValueProcessor valueProcessor, FileReaderFactory readerFactory) { @@ -366,14 +372,22 @@ private LookupLevels createLookupLevels( options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE)); } - return new LookupLevels<>( levels, keyComparatorSupplier.get(), keyType, valueProcessor, readerFactory::createRecordReader, - () -> ioManager.createChannel().getPathFile(), + file -> + ioManager + .createChannel( + LookupFile.localFilePrefix( + InternalRowPartitionComputer.paritionToString( + partitionType, partition, "-"), + bucket, + file, + 100)) + .getPathFile(), lookupStoreFactory, bfGenerator(options), lookupFileCache); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index f0a32d101ea5..5f5463a70cd6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -39,6 +39,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.KeyComparatorSupplier; import org.apache.paimon.utils.Preconditions; @@ -75,6 +76,8 @@ public class LocalTableQuery implements TableQuery { @Nullable private Cache lookupFileCache; + private final RowType partitionType; + public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); this.tableView = new HashMap<>(); @@ -86,6 +89,7 @@ public LocalTableQuery(FileStoreTable table) { KeyValueFileStore store = (KeyValueFileStore) tableStore; this.readerFactoryBuilder = store.newReaderFactoryBuilder(); + this.partitionType = table.schema().logicalPartitionType(); RowType keyType = readerFactoryBuilder.keyType(); this.keyComparatorSupplier = new KeyComparatorSupplier(readerFactoryBuilder.keyType()); this.lookupStoreFactory = @@ -155,9 +159,18 @@ private void newLookupLevels(BinaryRow partition, int bucket, List file.fileName(), file.fileSize(), file.level()), - () -> + file -> Preconditions.checkNotNull(ioManager, "IOManager is required.") - .createChannel() + .createChannel( + LookupFile.localFilePrefix( + InternalRowPartitionComputer + .paritionToString( + partitionType, + partition, + "-"), + bucket, + file, + 100)) .getPathFile(), lookupStoreFactory, bfGenerator(options), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 086548547d34..47a9dd902c83 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -192,7 +192,7 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max createReaderFactory() .createRecordReader( 0, file.fileName(), file.fileSize(), file.level()), - () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), + file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), rowCount -> BloomFilter.builder(rowCount, 0.01), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java new file mode 100644 index 000000000000..1fcebb33887d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link LookupFile}. */ +public class LookupFileTest { + + @Test + public void testLocalFilePrefix() { + assertThat( + LookupFile.localFilePrefix( + "2024073105", + 10, + "data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc", + 100)) + .isEqualTo("2024073105-10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc"); + assertThat( + LookupFile.localFilePrefix( + "", 10, "data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc", 100)) + .isEqualTo("10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc"); + assertThat( + LookupFile.localFilePrefix( + "2024073105", + 10, + "data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc", + 20)) + .isEqualTo("2024073105-10-data-c"); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 9945b54b9c32..f9b4bf7271bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -271,7 +271,7 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD createReaderFactory() .createRecordReader( 0, file.fileName(), file.fileSize(), file.level()), - () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), + file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), rowCount -> BloomFilter.builder(rowCount, 0.05), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index edd6688da66d..66566f873b06 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -70,6 +70,8 @@ public class TestChangelogDataReadWrite { new RowType(singletonList(new DataField(0, "k", new BigIntType()))); private static final RowType VALUE_TYPE = new RowType(singletonList(new DataField(0, "v", new BigIntType()))); + private static final RowType PARTITION_TYPE = + new RowType(singletonList(new DataField(0, "p", new IntType()))); private static final Comparator COMPARATOR = Comparator.comparingLong(o -> o.getLong(0)); private static final RecordEqualiser EQUALISER = @@ -171,6 +173,7 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc schemaManager, schemaManager.schema(0), commitUser, + PARTITION_TYPE, KEY_TYPE, VALUE_TYPE, () -> COMPARATOR,