diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java index e44ad79ff53e..517720508685 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java @@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader { private final RecordReader reader; private final KeyValueSerializer serializer; private final int level; + private final boolean ignoreDelete; public KeyValueDataFileRecordReader( - RecordReader reader, RowType keyType, RowType valueType, int level) { + RecordReader reader, + RowType keyType, + RowType valueType, + int level, + boolean ignoreDelete) { this.reader = reader; this.serializer = new KeyValueSerializer(keyType, valueType); this.level = level; + this.ignoreDelete = ignoreDelete; } @Nullable @@ -50,11 +56,15 @@ public RecordIterator readBatch() throws IOException { return null; } - return iterator.transform( - internalRow -> - internalRow == null - ? null - : serializer.fromRow(internalRow).setLevel(level)); + RecordIterator transformed = + iterator.transform( + internalRow -> + internalRow == null + ? null + : serializer.fromRow(internalRow).setLevel(level)); + // In 0.7- versions, the delete records might be written into data file even when + // ignore-delete configured, so the reader should also filter the delete records + return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 27ddd1ff677a..e7d091b47438 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -68,6 +68,7 @@ public class KeyValueFileReaderFactory { private final Map bulkFormatMappings; private final BinaryRow partition; private final DeletionVector.Factory dvFactory; + private final boolean ignoreDelete; private KeyValueFileReaderFactory( FileIO fileIO, @@ -91,6 +92,7 @@ private KeyValueFileReaderFactory( this.partition = partition; this.bulkFormatMappings = new HashMap<>(); this.dvFactory = dvFactory; + this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); } public RecordReader createRecordReader( @@ -144,7 +146,8 @@ private RecordReader createRecordReader( new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get()); } - return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level); + return new KeyValueDataFileRecordReader( + fileRecordReader, keyType, valueType, level, ignoreDelete); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 5b97b82600f3..99ad605df8ef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -189,11 +189,6 @@ public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) return copyInternal(dynamicOptions, false); } - @Override - public FileStoreTable internalCopyWithoutCheck(Map dynamicOptions) { - return copyInternal(dynamicOptions, true); - } - private void checkImmutability(Map dynamicOptions) { Map options = tableSchema.options(); // check option is not immutable diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index a96a6138d7f6..f68e2e19de14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -86,9 +86,6 @@ default Optional comment() { /** Doesn't change table schema even when there exists time travel scan options. */ FileStoreTable copyWithoutTimeTravel(Map dynamicOptions); - /** Sometimes we have to change some Immutable options to implement features. */ - FileStoreTable internalCopyWithoutCheck(Map dynamicOptions); - /** TODO: this method is weird, old options will overwrite new options. */ FileStoreTable copyWithLatestSchema(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 41f8b7677b8c..4ec5e6ef5e9e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; @@ -39,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -137,8 +135,6 @@ public MergeIntoAction( table.getClass().getName())); } - changeIgnoreMergeEngine(); - // init primaryKeys of target table primaryKeys = ((FileStoreTable) table).schema().primaryKeys(); if (primaryKeys.isEmpty()) { @@ -161,24 +157,6 @@ public MergeIntoAction( .collect(Collectors.toList()); } - /** - * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we - * want these records to be sunk directly. This method is a workaround which disables merge - * engine settings and force compaction. - */ - private void changeIgnoreMergeEngine() { - if (CoreOptions.fromMap(table.options()).mergeEngine() - != CoreOptions.MergeEngine.DEDUPLICATE) { - Map dynamicOptions = new HashMap<>(); - dynamicOptions.put( - CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString()); - dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false"); - // force compaction - dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); - table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions); - } - } - public MergeIntoAction withTargetAlias(String targetAlias) { this.targetAlias = targetAlias; return this; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index 0a5eceb49215..a09372443c8d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -57,7 +57,8 @@ public class LocalMergeOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; - TableSchema schema; + private final TableSchema schema; + private final boolean ignoreDelete; private transient Projection keyProjection; private transient RecordComparator keyComparator; @@ -76,6 +77,7 @@ public LocalMergeOperator(TableSchema schema) { schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys"); this.schema = schema; + this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -137,6 +139,10 @@ public void processElement(StreamRecord record) throws Exception { RowKind rowKind = rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + if (ignoreDelete && rowKind.isRetract()) { + return; + } + // row kind must be INSERT when it is divided into key and value row.setRowKind(RowKind.INSERT); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index 23a574fb54c2..5dd6a732278e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -18,6 +18,14 @@ package org.apache.paimon.flink; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.configuration.RestartStrategyOptions; @@ -29,10 +37,15 @@ import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -409,29 +422,68 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Except insert2.close(); } - @Test - public void testIgnoreDelete() throws Exception { + @ParameterizedTest(name = "localMergeEnabled = {0}") + @ValueSource(booleans = {true, false}) + public void testIgnoreDelete(boolean localMerge) throws Exception { sql( - "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH (" + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + " 'merge-engine' = 'partial-update'," - + " 'ignore-delete' = 'true'," - + " 'fields.a.aggregate-function' = 'sum'," - + " 'fields.g.sequence-group'='a')"); + + " 'ignore-delete' = 'true'" + + ")"); + if (localMerge) { + sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')"); + } String id = TestValuesTableFactory.registerData( Arrays.asList( - Row.ofKind(RowKind.INSERT, 1, 10, 1), - Row.ofKind(RowKind.DELETE, 1, 10, 2), - Row.ofKind(RowKind.INSERT, 1, 20, 3))); + Row.ofKind(RowKind.INSERT, 1, null, "apple"), + Row.ofKind(RowKind.DELETE, 1, null, "apple"), + Row.ofKind(RowKind.INSERT, 1, "A", null))); streamSqlIter( - "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) " + "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) " + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', " + "'changelog-mode' = 'I,D')", id) .close(); sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await(); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3)); + assertThat(sql("SELECT * FROM ignore_delete")) + .containsExactlyInAnyOrder(Row.of(1, "A", "apple")); + } + + @Test + public void testIgnoreDeleteInReader() throws Exception { + sql( + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + + " 'merge-engine' = 'deduplicate'," + + " 'write-only' = 'true'," + + " 'bucket' = '1')"); + sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')"); + // write delete records + sql("DELETE FROM ignore_delete WHERE pk = 1"); + sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))"); + assertThat(sql("SELECT * FROM ignore_delete")) + .containsExactlyInAnyOrder(Row.of(1, "A", null)); + + // force altering merge engine and read + Map newOptions = new HashMap<>(); + newOptions.put( + CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString()); + newOptions.put(CoreOptions.BUCKET.key(), "1"); + newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true"); + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")), + new Schema( + Arrays.asList( + new DataField(0, "pk", DataTypes.INT().notNull()), + new DataField(1, "a", DataTypes.STRING()), + new DataField(2, "b", DataTypes.STRING())), + Collections.emptyList(), + Collections.singletonList("pk"), + newOptions, + null)); + assertThat(sql("SELECT * FROM ignore_delete")) + .containsExactlyInAnyOrder(Row.of(1, "A", "apple")); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 3e27f0c0c5df..1926299baf86 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -130,29 +130,59 @@ public void testVariousChangelogProducer( changelogRow("+I", 8, "v_8", "insert", "02-29"), changelogRow("+I", 11, "v_11", "insert", "02-29"), changelogRow("+I", 12, "v_12", "insert", "02-29"))); - - if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) { - // test partial update still works after action - testWorkWithPartialUpdate(); - } } - private void testWorkWithPartialUpdate() throws Exception { - insertInto( - "T", - "(12, CAST (NULL AS STRING), '$', '02-29')", - "(12, 'Test', CAST (NULL AS STRING), '02-29')"); + @Test + public void testWorkWithPartialUpdate() throws Exception { + // re-create target table with given producer + sEnv.executeSql("DROP TABLE T"); + prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP); - testBatchRead( - buildSimpleQuery("T"), + MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); + action.withSourceTable("S") + .withMergeCondition("T.k = S.k AND T.dt = S.dt") + .withMatchedUpsert( + "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'") + .withMatchedDelete("S.v IS NULL") + .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt") + .withNotMatchedBySourceUpsert( + "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'") + .withNotMatchedBySourceDelete("dt >= '02-28'"); + + // delete records are filtered + validateActionRunResult( + action.build(), Arrays.asList( - changelogRow("+I", 1, "v_1", "creation", "02-27"), changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"), changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"), changelogRow("+I", 8, "v_8", "insert", "02-29"), changelogRow("+I", 11, "v_11", "insert", "02-29"), - changelogRow("+I", 12, "Test", "$", "02-29"))); + changelogRow("+I", 12, "v_12", "insert", "02-29")), + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 4, "v_4", "creation", "02-27"), + changelogRow("+I", 5, "v_5", "creation", "02-28"), + changelogRow("+I", 6, "v_6", "creation", "02-28"), + changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"), + changelogRow("+I", 8, "v_8", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "insert", "02-29"), + changelogRow("+I", 9, "v_9", "creation", "02-28"), + changelogRow("+I", 10, "v_10", "creation", "02-28"), + changelogRow("+I", 11, "v_11", "insert", "02-29"), + changelogRow("+I", 12, "v_12", "insert", "02-29"))); + + // test partial update still works after action + insertInto( + "T", + "(12, CAST (NULL AS STRING), '$', '02-29')", + "(12, 'Test', CAST (NULL AS STRING), '02-29')"); + + testBatchRead( + "SELECT * FROM T WHERE k = 12", + Collections.singletonList(changelogRow("+I", 12, "Test", "$", "02-29"))); } @ParameterizedTest(name = "in-default = {0}") @@ -553,7 +583,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) throws E { put(CHANGELOG_PRODUCER.key(), producer.toString()); // test works with partial update normally - if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) { + if (producer == CoreOptions.ChangelogProducer.LOOKUP) { put( CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());