From f775bb0e2d453d9ceaf7ed626caafdcc174232ed Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Sun, 7 Apr 2024 12:24:52 +0800 Subject: [PATCH 1/2] 1 --- .../org/apache/paimon/options/MemorySize.java | 4 + .../table/PrimaryKeyFileStoreTableTest.java | 78 +++++++++++++++++++ .../table/source/SplitGeneratorTest.java | 21 +++++ 3 files changed, 103 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java index 9296a7054378..d40450e0ce79 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java @@ -88,6 +88,10 @@ public static MemorySize ofKibiBytes(long kibiBytes) { return new MemorySize(kibiBytes << 10); } + public static MemorySize ofBytes(long bytes) { + return new MemorySize(bytes); + } + // ------------------------------------------------------------------------ /** Gets the memory size in bytes. */ diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index a1fc6f0b034d..1d20308b8073 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -87,7 +87,10 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP; import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; +import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT; +import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST; +import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE; import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE; import static org.apache.paimon.Snapshot.CommitKind.COMPACT; import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString; @@ -1328,6 +1331,81 @@ public void testReadDeletionVectorTable() throws Exception { commit.close(); } + @Test + public void testReadWithRawConvertibleSplits() throws Exception { + FileStoreTable table = + createFileStoreTable( + options -> { + options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO); + options.set(SOURCE_SPLIT_OPEN_FILE_COST, MemorySize.ofBytes(1)); + options.set(SOURCE_SPLIT_TARGET_SIZE, MemorySize.ofKibiBytes(5)); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // file1 + write.write(rowDataWithKind(RowKind.INSERT, 1, 0, 0L)); + commit.commit(0, write.prepareCommit(true, 0)); + + // file2 + for (int i = 1; i < 1000; i++) { + write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i)); + } + commit.commit(1, write.prepareCommit(true, 1)); + + // file3 + write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1000L)); + commit.commit(2, write.prepareCommit(true, 2)); + + // file4 + write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1001L)); + commit.commit(3, write.prepareCommit(true, 3)); + + // split1[file1], split2[file3], split3[file4, file5] + List dataSplits = table.newSnapshotReader().read().dataSplits(); + assertThat(dataSplits).hasSize(3); + assertThat(dataSplits.get(0).dataFiles()).hasSize(1); + assertThat(dataSplits.get(0).convertToRawFiles()).isPresent(); + assertThat(dataSplits.get(1).dataFiles()).hasSize(1); + assertThat(dataSplits.get(1).convertToRawFiles()).isPresent(); + assertThat(dataSplits.get(2).dataFiles()).hasSize(2); + assertThat(dataSplits.get(2).convertToRawFiles()).isEmpty(); + + Function rowDataToString = + row -> + internalRowToString( + row, + DataTypes.ROW( + DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT())); + List result = + getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString); + assertThat(result.size()).isEqualTo(1001); + for (int i = 0; i < 1000; i++) { + assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i)); + } + assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]"); + + // compact all files + write.compact(binaryRow(1), 0, true); + commit.commit(4, write.prepareCommit(true, 4)); + + // split1[compactedFile] + dataSplits = table.newSnapshotReader().read().dataSplits(); + assertThat(dataSplits).hasSize(1); + assertThat(dataSplits.get(0).dataFiles()).hasSize(1); + assertThat(dataSplits.get(0).convertToRawFiles()).isPresent(); + + result = getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString); + assertThat(result.size()).isEqualTo(1001); + for (int i = 0; i < 1000; i++) { + assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i)); + } + assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]"); + + write.close(); + commit.close(); + } + @Test public void testTableQueryForLookup() throws Exception { FileStoreTable table = diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 82663a72cb2e..a10413005530 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -185,6 +185,27 @@ public void testSplitRawConvertible() { Pair.of(Collections.singletonList("6"), true)); } + @Test + public void testMergeTreeSplitRawConvertible() { + Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); + MergeTreeSplitGenerator mergeTreeSplitGenerator = + new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE); + + List files = + Arrays.asList( + newFile("1", 0, 0, 10, 10L), + newFile("2", 0, 0, 12, 12L), + newFile("3", 0, 13, 20, 20L), + newFile("4", 0, 21, 200, 200L), + newFile("5", 0, 201, 210, 210L), + newFile("6", 0, 211, 220, 220L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files))) + .containsExactlyInAnyOrder( + Pair.of(Arrays.asList("1", "2", "3"), false), + Pair.of(Collections.singletonList("4"), true), + Pair.of(Arrays.asList("5", "6"), false)); + } + private List> toNames(List splitGroups) { return splitGroups.stream() .map( From 15ba229f879a5d527f7636a93e2f473ce12e6dce Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Sun, 7 Apr 2024 13:00:28 +0800 Subject: [PATCH 2/2] update --- .../org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 1d20308b8073..3357c9f7d258 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1361,7 +1361,7 @@ public void testReadWithRawConvertibleSplits() throws Exception { write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1001L)); commit.commit(3, write.prepareCommit(true, 3)); - // split1[file1], split2[file3], split3[file4, file5] + // split1[file1], split2[file2], split3[file3, file4] List dataSplits = table.newSnapshotReader().read().dataSplits(); assertThat(dataSplits).hasSize(3); assertThat(dataSplits.get(0).dataFiles()).hasSize(1);