Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support delete by default in partial updates #3602

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ but only returns input records.)
By default, Partial update can not accept delete records, you can choose one of the following solutions:

- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}

Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
<tr>
<td><h5>partial-update.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records are received.");

@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand All @@ -48,6 +49,7 @@

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;

/**
Expand All @@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private final Map<Integer, FieldsComparator> fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
private final boolean removeRecordOnDelete;

private InternalRow currentKey;
private long latestSequenceNumber;
Expand All @@ -74,12 +77,14 @@ protected PartialUpdateMergeFunction(
boolean ignoreDelete,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
boolean fieldSequenceEnabled,
boolean removeRecordOnDelete) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
this.removeRecordOnDelete = removeRecordOnDelete;
}

@Override
Expand All @@ -106,6 +111,14 @@ public void add(KeyValue kv) {
return;
}

if (removeRecordOnDelete) {
if (kv.valueKind() == RowKind.DELETE) {
row = null;
}
// ignore -U records
return;
}

String msg =
String.join(
"\n",
Expand Down Expand Up @@ -235,6 +248,9 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
if (removeRecordOnDelete && row == null) {
return null;
}
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
}

Expand All @@ -256,6 +272,8 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final Map<Integer, FieldAggregator> fieldAggregators;

private final boolean removeRecordOnDelete;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
Expand Down Expand Up @@ -310,6 +328,19 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}

removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);

Preconditions.checkState(
!(removeRecordOnDelete && ignoreDelete),
String.format(
"%s and %s have conflicting behavior so should not be enabled at the same time.",
CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
Preconditions.checkState(
!removeRecordOnDelete || fieldSeqComparators.isEmpty(),
String.format(
"sequence group and %s have conflicting behavior so should not be enabled at the same time.",
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
}

@Override
Expand Down Expand Up @@ -368,14 +399,16 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
fieldAggregators,
!fieldSeqComparators.isEmpty());
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}

@Test
public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("partial-update.remove-record-on-delete", "true");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();
write.close();
commit.close();
}

@Test
public void testPartialUpdateWithAgg() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.a.sequence-group", "c");
options.set("fields.c.aggregate-function", "sum");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. inserts
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");

// 2. Update Before
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");

// 4. Retracts
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
write.close();
commit.close();
}

@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =
Expand Down