Skip to content

Commit

Permalink
[core] Support delete row in partial updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jul 16, 2024
1 parent e4ba677 commit dfd50df
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ but only returns input records.)
By default, Partial update can not accept delete records, you can choose one of the following solutions:

- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}

Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
<tr>
<td><h5>partial-update.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records are received.");

@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand All @@ -48,6 +49,7 @@

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;

/**
Expand All @@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private final Map<Integer, FieldsComparator> fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
private final boolean removeRecordOnDelete;

private InternalRow currentKey;
private long latestSequenceNumber;
Expand All @@ -74,12 +77,14 @@ protected PartialUpdateMergeFunction(
boolean ignoreDelete,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
boolean fieldSequenceEnabled,
boolean removeRecordOnDelete) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
this.removeRecordOnDelete = removeRecordOnDelete;
}

@Override
Expand All @@ -106,6 +111,14 @@ public void add(KeyValue kv) {
return;
}

if (removeRecordOnDelete) {
if (kv.valueKind() == RowKind.DELETE) {
row = null;
}
// ignore -U records
return;
}

String msg =
String.join(
"\n",
Expand Down Expand Up @@ -235,6 +248,9 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
if (removeRecordOnDelete && row == null) {
return null;
}
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
}

Expand All @@ -256,6 +272,8 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final Map<Integer, FieldAggregator> fieldAggregators;

private final boolean removeRecordOnDelete;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
Expand Down Expand Up @@ -310,6 +328,19 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}

removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);

Preconditions.checkState(
!(removeRecordOnDelete && ignoreDelete),
String.format(
"%s and %s have conflicting behavior so should not be enabled at the same time.",
CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
Preconditions.checkState(
!removeRecordOnDelete || fieldSeqComparators.isEmpty(),
String.format(
"sequence group and %s have conflicting behavior so should not be enabled at the same time.",
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
}

@Override
Expand Down Expand Up @@ -357,25 +388,22 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
newRowType, newSequenceFields));
}
});
for (int i = 0; i < projects.length; i++) {
if (fieldAggregators.containsKey(projects[i])) {
projectedAggregators.put(i, fieldAggregators.get(projects[i]));
}
}

return new PartialUpdateMergeFunction(
createFieldGetters(Projection.of(projection).project(tableTypes)),
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
fieldAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}

@Test
public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("partial-update.remove-record-on-delete", "true");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();
write.close();
commit.close();
}

@Test
public void testPartialUpdateWithAgg() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.a.sequence-group", "c");
options.set("fields.c.aggregate-function", "sum");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
write.close();
commit.close();
}

@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =
Expand Down

0 comments on commit dfd50df

Please sign in to comment.