Skip to content

Commit

Permalink
[core] Support delete by default in partial updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jun 25, 2024
1 parent d2f1bf1 commit e2fa258
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ For streaming queries, `partial-update` merge engine must be used together with
{{< /hint >}}

{{< hint info >}}
By default, Partial update can not accept delete records, you can choose one of the following solutions:
By default, partial update ignores -U (update-before) records and removes the entire row when a -D (delete) record is received. You can also choose one of the following alternatives:
- Configure 'ignore-delete' to ignore delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

public static final String SEQUENCE_GROUP = "sequence-group";

private final boolean[] primaryKeyMask;
private final InternalRow.FieldGetter[] getters;
private final boolean ignoreDelete;
private final Map<Integer, FieldsComparator> fieldSeqComparators;
Expand All @@ -70,11 +71,13 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private KeyValue reused;

protected PartialUpdateMergeFunction(
boolean[] primaryKeyMask,
InternalRow.FieldGetter[] getters,
boolean ignoreDelete,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
this.primaryKeyMask = primaryKeyMask;
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
Expand Down Expand Up @@ -106,15 +109,11 @@ public void add(KeyValue kv) {
return;
}

String msg =
String.join(
"\n",
"By default, Partial update can not accept delete records,"
+ " you can choose one of the following solutions:",
"1. Configure 'ignore-delete' to ignore delete records.",
"2. Configure 'sequence-group's to retract partial columns.");
if (kv.valueKind() == RowKind.DELETE) {
retract(kv);
}

throw new IllegalArgumentException(msg);
return;
}

latestSequenceNumber = kv.sequenceNumber();
Expand Down Expand Up @@ -182,6 +181,23 @@ private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
return true;
}

/**
 * Applies a retract record to the merged {@code row}, position by position.
 *
 * <ul>
 *   <li>Primary-key positions are overwritten with the value read from {@code kv.value()}.
 *   <li>Positions without an aggregator are reset to {@code null}.
 *   <li>Positions with an aggregator are replaced by the aggregator's {@code retract} of the
 *       current accumulator and the incoming value.
 * </ul>
 *
 * @param kv the record whose value is retracted from the current row
 */
private void retract(KeyValue kv) {
    for (int pos = 0; pos < getters.length; pos++) {
        FieldAggregator agg = fieldAggregators.get(pos);
        Object replacement;
        if (primaryKeyMask[pos]) {
            // key columns always mirror the incoming record
            replacement = getters[pos].getFieldOrNull(kv.value());
        } else if (agg == null) {
            // plain value column: retracting simply clears it
            replacement = null;
        } else {
            // aggregated column: undo the incoming contribution from the accumulator
            Object accumulator = getters[pos].getFieldOrNull(row);
            Object retracted = getters[pos].getFieldOrNull(kv.value());
            replacement = agg.retract(accumulator, retracted);
        }
        row.setField(pos, replacement);
    }
}

private void retractWithSequenceGroup(KeyValue kv) {
Set<Integer> updatedSequenceFields = new HashSet<>();

Expand Down Expand Up @@ -235,9 +251,29 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
if (isAllValueFieldNull()) {
row.setRowKind(RowKind.DELETE);
return reused.replace(currentKey, latestSequenceNumber, RowKind.DELETE, row);
}
row.setRowKind(RowKind.INSERT);
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
}

/**
 * Tells whether the merged {@code row} carries no value outside the primary key.
 *
 * @return {@code false} as soon as a non-null field that is not flagged in {@code
 *     primaryKeyMask} is found; {@code true} otherwise
 */
private boolean isAllValueFieldNull() {
    for (int pos = 0; pos < getters.length; pos++) {
        boolean populated = getters[pos].getFieldOrNull(row) != null;
        // only a populated, non-key column counts as a surviving value
        if (populated && !primaryKeyMask[pos]) {
            return false;
        }
    }
    return true;
}

public static MergeFunctionFactory<KeyValue> factory(
Options options, RowType rowType, List<String> primaryKeys) {
return new Factory(options, rowType, primaryKeys);
Expand All @@ -256,6 +292,8 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final Map<Integer, FieldAggregator> fieldAggregators;

private final boolean[] primaryKeyMask;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
Expand Down Expand Up @@ -310,6 +348,11 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}

primaryKeyMask = new boolean[rowType.getFieldCount()];
for (String primaryKey : primaryKeys) {
primaryKeyMask[rowType.getFieldIndex(primaryKey)] = true;
}
}

@Override
Expand Down Expand Up @@ -357,20 +400,24 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
newRowType, newSequenceFields));
}
});
boolean[] primaryKeyMask = new boolean[projects.length];
for (int i = 0; i < projects.length; i++) {
if (fieldAggregators.containsKey(projects[i])) {
projectedAggregators.put(i, fieldAggregators.get(projects[i]));
}
primaryKeyMask[i] = this.primaryKeyMask[projects[i]];
}

