diff --git a/docs/content/concepts/primary-key-table/merge-engine.md b/docs/content/concepts/primary-key-table/merge-engine.md index 9d06020fa64d..5e87c4f76662 100644 --- a/docs/content/concepts/primary-key-table/merge-engine.md +++ b/docs/content/concepts/primary-key-table/merge-engine.md @@ -38,7 +38,7 @@ result in strange behavior. When the input is out of order, we recommend that yo `deduplicate` merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys. -Specifically, if the latest record is a `DELETE` record, all records with the same primary keys will be deleted. +Specifically, if the latest record is a `DELETE` record, all records with the same primary keys will be deleted. You can configure `ignore-delete` to ignore delete records. ## Partial Update @@ -59,7 +59,7 @@ For streaming queries, `partial-update` merge engine must be used together with {{< hint info >}} By default, Partial update can not accept delete records, you can choose one of the following solutions: -- Configure 'partial-update.ignore-delete' to ignore delete records. +- Configure 'ignore-delete' to ignore delete records. - Configure 'sequence-group's to retract partial columns. {{< /hint >}} @@ -328,7 +328,7 @@ By specifying `'merge-engine' = 'first-row'`, users can keep the first row of th {{< hint info >}} 1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}). 2. You can not specify `sequence.field`. -3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `first-row.ignore-delete` to ignore these two kinds records. +3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can configure `ignore-delete` to ignore these two kinds of records. {{< /hint >}} This is of great help in replacing log deduplication in streaming computation. 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 3f4c76bf03aa..e1a52ec2a2db 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -176,12 +176,6 @@ Duration The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication. - -
deduplicate.ignore-delete
- false - Boolean - Whether to ignore delete records in deduplicate mode. -
deletion-vectors.enabled
false @@ -242,18 +236,18 @@ Map Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used. - -
first-row.ignore-delete
- false - Boolean - Whether to ignore delete records in first-row mode. -
full-compaction.delta-commits
(none) Integer Full compaction will be constantly triggered after delta commits. + +
ignore-delete
+ false + Boolean + Whether to ignore delete records. +
incremental-between
(none) @@ -413,12 +407,6 @@ Integer Turn off the dictionary encoding for all fields in parquet. - -
partial-update.ignore-delete
- false - Boolean - Whether to ignore delete records in partial-update mode. -
partition
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 895da7fe8c68..287dc6e16b95 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -255,23 +255,15 @@ public class CoreOptions implements Serializable { .defaultValue(MergeEngine.DEDUPLICATE) .withDescription("Specify the merge engine for table with primary key."); - public static final ConfigOption DEDUPLICATE_IGNORE_DELETE = - key("deduplicate.ignore-delete") + public static final ConfigOption IGNORE_DELETE = + key("ignore-delete") .booleanType() .defaultValue(false) - .withDescription("Whether to ignore delete records in deduplicate mode."); - - public static final ConfigOption PARTIAL_UPDATE_IGNORE_DELETE = - key("partial-update.ignore-delete") - .booleanType() - .defaultValue(false) - .withDescription("Whether to ignore delete records in partial-update mode."); - - public static final ConfigOption FIRST_ROW_IGNORE_DELETE = - key("first-row.ignore-delete") - .booleanType() - .defaultValue(false) - .withDescription("Whether to ignore delete records in first-row mode."); + .withDeprecatedKeys( + "first-row.ignore-delete", + "deduplicate.ignore-delete", + "partial-update.ignore-delete") + .withDescription("Whether to ignore delete records."); public static final ConfigOption SORT_ENGINE = key("sort-engine") @@ -1275,6 +1267,10 @@ public MergeEngine mergeEngine() { return options.get(MERGE_ENGINE); } + public boolean ignoreDelete() { + return options.get(IGNORE_DELETE); + } + public SortEngine sortEngine() { return options.get(SORT_ENGINE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 1ab054531541..9566dd37215e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -35,7 +35,6 @@ import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.options.MemorySize; import org.apache.paimon.statistics.FieldStatsCollector; -import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.IOUtils; @@ -130,8 +129,9 @@ public AppendOnlyWriter( @Override public void write(InternalRow rowData) throws Exception { Preconditions.checkArgument( - rowData.getRowKind() == RowKind.INSERT, - "Append-only writer can only accept insert row kind, but current row kind is: %s", + rowData.getRowKind().isAdd(), + "Append-only writer can only accept insert or update_after row kind, but current row kind is: %s. " + + "You can configure 'ignore-delete' to ignore retract records.", rowData.getRowKind()); boolean success = sinkWriter.write(rowData); if (!success) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java index b062ce9b6c03..9c1cf0f5e60c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java @@ -18,9 +18,7 @@ package org.apache.paimon.mergetree.compact; -import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; -import org.apache.paimon.options.Options; import javax.annotation.Nullable; @@ -30,14 +28,8 @@ */ public class DeduplicateMergeFunction implements MergeFunction { - private final boolean ignoreDelete; - private KeyValue latestKv; - protected DeduplicateMergeFunction(boolean ignoreDelete) { - this.ignoreDelete = ignoreDelete; - } - @Override public void reset() { latestKv = null; @@ -45,9 +37,6 @@ public void reset() { @Override public void add(KeyValue 
kv) { - if (ignoreDelete && kv.valueKind().isRetract()) { - return; - } latestKv = kv; } @@ -58,26 +47,16 @@ public KeyValue getResult() { } public static MergeFunctionFactory factory() { - return new Factory(new Options()); - } - - public static MergeFunctionFactory factory(Options options) { - return new Factory(options); + return new Factory(); } private static class Factory implements MergeFunctionFactory { private static final long serialVersionUID = 1L; - private final Options options; - - private Factory(Options options) { - this.options = options; - } - @Override public MergeFunction create(@Nullable int[][] projection) { - return new DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE)); + return new DeduplicateMergeFunction(); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java index f0551e0fa792..b955e0fea387 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java @@ -18,11 +18,10 @@ package org.apache.paimon.mergetree.compact; -import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.options.Options; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -36,12 +35,9 @@ public class FirstRowMergeFunction implements MergeFunction { private final InternalRowSerializer valueSerializer; private KeyValue first; - private final boolean ignoreDelete; - - protected FirstRowMergeFunction(RowType keyType, RowType valueType, boolean ignoreDelete) { + protected FirstRowMergeFunction(RowType keyType, RowType valueType) { this.keySerializer = new InternalRowSerializer(keyType); 
this.valueSerializer = new InternalRowSerializer(valueType); - this.ignoreDelete = ignoreDelete; } @Override @@ -51,15 +47,10 @@ public void reset() { @Override public void add(KeyValue kv) { - if (kv.valueKind().isRetract()) { - if (ignoreDelete) { - return; - } else { - throw new IllegalArgumentException( - "By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n" - + "You can config 'first-row.ignore-delete' to ignore the DELETE/UPDATE_BEFORE records."); - } - } + Preconditions.checkArgument( + kv.valueKind().isAdd(), + "By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n" + + "You can config 'ignore-delete' to ignore the DELETE/UPDATE_BEFORE records."); if (first == null) { this.first = kv.copy(keySerializer, valueSerializer); } @@ -71,9 +62,8 @@ public KeyValue getResult() { return first; } - public static MergeFunctionFactory factory( - RowType keyType, RowType valueType, Options options) { - return new FirstRowMergeFunction.Factory(keyType, valueType, options); + public static MergeFunctionFactory factory(RowType keyType, RowType valueType) { + return new FirstRowMergeFunction.Factory(keyType, valueType); } private static class Factory implements MergeFunctionFactory { @@ -82,18 +72,14 @@ private static class Factory implements MergeFunctionFactory { private final RowType keyType; private final RowType valueType; - private final Options options; - - public Factory(RowType keyType, RowType valueType, Options options) { + public Factory(RowType keyType, RowType valueType) { this.keyType = keyType; this.valueType = valueType; - this.options = options; } @Override public MergeFunction create(@Nullable int[][] projection) { - return new FirstRowMergeFunction( - keyType, valueType, options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE)); + return new FirstRowMergeFunction(keyType, valueType); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index dbf6dfd7531f..c1fc9293fd8f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -67,7 +67,6 @@ public class PartialUpdateMergeFunction implements MergeFunction { public static final String SEQUENCE_GROUP = "sequence-group"; private final InternalRow.FieldGetter[] getters; - private final boolean ignoreDelete; private final Map fieldSequences; private final boolean fieldSequenceEnabled; private final Map fieldAggregators; @@ -80,12 +79,10 @@ public class PartialUpdateMergeFunction implements MergeFunction { protected PartialUpdateMergeFunction( InternalRow.FieldGetter[] getters, - boolean ignoreDelete, Map fieldSequences, Map fieldAggregators, boolean fieldSequenceEnabled) { this.getters = getters; - this.ignoreDelete = ignoreDelete; this.fieldSequences = fieldSequences; this.fieldAggregators = fieldAggregators; this.fieldSequenceEnabled = fieldSequenceEnabled; @@ -104,12 +101,7 @@ public void add(KeyValue kv) { // refresh key object to avoid reference overwritten currentKey = kv.key(); - // ignore delete? if (kv.valueKind().isRetract()) { - if (ignoreDelete) { - return; - } - if (fieldSequenceEnabled) { retractWithSequenceGroup(kv); return; @@ -120,7 +112,7 @@ public void add(KeyValue kv) { "\n", "By default, Partial update can not accept delete records," + " you can choose one of the following solutions:", - "1. Configure 'partial-update.ignore-delete' to ignore delete records.", + "1. Configure 'ignore-delete' to ignore delete records.", "2. 
Configure 'sequence-group's to retract partial columns."); throw new IllegalArgumentException(msg); @@ -232,14 +224,12 @@ private static class Factory implements MergeFunctionFactory { private static final long serialVersionUID = 1L; - private final boolean ignoreDelete; private final List tableTypes; private final Map fieldSequences; private final Map fieldAggregators; private Factory(Options options, RowType rowType, List primaryKeys) { - this.ignoreDelete = options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE); this.tableTypes = rowType.getFieldTypes(); List fieldNames = rowType.getFieldNames(); @@ -325,14 +315,12 @@ public MergeFunction create(@Nullable int[][] projection) { return new PartialUpdateMergeFunction( createFieldGetters(Projection.of(projection).project(tableTypes)), - ignoreDelete, projectedSequences, projectedAggregators, !fieldSequences.isEmpty()); } else { return new PartialUpdateMergeFunction( createFieldGetters(tableTypes), - ignoreDelete, fieldSequences, fieldAggregators, !fieldSequences.isEmpty()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 4d91328f24c7..2eb41fdd50a7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -147,7 +147,8 @@ record -> { "Append only writer can not accept row with RowKind %s", record.row().getRowKind()); return record.row(); - }); + }, + CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index fea783259241..f2ab323d272d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -200,7 +200,8 @@ record -> { ? row.getRowKind() : rowKindGenerator.generate(row); return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row); - }); + }, + CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java index 112ab0890541..572e488c6b51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java @@ -64,7 +64,7 @@ public static MergeFunctionFactory createMergeFunctionFactory( switch (mergeEngine) { case DEDUPLICATE: - return DeduplicateMergeFunction.factory(conf); + return DeduplicateMergeFunction.factory(); case PARTIAL_UPDATE: return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys()); case AGGREGATE: @@ -75,7 +75,7 @@ public static MergeFunctionFactory createMergeFunctionFactory( tableSchema.primaryKeys()); case FIRST_ROW: return FirstRowMergeFunction.factory( - new RowType(extractor.keyFields(tableSchema)), rowType, conf); + new RowType(extractor.keyFields(tableSchema)), rowType); default: throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index b1cacc4c6c36..c03ef7e13071 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -32,6 +32,8 @@ import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Restorable; +import javax.annotation.Nullable; + import java.util.List; import 
java.util.concurrent.ExecutorService; @@ -47,6 +49,7 @@ public class TableWriteImpl implements InnerTableWrite, Restorable write; private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; + private final boolean ignoreDelete; private boolean batchCommitted = false; private BucketMode bucketMode; @@ -54,10 +57,12 @@ public class TableWriteImpl implements InnerTableWrite, Restorable write, KeyAndBucketExtractor keyAndBucketExtractor, - RecordExtractor recordExtractor) { + RecordExtractor recordExtractor, + boolean ignoreDelete) { this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; + this.ignoreDelete = ignoreDelete; } @Override @@ -121,13 +126,21 @@ public void write(InternalRow row, int bucket) throws Exception { writeAndReturn(row, bucket); } + @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { + if (ignoreDelete && row.getRowKind().isRetract()) { + return null; + } SinkRecord record = toSinkRecord(row); write.write(record.partition(), record.bucket(), recordExtractor.extract(record)); return record; } + @Nullable public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { + if (ignoreDelete && row.getRowKind().isRetract()) { + return null; + } SinkRecord record = toSinkRecord(row, bucket); write.write(record.partition(), bucket, recordExtractor.extract(record)); return record; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java index b71971ab6c1f..89b217b5a5aa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java @@ -18,7 +18,6 @@ package org.apache.paimon.mergetree; -import org.apache.paimon.CoreOptions; import 
org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.memory.HeapMemorySegmentPool; @@ -179,7 +178,6 @@ protected List getExpected(List input) { @Override protected MergeFunction createMergeFunction() { Options options = new Options(); - options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true); return PartialUpdateMergeFunction.factory( options, RowType.of(DataTypes.BIGINT()), ImmutableList.of("key")) .create(); @@ -260,8 +258,7 @@ protected List getExpected(List input) { protected MergeFunction createMergeFunction() { return FirstRowMergeFunction.factory( new RowType(Lists.list(new DataField(0, "f0", new IntType()))), - new RowType(Lists.list(new DataField(1, "f1", new BigIntType()))), - new Options()) + new RowType(Lists.list(new DataField(1, "f1", new BigIntType())))) .create(); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index 4c377af17617..96aa57684d78 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -18,7 +18,6 @@ package org.apache.paimon.mergetree.compact; -import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; @@ -27,7 +26,6 @@ import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg; -import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; @@ -38,7 +36,6 @@ import 
org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -308,8 +305,7 @@ public void testFirstRow() { new RowType( Lists.list(new DataField(0, "f0", new IntType()))), new RowType( - Lists.list(new DataField(1, "f1", new IntType()))), - false), + Lists.list(new DataField(1, "f1", new IntType())))), highLevel::contains); // Without level-0 @@ -373,30 +369,4 @@ public void testFirstRow() { kv = result.result(); assertThat(kv).isNull(); } - - @Test - public void testPartialUpdateIgnoreDelete() { - Options options = new Options(); - options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true); - LookupChangelogMergeFunctionWrapper function = - new LookupChangelogMergeFunctionWrapper( - LookupMergeFunction.wrap( - PartialUpdateMergeFunction.factory( - options, - DataTypes.ROW(DataTypes.INT()), - Collections.singletonList("f0")), - RowType.of(DataTypes.INT()), - RowType.of(DataTypes.INT())), - key -> null, - EQUALISER, - false, - LookupStrategy.CHANGELOG_ONLY, - null); - - function.reset(); - function.add(new KeyValue().replace(row(1), 1, DELETE, row(1)).setLevel(2)); - ChangelogResult result = function.getResult(); - assertThat(result).isNotNull(); - assertThat(result.result()).isNull(); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java index 81e49b81d2a6..3ae4615328a4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java @@ -130,7 +130,7 @@ protected MergeFunction createMergeFunction() { RowType keyType = new RowType(Lists.list(new DataField(0, "f0", new IntType()))); RowType valueType = new RowType(Lists.list(new DataField(1, 
"f1", new BigIntType()))); return new LookupMergeFunction( - new FirstRowMergeFunction(keyType, valueType, false), keyType, valueType); + new FirstRowMergeFunction(keyType, valueType), keyType, valueType); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index c245a2ce7220..db6e12305a44 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -747,7 +747,7 @@ public void testPartialUpdateIgnoreDelete() throws Exception { conf.set( CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); - conf.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true); + conf.set(CoreOptions.IGNORE_DELETE, true); }); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index 40743fae67fb..9ad54f04d1bd 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -995,7 +995,7 @@ public void testCDCOperations(boolean ignoreDelete) throws Exception { kafkaConfig.put(TOPIC.key(), topic); Map tableConfig = getBasicTableConfig(); - tableConfig.put(CoreOptions.DEDUPLICATE_IGNORE_DELETE.key(), String.valueOf(ignoreDelete)); + tableConfig.put(CoreOptions.IGNORE_DELETE.key(), String.valueOf(ignoreDelete)); KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build(); 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java index 19f77fe913c1..2fdb658ad289 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java @@ -46,7 +46,6 @@ public DeleteAction( String filter, Map catalogConfig) { super(warehouse, databaseName, tableName, catalogConfig); - changeIgnoreMergeEngine(); this.filter = filter; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 687ea5869525..41f8b7677b8c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; @@ -38,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -159,6 +161,24 @@ public MergeIntoAction( .collect(Collectors.toList()); } + /** + * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we + * want these records to be sunk directly. This method is a workaround which disables merge + * engine settings and force compaction. 
+ */ + private void changeIgnoreMergeEngine() { + if (CoreOptions.fromMap(table.options()).mergeEngine() + != CoreOptions.MergeEngine.DEDUPLICATE) { + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put( + CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString()); + dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false"); + // force compaction + dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions); + } + } + public MergeIntoAction withTargetAlias(String targetAlias) { this.targetAlias = targetAlias; return this; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index a97335cd2763..f10f3d625dcd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -18,13 +18,11 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.paimon.utils.Preconditions; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; @@ -35,7 +33,6 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,23 +96,4 @@ private TableResult executeInternal( e); } } - - /** - * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we - * want these records to be sunk 
directly. This method is a workaround. Actions that may produce - * -U/-D records can call this to disable merge engine settings and force compaction. - */ - protected void changeIgnoreMergeEngine() { - if (CoreOptions.fromMap(table.options()).mergeEngine() - != CoreOptions.MergeEngine.DEDUPLICATE) { - Map dynamicOptions = new HashMap<>(); - dynamicOptions.put( - CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString()); - // force compaction - dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); - Preconditions.checkArgument( - table instanceof FileStoreTable, "Only supports FileStoreTable."); - table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions); - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java index 3fb6fe6ae775..62341a180dab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java @@ -108,16 +108,22 @@ public GlobalFullCompactionSinkWrite( } @Override + @Nullable public SinkRecord write(InternalRow rowData) throws Exception { SinkRecord sinkRecord = super.write(rowData); - touchBucket(sinkRecord.partition(), sinkRecord.bucket()); + if (sinkRecord != null) { + touchBucket(sinkRecord.partition(), sinkRecord.bucket()); + } return sinkRecord; } @Override + @Nullable public SinkRecord write(InternalRow rowData, int bucket) throws Exception { SinkRecord sinkRecord = super.write(rowData, bucket); - touchBucket(sinkRecord.partition(), bucket); + if (sinkRecord != null) { + touchBucket(sinkRecord.partition(), bucket); + } return sinkRecord; } diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index b61fecab5d34..07fe275543a1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -130,7 +130,7 @@ record = write.write(element.getValue()); throw new IOException(e); } - if (logSinkFunction != null) { + if (record != null && logSinkFunction != null) { // write to log store, need to preserve original pk (which includes partition fields) SinkRecord logRecord = write.toLogRecord(record); logSinkFunction.invoke(logRecord, sinkContext); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index 7278e1f211cc..6001721b71f3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -39,8 +39,10 @@ /** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. 
*/ public interface StoreSinkWrite { + @Nullable SinkRecord write(InternalRow rowData) throws Exception; + @Nullable SinkRecord write(InternalRow rowData, int bucket) throws Exception; SinkRecord toLogRecord(SinkRecord record); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index c70f6038ec96..3ecc80bb6f13 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -170,11 +170,13 @@ public void withCompactExecutor(ExecutorService compactExecutor) { } @Override + @Nullable public SinkRecord write(InternalRow rowData) throws Exception { return write.writeAndReturn(rowData); } @Override + @Nullable public SinkRecord write(InternalRow rowData, int bucket) throws Exception { return write.writeAndReturn(rowData, bucket); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 9829e1040526..7c8ffb6d630d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -372,20 +372,19 @@ public void testRowKindField() { } @Test - public void testIgnoreDelete() throws Exception { + public void testIgnoreDelete() { sql( "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) " - + "WITH ('deduplicate.ignore-delete' = 'true', 'bucket' = '1')"); - BlockingIterator iterator = streamSqlBlockIter("SELECT * FROM ignore_delete"); + + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1')"); + + sql("INSERT INTO 
ignore_delete VALUES (1, 'A')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A")); - sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')"); sql("DELETE FROM ignore_delete WHERE pk = 1"); - sql("INSERT INTO ignore_delete VALUES (1, 'B')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A")); - assertThat(iterator.collect(2)) - .containsExactlyInAnyOrder( - Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B")); - iterator.close(); + sql("INSERT INTO ignore_delete VALUES (1, 'B')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B")); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 66a154ba4448..1fbed646ecf1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.ValidationException; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -549,19 +550,20 @@ public void testDynamicPartitionPruningNotWork() throws Exception { } @Test - public void testIgnoreDelete() { + public void testIgnoreDelete() throws Exception { sql( "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) " - + "WITH ('deduplicate.ignore-delete' = 'true', 'bucket' = '1')"); - - sql("INSERT INTO ignore_delete VALUES (1, 'A')"); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A")); + + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = 
'1')"); + BlockingIterator iterator = streamSqlBlockIter("SELECT * FROM ignore_delete"); + sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')"); sql("DELETE FROM ignore_delete WHERE pk = 1"); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A")); - sql("INSERT INTO ignore_delete VALUES (1, 'B')"); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B")); + + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B")); + iterator.close(); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java index 5b9909d9ce7c..5d7927a97956 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java @@ -110,7 +110,7 @@ public void testIgnoreDelete() { sql( "CREATE TABLE IF NOT EXISTS T1 (" + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)" - + " WITH ('merge-engine'='first-row', 'first-row.ignore-delete' = 'true'," + + " WITH ('merge-engine'='first-row', 'ignore-delete' = 'true'," + " 'changelog-producer' = 'lookup');"); List input = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index f4cf0b67f24b..23a574fb54c2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -52,7 +52,7 @@ protected List ddl() { + " WITH ('merge-engine'='partial-update');", "CREATE TABLE IF NOT EXISTS dwd_orders (" + "OrderID INT, OrderNumber INT, PersonID INT, LastName 
STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED)" - + " WITH ('merge-engine'='partial-update', 'partial-update.ignore-delete'='true');", + + " WITH ('merge-engine'='partial-update', 'ignore-delete'='true');", "CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT, OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED) WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');", "CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT ENFORCED) WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');"); } @@ -408,4 +408,30 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Except insert1.close(); insert2.close(); } + + @Test + public void testIgnoreDelete() throws Exception { + sql( + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH (" + + " 'merge-engine' = 'partial-update'," + + " 'ignore-delete' = 'true'," + + " 'fields.a.aggregate-function' = 'sum'," + + " 'fields.g.sequence-group'='a')"); + + String id = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, 10, 1), + Row.ofKind(RowKind.DELETE, 1, 10, 2), + Row.ofKind(RowKind.INSERT, 1, 20, 3))); + streamSqlIter( + "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) " + + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', " + + "'changelog-mode' = 'I,D')", + id) + .close(); + sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await(); + + assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java index da1cf763abb5..c67228c25937 100644 --- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.table.FileStoreTable; @@ -38,13 +37,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; -import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto; -import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStreamingReadResult; import static org.assertj.core.api.Assertions.assertThat; @@ -99,76 +95,6 @@ public void testDeleteAction() throws Exception { iterator.close(); } - @Test - public void testWorkWithPartialUpdateTable() throws Exception { - createFileStoreTable( - RowType.of( - new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()}, - new String[] {"k", "a", "b"}), - Collections.emptyList(), - Collections.singletonList("k"), - new HashMap() { - { - put( - CoreOptions.MERGE_ENGINE.key(), - CoreOptions.MergeEngine.PARTIAL_UPDATE.toString()); - put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true"); - put( - CoreOptions.CHANGELOG_PRODUCER.key(), - ThreadLocalRandom.current().nextBoolean() - ? 
CoreOptions.ChangelogProducer.LOOKUP.toString() - : CoreOptions.ChangelogProducer.FULL_COMPACTION.toString()); - } - }); - - DeleteAction action = - createAction( - DeleteAction.class, - "delete", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--where", - "k<3"); - - insertInto( - tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')"); - - BlockingIterator streamItr = - testStreamingRead( - buildSimpleQuery(tableName), - Arrays.asList( - changelogRow("+I", 1, "Say", "A"), - changelogRow("+I", 2, "Hi", "B"), - changelogRow("+I", 3, "To", "C"), - changelogRow("+I", 4, "Paimon", "D"))); - - action.run(); - - // test delete records hasn't been thrown - validateStreamingReadResult( - streamItr, - Arrays.asList(changelogRow("-D", 1, "Say", "A"), changelogRow("-D", 2, "Hi", "B"))); - - // test partial update still works after action - insertInto( - tableName, "(4, CAST (NULL AS STRING), '$')", "(4, 'Test', CAST (NULL AS STRING))"); - - validateStreamingReadResult( - streamItr, - Arrays.asList( - changelogRow("-U", 4, "Paimon", "D"), changelogRow("+U", 4, "Test", "$"))); - streamItr.close(); - - testBatchRead( - buildSimpleQuery(tableName), - Arrays.asList( - changelogRow("+I", 3, "To", "C"), changelogRow("+I", 4, "Test", "$"))); - } - private void prepareTable() throws Exception { Map options = new HashMap<>(); FileStoreTable table = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index a1ca5eb55996..3e27f0c0c5df 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -557,7 +557,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) 
throws E put( CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString()); - put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true"); + put(CoreOptions.IGNORE_DELETE.key(), "true"); } } }));