[core] Support ignore delete/update_before records in deduplicate engine

yuzelin committed Dec 13, 2023
1 parent 223cccc commit 9afa6e2

Showing 10 changed files with 87 additions and 15 deletions.
docs/layouts/shortcodes/generated/core_configuration.html (6 additions, 0 deletions)
@@ -134,6 +134,12 @@
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
</tr>
<tr>
<td><h5>deduplicate.ignore-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delete records in deduplicate mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
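The new key is an ordinary table option, so it can be set in the WITH clause of a Flink SQL DDL. A minimal sketch, assuming a Paimon catalog with an illustrative warehouse path; the table name mirrors the ITCases added further below:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreDeleteDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Catalog name and warehouse path are illustrative assumptions.
        tEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:/tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");
        // Primary-key table using the default deduplicate merge engine; with the option
        // enabled, incoming DELETE / UPDATE_BEFORE records are ignored.
        tEnv.executeSql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
                        + "WITH ('deduplicate.ignore-delete' = 'true')");
    }
}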
@@ -232,6 +232,12 @@ public class CoreOptions implements Serializable {
.defaultValue(MergeEngine.DEDUPLICATE)
.withDescription("Specify the merge engine for table with primary key.");

public static final ConfigOption<Boolean> DEDUPLICATE_IGNORE_DELETE =
key("deduplicate.ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore delete records in deduplicate mode.");

public static final ConfigOption<Boolean> PARTIAL_UPDATE_IGNORE_DELETE =
key("partial-update.ignore-delete")
.booleanType()
@@ -18,7 +18,9 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

@@ -28,9 +30,13 @@
*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {

private final boolean ignoreDelete;

private KeyValue latestKv;

protected DeduplicateMergeFunction() {}
protected DeduplicateMergeFunction(boolean ignoreDelete) {
this.ignoreDelete = ignoreDelete;
}

@Override
public void reset() {
@@ -39,6 +45,9 @@ public void reset() {

@Override
public void add(KeyValue kv) {
if (ignoreDelete && kv.valueKind().isRetract()) {
return;
}
latestKv = kv;
}

@@ -49,16 +58,26 @@ public KeyValue getResult() {
}

public static MergeFunctionFactory<KeyValue> factory() {
return new Factory();
return new Factory(new Options());
}

public static MergeFunctionFactory<KeyValue> factory(Options options) {
return new Factory(options);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {

private static final long serialVersionUID = 1L;

private final Options options;

private Factory(Options options) {
this.options = options;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new DeduplicateMergeFunction();
return new DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE));
}
}
}
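A minimal sketch of the resulting behavior, assuming the KeyValue.replace(key, sequence, kind, value) and GenericRow helpers used elsewhere in Paimon's tests (the class name is hypothetical):

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowKind;

public class IgnoreDeleteMergeSketch {
    public static void main(String[] args) {
        // Enable the new option and build the merge function through the factory.
        Options options = new Options();
        options.set(CoreOptions.DEDUPLICATE_IGNORE_DELETE, true);
        MergeFunction<KeyValue> function = DeduplicateMergeFunction.factory(options).create();

        function.reset();
        function.add(new KeyValue().replace(
                GenericRow.of(1), 1, RowKind.INSERT, GenericRow.of(1, BinaryString.fromString("A"))));
        // The retract record is skipped by add(), so it does not overwrite the INSERT.
        function.add(new KeyValue().replace(
                GenericRow.of(1), 2, RowKind.DELETE, GenericRow.of(1, BinaryString.fromString("A"))));

        System.out.println(function.getResult().valueKind()); // INSERT
    }
}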
@@ -64,7 +64,7 @@ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(

switch (mergeEngine) {
case DEDUPLICATE:
return DeduplicateMergeFunction.factory();
return DeduplicateMergeFunction.factory(conf);
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case AGGREGATE:
@@ -173,7 +173,7 @@ public abstract static class WithDeduplicateMergeFunctionTestBase

@Override
protected MergeFunction<KeyValue> createMergeFunction() {
return new DeduplicateMergeFunction();
return DeduplicateMergeFunction.factory().create();
}

@Override
@@ -77,7 +77,7 @@ public static class WithDeduplicateMergeFunctionTest

@Override
protected MergeFunction<KeyValue> createMergeFunction() {
return new DeduplicateMergeFunction();
return DeduplicateMergeFunction.factory().create();
}

@Override
@@ -107,7 +107,7 @@ protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {

@Override
protected MergeFunction<KeyValue> createMergeFunction() {
return new DeduplicateMergeFunction();
return DeduplicateMergeFunction.factory().create();
}
}

@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
@@ -971,10 +972,11 @@ public void testCatalogAndTableConfig() {
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}

@Test
@ParameterizedTest(name = "ignore-delete = {0}")
@ValueSource(booleans = {true, false})
@Timeout(60)
public void testCDCOperations() throws Exception {
final String topic = "event-insert";
public void testCDCOperations(boolean ignoreDelete) throws Exception {
final String topic = "event-insert" + UUID.randomUUID();
createTestTopic(topic, 1, 1);

// ---------- Write the Canal json into Kafka -------------------
@@ -984,8 +986,11 @@ public void testCDCOperations() throws Exception {
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);

Map<String, String> tableConfig = getBasicTableConfig();
tableConfig.put(CoreOptions.DEDUPLICATE_IGNORE_DELETE.key(), String.valueOf(ignoreDelete));

KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);
@@ -1035,10 +1040,12 @@

// For the DELETE operation
List<String> expectedDelete =
Arrays.asList(
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
ignoreDelete
? expectedUpdate
: Arrays.asList(
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedDelete, table, rowType, primaryKeys);
}

@@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -368,4 +369,21 @@ public void testRowKindField() {
sql("INSERT INTO R_T VALUES (1, 2, '-D')");
assertThat(sql("SELECT * FROM R_T")).isEmpty();
}

@Test
public void testIgnoreDelete() throws Exception {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
+ "WITH ('deduplicate.ignore-delete' = 'true')");
BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM ignore_delete");

sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
sql("DELETE FROM ignore_delete WHERE pk = 1");
sql("INSERT INTO ignore_delete VALUES (1, 'B')");

assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B"));
iterator.close();
}
}
@@ -505,4 +505,20 @@ public void testDynamicPartitionPruningNotWork() throws Exception {
Row.of(22, 202L, "ccc", 2, 2, "b"));
iterator.close();
}

@Test
public void testIgnoreDelete() {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
+ "WITH ('deduplicate.ignore-delete' = 'true')");

sql("INSERT INTO ignore_delete VALUES (1, 'A')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A"));

sql("DELETE FROM ignore_delete WHERE pk = 1");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A"));

sql("INSERT INTO ignore_delete VALUES (1, 'B')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
}
}
