Skip to content

Commit

Permalink
[core] Move ignore-delete from MergeFunction to Table writer (#3128)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Apr 1, 2024
1 parent b21f4e5 commit 6378e75
Show file tree
Hide file tree
Showing 29 changed files with 141 additions and 262 deletions.
6 changes: 3 additions & 3 deletions docs/content/concepts/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ result in strange behavior. When the input is out of order, we recommend that yo

`deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.

Specifically, if the latest record is a `DELETE` record, all records with the same primary keys will be deleted.
Specifically, if the latest record is a `DELETE` record, all records with the same primary keys will be deleted. You can config `ignore-delete` to ignore it.

## Partial Update

Expand All @@ -59,7 +59,7 @@ For streaming queries, `partial-update` merge engine must be used together with

{{< hint info >}}
By default, Partial update can not accept delete records, you can choose one of the following solutions:
- Configure 'partial-update.ignore-delete' to ignore delete records.
- Configure 'ignore-delete' to ignore delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}

Expand Down Expand Up @@ -328,7 +328,7 @@ By specifying `'merge-engine' = 'first-row'`, users can keep the first row of th
{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `first-row.ignore-delete` to ignore these two kinds records.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
{{< /hint >}}

This is of great help in replacing log deduplication in streaming computation.
24 changes: 6 additions & 18 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
</tr>
<tr>
<td><h5>deduplicate.ignore-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delete records in deduplicate mode.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -242,18 +236,18 @@
<td>Map</td>
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>first-row.ignore-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delete records in first-row mode.</td>
</tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Full compaction will be constantly triggered after delta commits.</td>
</tr>
<tr>
<td><h5>ignore-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delete records.</td>
</tr>
<tr>
<td><h5>incremental-between</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -413,12 +407,6 @@
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
<tr>
<td><h5>partial-update.ignore-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delete records in partial-update mode.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
26 changes: 11 additions & 15 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,15 @@ public class CoreOptions implements Serializable {
.defaultValue(MergeEngine.DEDUPLICATE)
.withDescription("Specify the merge engine for table with primary key.");

public static final ConfigOption<Boolean> DEDUPLICATE_IGNORE_DELETE =
key("deduplicate.ignore-delete")
public static final ConfigOption<Boolean> IGNORE_DELETE =
key("ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore delete records in deduplicate mode.");

public static final ConfigOption<Boolean> PARTIAL_UPDATE_IGNORE_DELETE =
key("partial-update.ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore delete records in partial-update mode.");

public static final ConfigOption<Boolean> FIRST_ROW_IGNORE_DELETE =
key("first-row.ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore delete records in first-row mode.");
.withDeprecatedKeys(
"first-row.ignore-delete",
"deduplicate.ignore-delete",
"partial-update.ignore-delete")
.withDescription("Whether to ignore delete records.");

public static final ConfigOption<SortEngine> SORT_ENGINE =
key("sort-engine")
Expand Down Expand Up @@ -1275,6 +1267,10 @@ public MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}

public boolean ignoreDelete() {
return options.get(IGNORE_DELETE);
}

public SortEngine sortEngine() {
return options.get(SORT_ENGINE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.IOUtils;
Expand Down Expand Up @@ -130,8 +129,9 @@ public AppendOnlyWriter(
@Override
public void write(InternalRow rowData) throws Exception {
Preconditions.checkArgument(
rowData.getRowKind() == RowKind.INSERT,
"Append-only writer can only accept insert row kind, but current row kind is: %s",
rowData.getRowKind().isAdd(),
"Append-only writer can only accept insert or update_after row kind, but current row kind is: %s. "
+ "You can configure 'ignore-delete' to ignore retract records.",
rowData.getRowKind());
boolean success = sinkWriter.write(rowData);
if (!success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

Expand All @@ -30,24 +28,15 @@
*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {

private final boolean ignoreDelete;

private KeyValue latestKv;

protected DeduplicateMergeFunction(boolean ignoreDelete) {
this.ignoreDelete = ignoreDelete;
}

@Override
public void reset() {
latestKv = null;
}

@Override
public void add(KeyValue kv) {
if (ignoreDelete && kv.valueKind().isRetract()) {
return;
}
latestKv = kv;
}

Expand All @@ -58,26 +47,16 @@ public KeyValue getResult() {
}

public static MergeFunctionFactory<KeyValue> factory() {
return new Factory(new Options());
}

public static MergeFunctionFactory<KeyValue> factory(Options options) {
return new Factory(options);
return new Factory();
}

private static class Factory implements MergeFunctionFactory<KeyValue> {

private static final long serialVersionUID = 1L;

private final Options options;

private Factory(Options options) {
this.options = options;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE));
return new DeduplicateMergeFunction();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

Expand All @@ -36,12 +35,9 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
private final InternalRowSerializer valueSerializer;
private KeyValue first;

private final boolean ignoreDelete;

protected FirstRowMergeFunction(RowType keyType, RowType valueType, boolean ignoreDelete) {
protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
this.ignoreDelete = ignoreDelete;
}

@Override
Expand All @@ -51,15 +47,10 @@ public void reset() {

@Override
public void add(KeyValue kv) {
if (kv.valueKind().isRetract()) {
if (ignoreDelete) {
return;
} else {
throw new IllegalArgumentException(
"By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"
+ "You can config 'first-row.ignore-delete' to ignore the DELETE/UPDATE_BEFORE records.");
}
}
Preconditions.checkArgument(
kv.valueKind().isAdd(),
"By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"
+ "You can config 'ignore-delete' to ignore the DELETE/UPDATE_BEFORE records.");
if (first == null) {
this.first = kv.copy(keySerializer, valueSerializer);
}
Expand All @@ -71,9 +62,8 @@ public KeyValue getResult() {
return first;
}

public static MergeFunctionFactory<KeyValue> factory(
RowType keyType, RowType valueType, Options options) {
return new FirstRowMergeFunction.Factory(keyType, valueType, options);
public static MergeFunctionFactory<KeyValue> factory(RowType keyType, RowType valueType) {
return new FirstRowMergeFunction.Factory(keyType, valueType);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {
Expand All @@ -82,18 +72,14 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private final RowType keyType;
private final RowType valueType;

private final Options options;

public Factory(RowType keyType, RowType valueType, Options options) {
public Factory(RowType keyType, RowType valueType) {
this.keyType = keyType;
this.valueType = valueType;
this.options = options;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new FirstRowMergeFunction(
keyType, valueType, options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE));
return new FirstRowMergeFunction(keyType, valueType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
public static final String SEQUENCE_GROUP = "sequence-group";

private final InternalRow.FieldGetter[] getters;
private final boolean ignoreDelete;
private final Map<Integer, SequenceGenerator> fieldSequences;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
Expand All @@ -80,12 +79,10 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

protected PartialUpdateMergeFunction(
InternalRow.FieldGetter[] getters,
boolean ignoreDelete,
Map<Integer, SequenceGenerator> fieldSequences,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSequences = fieldSequences;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
Expand All @@ -104,12 +101,7 @@ public void add(KeyValue kv) {
// refresh key object to avoid reference overwritten
currentKey = kv.key();

// ignore delete?
if (kv.valueKind().isRetract()) {
if (ignoreDelete) {
return;
}

if (fieldSequenceEnabled) {
retractWithSequenceGroup(kv);
return;
Expand All @@ -120,7 +112,7 @@ public void add(KeyValue kv) {
"\n",
"By default, Partial update can not accept delete records,"
+ " you can choose one of the following solutions:",
"1. Configure 'partial-update.ignore-delete' to ignore delete records.",
"1. Configure 'ignore-delete' to ignore delete records.",
"2. Configure 'sequence-group's to retract partial columns.");

throw new IllegalArgumentException(msg);
Expand Down Expand Up @@ -232,14 +224,12 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private static final long serialVersionUID = 1L;

private final boolean ignoreDelete;
private final List<DataType> tableTypes;
private final Map<Integer, SequenceGenerator> fieldSequences;

private final Map<Integer, FieldAggregator> fieldAggregators;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE);
this.tableTypes = rowType.getFieldTypes();

List<String> fieldNames = rowType.getFieldNames();
Expand Down Expand Up @@ -325,14 +315,12 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {

return new PartialUpdateMergeFunction(
createFieldGetters(Projection.of(projection).project(tableTypes)),
ignoreDelete,
projectedSequences,
projectedAggregators,
!fieldSequences.isEmpty());
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
fieldSequences,
fieldAggregators,
!fieldSequences.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ record -> {
"Append only writer can not accept row with RowKind %s",
record.row().getRowKind());
return record.row();
});
},
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ record -> {
? row.getRowKind()
: rowKindGenerator.generate(row);
return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
});
},
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(

switch (mergeEngine) {
case DEDUPLICATE:
return DeduplicateMergeFunction.factory(conf);
return DeduplicateMergeFunction.factory();
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case AGGREGATE:
Expand All @@ -75,7 +75,7 @@ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(
new RowType(extractor.keyFields(tableSchema)), rowType, conf);
new RowType(extractor.keyFields(tableSchema)), rowType);
default:
throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
}
Expand Down
Loading

0 comments on commit 6378e75

Please sign in to comment.