Skip to content

Commit

Permalink
[core] Support delete row in partial updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jul 16, 2024
1 parent e4ba677 commit 791d3bf
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ but only returns input records.)
By default, Partial update can not accept delete records, you can choose one of the following solutions:

- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}

Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
<tr>
<td><h5>partial-update.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records are received.");

@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand All @@ -48,6 +49,7 @@

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;

/**
Expand All @@ -58,28 +60,34 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

public static final String SEQUENCE_GROUP = "sequence-group";

private final boolean[] primaryKeyMask;
private final InternalRow.FieldGetter[] getters;
private final boolean ignoreDelete;
private final Map<Integer, FieldsComparator> fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
private final boolean removeRecordOnDelete;

private InternalRow currentKey;
private long latestSequenceNumber;
private GenericRow row;
private KeyValue reused;

protected PartialUpdateMergeFunction(
boolean[] primaryKeyMask,
InternalRow.FieldGetter[] getters,
boolean ignoreDelete,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
boolean fieldSequenceEnabled,
boolean removeRecordOnDelete) {
this.primaryKeyMask = primaryKeyMask;
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
this.removeRecordOnDelete = removeRecordOnDelete;
}

@Override
Expand All @@ -106,6 +114,14 @@ public void add(KeyValue kv) {
return;
}

if (removeRecordOnDelete) {
if (kv.valueKind() == RowKind.DELETE) {
clearRow(kv);
}
// ignore -U records
return;
}

String msg =
String.join(
"\n",
Expand Down Expand Up @@ -182,6 +198,23 @@ private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
return true;
}

private void clearRow(KeyValue kv) {
for (int i = 0; i < getters.length; i++) {
FieldAggregator aggregator = fieldAggregators.get(i);
if (primaryKeyMask[i]) {
row.setField(i, getters[i].getFieldOrNull(kv.value()));
} else if (aggregator == null) {
// retract normal field
row.setField(i, null);
} else {
// retract agg field
Object accumulator = getters[i].getFieldOrNull(row);
row.setField(
i, aggregator.retract(accumulator, getters[i].getFieldOrNull(kv.value())));
}
}
}

private void retractWithSequenceGroup(KeyValue kv) {
Set<Integer> updatedSequenceFields = new HashSet<>();

Expand Down Expand Up @@ -235,9 +268,29 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
if (removeRecordOnDelete && isRowCleared()) {
row.setRowKind(RowKind.DELETE);
return reused.replace(currentKey, latestSequenceNumber, RowKind.DELETE, row);
}
row.setRowKind(RowKind.INSERT);
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
}

private boolean isRowCleared() {
for (int i = 0; i < getters.length; i++) {
if (getters[i].getFieldOrNull(row) == null) {
continue;
}

if (primaryKeyMask[i]) {
continue;
}

return false;
}
return true;
}

public static MergeFunctionFactory<KeyValue> factory(
Options options, RowType rowType, List<String> primaryKeys) {
return new Factory(options, rowType, primaryKeys);
Expand All @@ -256,6 +309,10 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final Map<Integer, FieldAggregator> fieldAggregators;

private final boolean[] primaryKeyMask;

private final boolean removeRecordOnDelete;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
Expand Down Expand Up @@ -310,6 +367,24 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}

primaryKeyMask = new boolean[rowType.getFieldCount()];
for (String primaryKey : primaryKeys) {
primaryKeyMask[rowType.getFieldIndex(primaryKey)] = true;
}

removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);

Preconditions.checkState(
!(removeRecordOnDelete && ignoreDelete),
String.format(
"%s and %s have conflicting behavior so should not be enabled at the same time.",
CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
Preconditions.checkState(
!removeRecordOnDelete || fieldSeqComparators.isEmpty(),
String.format(
"sequence group and %s have conflicting behavior so should not be enabled at the same time.",
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
}

@Override
Expand Down Expand Up @@ -357,25 +432,31 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
newRowType, newSequenceFields));
}
});
boolean[] primaryKeyMask = new boolean[projects.length];
for (int i = 0; i < projects.length; i++) {
if (fieldAggregators.containsKey(projects[i])) {
projectedAggregators.put(i, fieldAggregators.get(projects[i]));
}
primaryKeyMask[i] = this.primaryKeyMask[projects[i]];
}

return new PartialUpdateMergeFunction(
primaryKeyMask,
createFieldGetters(Projection.of(projection).project(tableTypes)),
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
} else {
return new PartialUpdateMergeFunction(
primaryKeyMask,
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
fieldAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}

@Test
public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("partial-update.remove-record-on-delete", "true");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();
write.close();
commit.close();
}

@Test
public void testPartialUpdateWithAgg() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.a.sequence-group", "c");
options.set("fields.c.aggregate-function", "sum");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
write.close();
commit.close();
}

@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =
Expand Down

0 comments on commit 791d3bf

Please sign in to comment.