Skip to content

Commit

Permalink
[core] Support deleting rows in partial update of specific sequence g…
Browse files Browse the repository at this point in the history
…roup (#4525)
  • Loading branch information
liyubin117 authored Nov 14, 2024
1 parent c95c3e6 commit 84a97ee
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ By default, Partial update can not accept delete records, you can choose one of
- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
* Configure 'partial-update.remove-record-on-sequence-group' to remove the whole row when receiving delete records of specified sequence group.
{{< /hint >}}

## Sequence Group

Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,12 @@
<td>Boolean</td>
<td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
</tr>
<tr>
<td><h5>partial-update.remove-record-on-sequence-group</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Whether to remove the whole row in partial-update engine when -D records of specified sequence group are received.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records are received.");

@Immutable
public static final ConfigOption<String> PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP =
key("partial-update.remove-record-on-sequence-group")
.stringType()
.noDefaultValue()
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records of specified sequence group are received.");

@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;

/**
Expand All @@ -68,6 +69,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
private final boolean removeRecordOnDelete;
private final Set<Integer> sequenceGroupPartialDelete;

private InternalRow currentKey;
private long latestSequenceNumber;
Expand All @@ -81,13 +83,15 @@ protected PartialUpdateMergeFunction(
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled,
boolean removeRecordOnDelete) {
boolean removeRecordOnDelete,
Set<Integer> sequenceGroupPartialDelete) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
this.removeRecordOnDelete = removeRecordOnDelete;
this.sequenceGroupPartialDelete = sequenceGroupPartialDelete;
}

@Override
Expand Down Expand Up @@ -220,8 +224,15 @@ private void retractWithSequenceGroup(KeyValue kv) {
.anyMatch(field -> field == index)) {
for (int field : seqComparator.compareFields()) {
if (!updatedSequenceFields.contains(field)) {
row.setField(field, getters[field].getFieldOrNull(kv.value()));
updatedSequenceFields.add(field);
if (kv.valueKind() == RowKind.DELETE
&& sequenceGroupPartialDelete.contains(field)) {
currentDeleteRow = true;
row = new GenericRow(getters.length);
return;
} else {
row.setField(field, getters[field].getFieldOrNull(kv.value()));
updatedSequenceFields.add(field);
}
}
}
} else {
Expand Down Expand Up @@ -278,13 +289,21 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final boolean removeRecordOnDelete;

private final String removeRecordOnSequenceGroup;

private Set<Integer> sequenceGroupPartialDelete;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
this.tableTypes = rowType.getFieldTypes();
this.removeRecordOnSequenceGroup =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
this.sequenceGroupPartialDelete = new HashSet<>();

List<String> fieldNames = rowType.getFieldNames();
this.fieldSeqComparators = new HashMap<>();
Map<String, Integer> sequenceGroupMap = new HashMap<>();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
Expand Down Expand Up @@ -323,6 +342,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
fieldName -> {
int index = fieldNames.indexOf(fieldName);
fieldSeqComparators.put(index, userDefinedSeqComparator);
sequenceGroupMap.put(fieldName, index);
});
}
}
Expand All @@ -345,6 +365,21 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
String.format(
"sequence group and %s have conflicting behavior so should not be enabled at the same time.",
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));

if (removeRecordOnSequenceGroup != null) {
String[] sequenceGroupArr = removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR);
Preconditions.checkState(
sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)),
String.format(
"field '%s' defined in '%s' option must be part of sequence groups",
removeRecordOnSequenceGroup,
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
sequenceGroupPartialDelete =
Arrays.stream(sequenceGroupArr)
.filter(sequenceGroupMap::containsKey)
.map(sequenceGroupMap::get)
.collect(Collectors.toSet());
}
}

@Override
Expand Down Expand Up @@ -405,7 +440,8 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
projectedSeqComparators,
projectedAggregators,
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
removeRecordOnDelete,
sequenceGroupPartialDelete);
} else {
Map<Integer, FieldsComparator> fieldSeqComparators = new HashMap<>();
this.fieldSeqComparators.forEach(
Expand All @@ -419,7 +455,8 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
fieldSeqComparators,
fieldAggregators,
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
removeRecordOnDelete,
sequenceGroupPartialDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,42 @@ public void testSequenceGroup() {
validate(func, 1, null, null, 6, null, null, 6);
}

@Test
public void testSequenceGroupPartialDelete() {
Options options = new Options();
options.set("fields.f3.sequence-group", "f1,f2");
options.set("fields.f6.sequence-group", "f4,f5");
options.set("partial-update.remove-record-on-sequence-group", "f6");
RowType rowType =
RowType.of(
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
MergeFunction<KeyValue> func =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"))
.create();
func.reset();
add(func, 1, 1, 1, 1, 1, 1, 1);
add(func, 1, 2, 2, 2, 2, 2, null);
validate(func, 1, 2, 2, 2, 1, 1, 1);
add(func, 1, 3, 3, 1, 3, 3, 3);
validate(func, 1, 2, 2, 2, 3, 3, 3);

// delete
add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, null);
validate(func, 1, null, null, 3, 3, 3, 3);
add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 4);
validate(func, null, null, null, null, null, null, null);
add(func, 1, 4, 4, 4, 5, 5, 5);
validate(func, 1, 4, 4, 4, 5, 5, 5);
add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 6);
validate(func, null, null, null, null, null, null, null);
}

@Test
public void testMultiSequenceFields() {
Options options = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,90 @@ public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
commit.close();
}

@Test
public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT()
},
new String[] {"pt", "a", "b", "seq1", "c", "d", "seq2"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.seq1.sequence-group", "b");
options.set("fields.seq2.sequence-group", "c,d");
options.set("partial-update.remove-record-on-sequence-group", "seq2");
},
rowType);
FileStoreTable wrongTable =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.seq1.sequence-group", "b");
options.set("fields.seq2.sequence-group", "c,d");
options.set("partial-update.remove-record-on-sequence-group", "b");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);

assertThatThrownBy(() -> wrongTable.newWrite(""))
.hasMessageContaining(
"field 'b' defined in 'partial-update.remove-record-on-sequence-group' option must be part of sequence groups");

SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. Inserts
write.write(GenericRow.of(1, 1, 10, 1, 20, 20, 1));
write.write(GenericRow.of(1, 1, 11, 2, 25, 25, 0));
write.write(GenericRow.of(1, 1, 12, 1, 29, 29, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 11, 2, 29, 29, 2]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 11, 2, 29, 29, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, NULL, 2, NULL, NULL, 2]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 11, 2, 30, 30, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 11, 2, 30, 30, 3]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 12, 3, 30, 30, 2));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, NULL, 3, 30, 30, 3]");

write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 12, 2, 30, 31, 5));
commit.commit(4, write.prepareCommit(true, 4));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();

// 5. Inserts
write.write(GenericRow.of(1, 1, 11, 2, 30, 31, 6));
commit.commit(5, write.prepareCommit(true, 5));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 11, 2, 30, 31, 6]");

write.close();
commit.close();
}

@Test
public void testPartialUpdateWithAgg() throws Exception {
RowType rowType =
Expand Down

0 comments on commit 84a97ee

Please sign in to comment.