From 2f3fd86e1041191fda81122143d638ac49aad2a9 Mon Sep 17 00:00:00 2001
From: yuzelin <747884505@qq.com>
Date: Tue, 2 Apr 2024 13:52:10 +0800
Subject: [PATCH] [core] Fix that ignore-delete option is not compatible with
 old delete records

---
 .../io/KeyValueDataFileRecordReader.java      | 22 +++++--
 .../paimon/io/KeyValueFileReaderFactory.java  |  7 ++-
 .../paimon/flink/action/MergeIntoAction.java  | 22 -------
 .../paimon/flink/PartialUpdateITCase.java     | 46 ++++++++++++++
 .../flink/action/MergeIntoActionITCase.java   | 61 ++++++++++++++-----
 5 files changed, 114 insertions(+), 44 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index e44ad79ff53ee..3caeec43895d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
+    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+            RecordReader<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            int level,
+            boolean ignoreDelete) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -50,11 +56,15 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
             return null;
         }
 
-        return iterator.transform(
-                internalRow ->
-                        internalRow == null
-                                ? null
-                                : serializer.fromRow(internalRow).setLevel(level));
+        RecordIterator<KeyValue> transformed =
+                iterator.transform(
+                        internalRow ->
+                                internalRow == null
+                                        ? null
+                                        : serializer.fromRow(internalRow).setLevel(level));
+        // In older versions, delete records might be written into data files even when
+        // ignore-delete was configured, so the reader should also filter out delete records.
+        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 27ddd1ff677ad..3cb74606f4796 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -144,7 +144,12 @@ private RecordReader<KeyValue> createRecordReader(
                     new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
+        return new KeyValueDataFileRecordReader(
+                fileRecordReader,
+                keyType,
+                valueType,
+                level,
+                CoreOptions.fromMap(schema.options()).ignoreDelete());
     }
 
     public static Builder builder(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 41f8b7677b8c4..4ec5e6ef5e9e4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -39,7 +38,6 @@
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -137,8 +135,6 @@ public MergeIntoAction(
                             table.getClass().getName()));
         }
 
-        changeIgnoreMergeEngine();
-
         // init primaryKeys of target table
         primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
         if (primaryKeys.isEmpty()) {
@@ -161,24 +157,6 @@ public MergeIntoAction(
                 .collect(Collectors.toList());
     }
 
-    /**
-     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we
-     * want these records to be sunk directly. This method is a workaround which disables merge
-     * engine settings and force compaction.
-     */
-    private void changeIgnoreMergeEngine() {
-        if (CoreOptions.fromMap(table.options()).mergeEngine()
-                != CoreOptions.MergeEngine.DEDUPLICATE) {
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(
-                    CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString());
-            dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
-            // force compaction
-            dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
-            table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions);
-        }
-    }
-
     public MergeIntoAction withTargetAlias(String targetAlias) {
         this.targetAlias = targetAlias;
         return this;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 23a574fb54c25..34aaa7af2feeb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,6 +18,14 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -32,7 +40,10 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -434,4 +445,39 @@ public void testIgnoreDelete() throws Exception {
 
         assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
     }
+
+    @Test
+    public void testIgnoreDeleteInReader() throws Exception {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+                        + " 'merge-engine' = 'deduplicate',"
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1')");
+        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
+        assertThat(batchSql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", null));
+
+        // forcibly alter merge-engine, then read again
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.put(
+                CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+        newOptions.put(CoreOptions.BUCKET.key(), "1");
+        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+        SchemaUtils.forceCommit(
+                new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "pk", DataTypes.INT().notNull()),
+                                new DataField(1, "a", DataTypes.STRING()),
+                                new DataField(2, "b", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        newOptions,
+                        null));
+        assertThat(batchSql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 3e27f0c0c5dfd..5834217c84e1b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -130,29 +130,60 @@ public void testVariousChangelogProducer(
                         changelogRow("+I", 8, "v_8", "insert", "02-29"),
                         changelogRow("+I", 11, "v_11", "insert", "02-29"),
                         changelogRow("+I", 12, "v_12", "insert", "02-29")));
-
-        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            // test partial update still works after action
-            testWorkWithPartialUpdate();
-        }
     }
 
-    private void testWorkWithPartialUpdate() throws Exception {
-        insertInto(
-                "T",
-                "(12, CAST (NULL AS STRING), '$', '02-29')",
-                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+    @Test
+    public void testWorkWithPartialUpdate() throws Exception {
+        // re-create the target table with the given producer
+        sEnv.executeSql("DROP TABLE T");
+        prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);
 
-        testBatchRead(
-                buildSimpleQuery("T"),
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
+        // here test that it works when table S is referenced by its qualified name
+        action.withSourceTable("default.S")
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withMatchedUpsert(
+                        "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
+                .withMatchedDelete("S.v IS NULL")
+                .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
+                .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+                .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+        // delete records are filtered by the reader
+        validateActionRunResult(
+                action.build(),
                 Arrays.asList(
-                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
                         changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
                         changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
                         changelogRow("+I", 8, "v_8", "insert", "02-29"),
                         changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("+I", 12, "Test", "$", "02-29")));
+                        changelogRow("+I", 12, "v_12", "insert", "02-29")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+I", 4, "v_4", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("+I", 12, "v_12", "insert", "02-29")));
+
+        // test that partial update still works after the action
+        insertInto(
+                "T",
+                "(12, CAST (NULL AS STRING), '$', '02-29')",
+                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+
+        testBatchRead(
+                "SELECT * FROM T WHERE k = 12",
+                Collections.singletonList(changelogRow("+I", 12, "Test", "$", "02-29")));
     }
 
     @ParameterizedTest(name = "in-default = {0}")
@@ -553,7 +584,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) throws E
                     {
                         put(CHANGELOG_PRODUCER.key(), producer.toString());
                         // test works with partial update normally
-                        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                        if (producer == CoreOptions.ChangelogProducer.LOOKUP) {
                             put(
                                     CoreOptions.MERGE_ENGINE.key(),
                                     CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
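
Note on the fix (editorial illustration, not part of the patch): the core change
is that the data-file reader now drops delete records itself when ignore-delete
is set, instead of assuming the writer never persisted them. Below is a minimal,
self-contained sketch of that reader-side filtering. The Kv class and the main
harness are hypothetical stand-ins, not Paimon's real KeyValue/RecordReader API;
only the "ignoreDelete ? filter(isAdd) : unchanged" shape mirrors the patch.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class IgnoreDeleteReaderSketch {

        // Hypothetical stand-in for a key-value record: key, value, row kind.
        static final class Kv {
            final int key;
            final String value;
            final boolean isAdd; // false models a -D (delete) record

            Kv(int key, String value, boolean isAdd) {
                this.key = key;
                this.value = value;
                this.isAdd = isAdd;
            }
        }

        public static void main(String[] args) {
            // A batch as an old writer may have persisted it: the -D record was
            // written into the data file although ignore-delete was configured.
            List<Kv> batch =
                    Arrays.asList(
                            new Kv(1, "apple", true), // +I
                            new Kv(1, null, false), // -D, legacy record
                            new Kv(1, "A", true)); // +I

            boolean ignoreDelete = true;

            // Mirrors `ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed`:
            // with ignore-delete enabled, the reader hides delete records from the
            // merge function, so a partial-update merge sees only the two inserts.
            List<Kv> visible =
                    ignoreDelete
                            ? batch.stream().filter(kv -> kv.isAdd).collect(Collectors.toList())
                            : batch;

            visible.forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
        }
    }

With the pre-patch reader, a legacy -D record would reach the merge function and,
under merge engines such as partial-update, could wrongly discard earlier fields
(the testIgnoreDeleteInReader case above expects 'apple' to survive); filtering
in the reader makes old files behave as if ignore-delete had been honored at
write time.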