Skip to content

Commit

Permalink
[test] Add test case for read with raw convertible splits (apache#3163)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Apr 7, 2024
1 parent deee94b commit 2a928b9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public static MemorySize ofKibiBytes(long kibiBytes) {
return new MemorySize(kibiBytes << 10);
}

public static MemorySize ofBytes(long bytes) {
return new MemorySize(bytes);
}

// ------------------------------------------------------------------------

/** Gets the memory size in bytes. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
Expand Down Expand Up @@ -1328,6 +1331,81 @@ public void testReadDeletionVectorTable() throws Exception {
commit.close();
}

@Test
public void testReadWithRawConvertibleSplits() throws Exception {
FileStoreTable table =
createFileStoreTable(
options -> {
options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
options.set(SOURCE_SPLIT_OPEN_FILE_COST, MemorySize.ofBytes(1));
options.set(SOURCE_SPLIT_TARGET_SIZE, MemorySize.ofKibiBytes(5));
});
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

// file1
write.write(rowDataWithKind(RowKind.INSERT, 1, 0, 0L));
commit.commit(0, write.prepareCommit(true, 0));

// file2
for (int i = 1; i < 1000; i++) {
write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
}
commit.commit(1, write.prepareCommit(true, 1));

// file3
write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1000L));
commit.commit(2, write.prepareCommit(true, 2));

// file4
write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1001L));
commit.commit(3, write.prepareCommit(true, 3));

// split1[file1], split2[file2], split3[file3, file4]
List<DataSplit> dataSplits = table.newSnapshotReader().read().dataSplits();
assertThat(dataSplits).hasSize(3);
assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();
assertThat(dataSplits.get(1).dataFiles()).hasSize(1);
assertThat(dataSplits.get(1).convertToRawFiles()).isPresent();
assertThat(dataSplits.get(2).dataFiles()).hasSize(2);
assertThat(dataSplits.get(2).convertToRawFiles()).isEmpty();

Function<InternalRow, String> rowDataToString =
row ->
internalRowToString(
row,
DataTypes.ROW(
DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
List<String> result =
getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
assertThat(result.size()).isEqualTo(1001);
for (int i = 0; i < 1000; i++) {
assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i));
}
assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]");

// compact all files
write.compact(binaryRow(1), 0, true);
commit.commit(4, write.prepareCommit(true, 4));

// split1[compactedFile]
dataSplits = table.newSnapshotReader().read().dataSplits();
assertThat(dataSplits).hasSize(1);
assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();

result = getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
assertThat(result.size()).isEqualTo(1001);
for (int i = 0; i < 1000; i++) {
assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i));
}
assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]");

write.close();
commit.close();
}

@Test
public void testTableQueryForLookup() throws Exception {
FileStoreTable table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,27 @@ public void testSplitRawConvertible() {
Pair.of(Collections.singletonList("6"), true));
}

@Test
public void testMergeTreeSplitRawConvertible() {
Comparator<InternalRow> comparator = Comparator.comparingInt(o -> o.getInt(0));
MergeTreeSplitGenerator mergeTreeSplitGenerator =
new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE);

List<DataFileMeta> files =
Arrays.asList(
newFile("1", 0, 0, 10, 10L),
newFile("2", 0, 0, 12, 12L),
newFile("3", 0, 13, 20, 20L),
newFile("4", 0, 21, 200, 200L),
newFile("5", 0, 201, 210, 210L),
newFile("6", 0, 211, 220, 220L));
assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files)))
.containsExactlyInAnyOrder(
Pair.of(Arrays.asList("1", "2", "3"), false),
Pair.of(Collections.singletonList("4"), true),
Pair.of(Arrays.asList("5", "6"), false));
}

private List<List<String>> toNames(List<SplitGenerator.SplitGroup> splitGroups) {
return splitGroups.stream()
.map(
Expand Down

0 comments on commit 2a928b9

Please sign in to comment.