Skip to content

Commit

Permalink
[core] Fix that ignore-delete option is not compatible with old delete records
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Apr 2, 2024
1 parent 6921d41 commit 2f3fd86
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
private final RecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
private final boolean ignoreDelete;

public KeyValueDataFileRecordReader(
RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
RecordReader<InternalRow> reader,
RowType keyType,
RowType valueType,
int level,
boolean ignoreDelete) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.ignoreDelete = ignoreDelete;
}

@Nullable
Expand All @@ -50,11 +56,15 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
return null;
}

return iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));
RecordIterator<KeyValue> transformed =
iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));
// In older versions, delete records might have been written into data files even when
// ignore-delete was configured, so the reader must also filter out delete records.
return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ private RecordReader<KeyValue> createRecordReader(
new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
return new KeyValueDataFileRecordReader(
fileRecordReader,
keyType,
valueType,
level,
CoreOptions.fromMap(schema.options()).ignoreDelete());
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
Expand All @@ -39,7 +38,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -137,8 +135,6 @@ public MergeIntoAction(
table.getClass().getName()));
}

changeIgnoreMergeEngine();

// init primaryKeys of target table
primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
if (primaryKeys.isEmpty()) {
Expand All @@ -161,24 +157,6 @@ public MergeIntoAction(
.collect(Collectors.toList());
}

/**
 * Workaround that lets -U/-D records be sunk directly.
 *
 * <p>Different {@link CoreOptions.MergeEngine}s process -U/-D records in different ways. When
 * the table's merge engine is anything other than {@code DEDUPLICATE}, this method replaces
 * {@code table} with a copy carrying dynamic options that switch the merge engine to
 * {@code DEDUPLICATE}, set ignore-delete to {@code false}, and force compaction. When the table
 * already uses {@code DEDUPLICATE}, {@code table} is left untouched.
 */
private void changeIgnoreMergeEngine() {
    if (CoreOptions.fromMap(table.options()).mergeEngine()
            == CoreOptions.MergeEngine.DEDUPLICATE) {
        // Already deduplicate: no override needed.
        return;
    }

    Map<String, String> overrides = new HashMap<>();
    overrides.put(
            CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString());
    overrides.put(CoreOptions.IGNORE_DELETE.key(), "false");
    // force compaction
    overrides.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");

    // Swap in a copy of the table that carries the overriding options.
    table = ((FileStoreTable) table).internalCopyWithoutCheck(overrides);
}

public MergeIntoAction withTargetAlias(String targetAlias) {
this.targetAlias = targetAlias;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

package org.apache.paimon.flink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.configuration.RestartStrategyOptions;
Expand All @@ -32,7 +40,10 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -434,4 +445,39 @@ public void testIgnoreDelete() throws Exception {

assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
}

/**
 * Writes an insert, a delete and another insert into a write-only deduplicate table, then
 * force-commits a schema that switches the table to partial-update with ignore-delete enabled,
 * and checks that a batch read no longer takes the delete record into account.
 */
@Test
public void testIgnoreDeleteInReader() throws Exception {
    sql(
            "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
                    + " 'merge-engine' = 'deduplicate',"
                    + " 'write-only' = 'true',"
                    + " 'bucket' = '1')");

    sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
    // this statement writes the delete records
    sql("DELETE FROM ignore_delete WHERE pk = 1");
    sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");

    // with the deduplicate engine only the last insert is visible
    assertThat(batchSql("SELECT * FROM ignore_delete"))
            .containsExactlyInAnyOrder(Row.of(1, "A", null));

    // options for the forced schema: partial-update merge engine with ignore-delete enabled
    Map<String, String> forcedOptions = new HashMap<>();
    forcedOptions.put(
            CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
    forcedOptions.put(CoreOptions.BUCKET.key(), "1");
    forcedOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");

    SchemaManager schemaManager =
            new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete"));
    List<DataField> columns =
            Arrays.asList(
                    new DataField(0, "pk", DataTypes.INT().notNull()),
                    new DataField(1, "a", DataTypes.STRING()),
                    new DataField(2, "b", DataTypes.STRING()));
    Schema forcedSchema =
            new Schema(
                    columns,
                    Collections.emptyList(),
                    Collections.singletonList("pk"),
                    forcedOptions,
                    null);

    // force alter merge-engine and read
    SchemaUtils.forceCommit(schemaManager, forcedSchema);

    // the delete record is expected to be filtered, so both inserts are merged
    assertThat(batchSql("SELECT * FROM ignore_delete"))
            .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,60 @@ public void testVariousChangelogProducer(
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
// test partial update still works after action
testWorkWithPartialUpdate();
}
}

private void testWorkWithPartialUpdate() throws Exception {
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");
@Test
public void testWorkWithPartialUpdate() throws Exception {
// re-create the target table with the LOOKUP changelog producer
sEnv.executeSql("DROP TABLE T");
prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);

testBatchRead(
buildSimpleQuery("T"),
MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
// here we test that it works when the source table S is in the default database and is
// referenced by its qualified name
action.withSourceTable("default.S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedUpsert(
"T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
.withMatchedDelete("S.v IS NULL")
.withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
.withNotMatchedBySourceUpsert(
"dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
.withNotMatchedBySourceDelete("dt >= '02-28'");

// delete records are filtered
validateActionRunResult(
action.build(),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "Test", "$", "02-29")));
changelogRow("+I", 12, "v_12", "insert", "02-29")),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 4, "v_4", "creation", "02-27"),
changelogRow("+I", 5, "v_5", "creation", "02-28"),
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "creation", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
changelogRow("+I", 10, "v_10", "creation", "02-28"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

// test partial update still works after action
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");

testBatchRead(
"SELECT * FROM T WHERE k = 12",
Collections.singletonList(changelogRow("+I", 12, "Test", "$", "02-29")));
}

@ParameterizedTest(name = "in-default = {0}")
Expand Down Expand Up @@ -553,7 +584,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) throws E
{
put(CHANGELOG_PRODUCER.key(), producer.toString());
// test works with partial update normally
if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
if (producer == CoreOptions.ChangelogProducer.LOOKUP) {
put(
CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
Expand Down

0 comments on commit 2f3fd86

Please sign in to comment.