return new PartialUpdateMergeFunction(
primaryKeyMask,
createFieldGetters(Projection.of(projection).project(tableTypes)),
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
!fieldSeqComparators.isEmpty());
} else {
return new PartialUpdateMergeFunction(
primaryKeyMask,
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,106 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}

/**
 * Exercises the partial-update merge engine with no other option set (only {@code
 * merge-engine} is configured here).
 *
 * <p>The assertions pin the following behavior: successive inserts collapse into one row
 * holding the last written values, an UPDATE_BEFORE record leaves the row untouched, an
 * UPDATE_AFTER record overwrites it, and a DELETE record removes the row entirely.
 *
 * <p>NOTE(review): the primary key is defined by {@code createFileStoreTable}, which is not
 * visible here — presumably it covers {@code pt} and {@code a}; confirm against the helper.
 */
@Test
public void testPartialUpdate() throws Exception {
    RowType rowType =
            RowType.of(
                    new DataType[] {
                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
                    },
                    new String[] {"pt", "a", "b", "c"});
    FileStoreTable table =
            createFileStoreTable(
                    options -> {
                        options.set("merge-engine", "partial-update");
                    },
                    rowType);
    Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
    SnapshotReader snapshotReader = table.newSnapshotReader();
    TableRead read = table.newRead();
    StreamTableWrite write = table.newWrite("");
    StreamTableCommit commit = table.newCommit("");
    try {
        // 1. inserts: the three records merge into a single row with the last values
        write.write(GenericRow.of(1, 1, 3, 3));
        write.write(GenericRow.of(1, 1, 1, 1));
        write.write(GenericRow.of(1, 1, 2, 2));
        commit.commit(0, write.prepareCommit(true, 0));
        List<String> result =
                getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

        // 2. Update Before: expected to be ignored, the row stays as it was
        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
        commit.commit(1, write.prepareCommit(true, 1));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

        // 3. Update After: overwrites column c with the new value
        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
        commit.commit(2, write.prepareCommit(true, 2));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");

        // 4. Retracts: a DELETE record removes the whole row
        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
        commit.commit(3, write.prepareCommit(true, 3));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).isEmpty();
    } finally {
        // Release the writer and committer even when an assertion above fails; the nested
        // finally guarantees commit.close() still runs if write.close() throws.
        try {
            write.close();
        } finally {
            commit.close();
        }
    }
}

/**
 * Exercises the partial-update merge engine with column {@code c} aggregated by {@code sum}
 * inside the sequence group declared through {@code fields.a.sequence-group}.
 *
 * <p>The assertions pin the following behavior for {@code c}: inserts accumulate (3 + 1 + 2 =
 * 6), an UPDATE_BEFORE record retracts its value (6 - 2 = 4), an UPDATE_AFTER record adds its
 * value (4 + 3 = 7), and a DELETE record retracts its value (7 - 3 = 4) while the row itself
 * is kept.
 */
@Test
public void testPartialUpdateWithAgg() throws Exception {
    RowType rowType =
            RowType.of(
                    new DataType[] {
                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
                    },
                    new String[] {"pt", "a", "b", "c"});
    FileStoreTable table =
            createFileStoreTable(
                    options -> {
                        options.set("merge-engine", "partial-update");
                        options.set("fields.a.sequence-group", "c");
                        options.set("fields.c.aggregate-function", "sum");
                    },
                    rowType);
    Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
    SnapshotReader snapshotReader = table.newSnapshotReader();
    TableRead read = table.newRead();
    StreamTableWrite write = table.newWrite("");
    StreamTableCommit commit = table.newCommit("");
    try {
        // 1. inserts: c is summed across the three records (3 + 1 + 2)
        write.write(GenericRow.of(1, 1, 3, 3));
        write.write(GenericRow.of(1, 1, 1, 1));
        write.write(GenericRow.of(1, 1, 2, 2));
        commit.commit(0, write.prepareCommit(true, 0));
        List<String> result =
                getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");

        // 2. Update Before: retracts 2 from the running sum
        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
        commit.commit(1, write.prepareCommit(true, 1));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");

        // 3. Update After: adds 3 to the running sum
        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
        commit.commit(2, write.prepareCommit(true, 2));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");

        // 4. Retracts: DELETE retracts 3 from the sum; the row itself remains
        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
        commit.commit(3, write.prepareCommit(true, 3));
        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
    } finally {
        // Release the writer and committer even when an assertion above fails; the nested
        // finally guarantees commit.close() still runs if write.close() throws.
        try {
            write.close();
        } finally {
            commit.close();
        }
    }
}

@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =
Expand Down

0 comments on commit e2fa258

Please sign in to comment.