Skip to content

Commit

Permalink
fix local merge
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Apr 2, 2024
1 parent e839416 commit 86b9477
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>

private static final long serialVersionUID = 1L;

TableSchema schema;
private final TableSchema schema;
private final boolean ignoreDelete;

private transient Projection keyProjection;
private transient RecordComparator keyComparator;
Expand All @@ -76,6 +77,7 @@ public LocalMergeOperator(TableSchema schema) {
schema.primaryKeys().size() > 0,
"LocalMergeOperator currently only support tables with primary keys");
this.schema = schema;
this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
setChainingStrategy(ChainingStrategy.ALWAYS);
}

Expand Down Expand Up @@ -137,6 +139,10 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {

RowKind rowKind =
rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
if (ignoreDelete && rowKind.isRetract()) {
return;
}

// row kind must be INSERT when it is divided into key and value
row.setRowKind(RowKind.INSERT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -420,30 +422,34 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Except
insert2.close();
}

@Test
public void testIgnoreDelete() throws Exception {
@ParameterizedTest(name = "localMergeEnabled = {0}")
@ValueSource(booleans = {true, false})
public void testIgnoreDelete(boolean localMerge) throws Exception {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH ("
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
+ " 'ignore-delete' = 'true',"
+ " 'fields.a.aggregate-function' = 'sum',"
+ " 'fields.g.sequence-group'='a')");
+ " 'ignore-delete' = 'true'"
+ ")");
if (localMerge) {
sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')");
}

String id =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, 10, 1),
Row.ofKind(RowKind.DELETE, 1, 10, 2),
Row.ofKind(RowKind.INSERT, 1, 20, 3)));
Row.ofKind(RowKind.INSERT, 1, null, "apple"),
Row.ofKind(RowKind.DELETE, 1, null, "apple"),
Row.ofKind(RowKind.INSERT, 1, "A", null)));
streamSqlIter(
"CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) "
"CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
+ "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
+ "'changelog-mode' = 'I,D')",
id)
.close();
sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await();

assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
assertThat(sql("SELECT * FROM ignore_delete"))
.containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
}

@Test
Expand All @@ -457,7 +463,7 @@ public void testIgnoreDeleteInReader() throws Exception {
// write delete records
sql("DELETE FROM ignore_delete WHERE pk = 1");
sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
assertThat(batchSql("SELECT * FROM ignore_delete"))
assertThat(sql("SELECT * FROM ignore_delete"))
.containsExactlyInAnyOrder(Row.of(1, "A", null));

// force altering merge engine and read
Expand All @@ -477,7 +483,7 @@ public void testIgnoreDeleteInReader() throws Exception {
Collections.singletonList("pk"),
newOptions,
null));
assertThat(batchSql("SELECT * FROM ignore_delete"))
assertThat(sql("SELECT * FROM ignore_delete"))
.containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
}
}

0 comments on commit 86b9477

Please sign in to comment.