Skip to content

Commit

Permalink
[core] Fix dv table with partial-update and aggregate (apache#3036)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Mar 18, 2024
1 parent fba132e commit b6dd27a
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.function.Supplier;

import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
Expand Down Expand Up @@ -255,8 +256,6 @@ private MergeTreeCompactRewriter createRewriter(
@Nullable FieldsComparator userDefinedSeqComparator,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
KeyValueFileReaderFactory.Builder readerFactoryBuilder =
this.readerFactoryBuilder.copyWithoutProjection();
DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer);
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
Expand Down Expand Up @@ -286,10 +285,11 @@ private MergeTreeCompactRewriter createRewriter(
if (mergeEngine == FIRST_ROW) {
if (options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
"Deletion vectors mode is not supported for first row merge engine now.");
"First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
}
lookupReaderFactory =
readerFactoryBuilder
.copyWithoutProjection()
.withValueProjection(new int[0][])
.build(partition, bucket, dvFactory);
processor = new ContainsValueProcessor();
Expand All @@ -298,7 +298,9 @@ private MergeTreeCompactRewriter createRewriter(
processor =
lookupStrategy.deletionVector
? new PositionedKeyValueProcessor(
valueType, lookupStrategy.produceChangelog)
valueType,
lookupStrategy.produceChangelog
|| mergeEngine != DEDUPLICATE)
: new KeyValueProcessor(valueType);
wrapperFactory =
new LookupMergeFunctionWrapperFactory<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,9 @@ private static void validateForDeletionVectors(TableSchema schema, CoreOptions o
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for none or lookup changelog producer now.");

// todo: implement it
checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
"Deletion vectors mode is not supported for first row merge engine now.");
"First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
}

private static void validateSequenceField(TableSchema schema, CoreOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testStreamingReadDVTable(String changelogProducer) throws Exception
// test read from APPEND snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */"); ) {
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
assertThat(iter.collect(12))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Expand All @@ -68,7 +68,7 @@ public void testStreamingReadDVTable(String changelogProducer) throws Exception
// test read from COMPACT snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */"); ) {
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
assertThat(iter.collect(8))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Expand Down Expand Up @@ -113,4 +113,89 @@ public void testBatchReadDVTable(String changelogProducer) {
.containsExactlyInAnyOrder(
Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4"));
}

@ParameterizedTest
@ValueSource(strings = {"none", "lookup"})
public void testDVTableWithAggregationMergeEngine(String changelogProducer) throws Exception {
    // Deletion-vector table whose value column is combined with the SUM aggregate function.
    String createTable =
            String.format(
                    "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) "
                            + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
                            + "'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')",
                    changelogProducer);
    sql(createTable);

    // Three commits; keys 2, 3 and 4 receive increments that the merge engine must sum.
    sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
    sql("INSERT INTO T VALUES (2, 1), (3, 1)");
    sql("INSERT INTO T VALUES (2, 1), (4, 1)");

    // Batch read must observe fully aggregated values.
    assertThat(batchSql("SELECT * FROM T"))
            .containsExactlyInAnyOrder(
                    Row.of(1, 111111111), Row.of(2, 4), Row.of(3, 4), Row.of(4, 5));

    // Streaming read is only checked when a changelog is actually produced (lookup mode).
    if ("lookup".equals(changelogProducer)) {
        try (BlockingIterator<Row, Row> iterator =
                streamSqlBlockIter(
                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
            // Snapshot 4 replays the initial state plus the UPDATE_BEFORE/AFTER pairs
            // emitted by the third insert for keys 2 and 4.
            assertThat(iterator.collect(8))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, 111111111),
                            Row.ofKind(RowKind.INSERT, 2, 3),
                            Row.ofKind(RowKind.INSERT, 3, 4),
                            Row.ofKind(RowKind.INSERT, 4, 4),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, 4),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, 4),
                            Row.ofKind(RowKind.UPDATE_AFTER, 4, 5));
        }
    }
}

@ParameterizedTest
@ValueSource(strings = {"none", "lookup"})
public void testDVTableWithPartialUpdateMergeEngine(String changelogProducer) throws Exception {
    // Deletion-vector table using the partial-update merge engine: NULL columns in a new
    // record keep the previously merged value.
    String createTable =
            String.format(
                    "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) "
                            + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
                            + "'merge-engine'='partial-update')",
                    changelogProducer);
    sql(createTable);

    // Three commits interleaving NULL and non-NULL column values per key.
    sql(
            "INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
    sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')");
    sql(
            "INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))");

    // Batch read must observe the column-wise merged rows.
    assertThat(batchSql("SELECT * FROM T"))
            .containsExactlyInAnyOrder(
                    Row.of(1, "111111111", "1"),
                    Row.of(2, "2_1", "2"),
                    Row.of(3, "3_1", "3_1"),
                    Row.of(4, "4", "4"));

    // Streaming read is only checked when a changelog is actually produced (lookup mode).
    if ("lookup".equals(changelogProducer)) {
        try (BlockingIterator<Row, Row> iterator =
                streamSqlBlockIter(
                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
            // Snapshot 4 replays the merged state of the first two commits plus the
            // UPDATE_BEFORE/AFTER pairs produced by the third insert for keys 2 and 4.
            assertThat(iterator.collect(8))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, "111111111", "1"),
                            Row.ofKind(RowKind.INSERT, 2, "2", "2"),
                            Row.ofKind(RowKind.INSERT, 3, "3_1", "3_1"),
                            Row.ofKind(RowKind.INSERT, 4, null, "4"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2", "2"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1", "2"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, null, "4"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4", "4"));
        }
    }
}
}

0 comments on commit b6dd27a

Please sign in to comment.