[core] Fix that ignore-delete option is not compatible with old delete records and LocalMergeOperator (#3139)
yuzelin authored Apr 3, 2024
1 parent a1beb7a commit 5add480
Showing 8 changed files with 135 additions and 64 deletions.
@@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
+    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+            RecordReader<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            int level,
+            boolean ignoreDelete) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -50,11 +56,15 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
             return null;
         }
 
-        return iterator.transform(
-                internalRow ->
-                        internalRow == null
-                                ? null
-                                : serializer.fromRow(internalRow).setLevel(level));
+        RecordIterator<KeyValue> transformed =
+                iterator.transform(
+                        internalRow ->
+                                internalRow == null
+                                        ? null
+                                        : serializer.fromRow(internalRow).setLevel(level));
+        // In 0.7- versions, delete records might be written into data files even when
+        // 'ignore-delete' is configured, so the reader should also filter out delete records
+        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
     }
 
     @Override
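The reader-side fix wraps the transformed iterator in a row-kind filter, so data files written by 0.7- releases that still contain delete records are cleaned up at read time. Below is a minimal, self-contained sketch of the same filter pattern; Kv and the filter helper are hypothetical stand-ins for Paimon's KeyValue and RecordIterator#filter, not the project's actual API:

import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class IgnoreDeleteFilterDemo {

    /** Hypothetical stand-in for Paimon's KeyValue: a key plus an add/retract flag. */
    record Kv(int key, boolean isAdd) {}

    /** Lazily skips records that fail the predicate, mirroring an iterator filter. */
    static Iterator<Kv> filter(Iterator<Kv> in, Predicate<Kv> keep) {
        return new Iterator<>() {
            private Kv next = advance();

            private Kv advance() {
                while (in.hasNext()) {
                    Kv kv = in.next();
                    if (keep.test(kv)) {
                        return kv;
                    }
                }
                return null;
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public Kv next() {
                Kv current = next;
                next = advance();
                return current;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Kv> batch =
                List.of(new Kv(1, true), new Kv(1, false), new Kv(2, true)).iterator();
        // With ignore-delete enabled, only add records survive; the retract record is dropped.
        filter(batch, Kv::isAdd).forEachRemaining(System.out::println);
    }
}

The production code reaches the same result with transformed.filter(KeyValue::isAdd), applied only when the ignore-delete option is set.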
@@ -68,6 +68,7 @@ public class KeyValueFileReaderFactory {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
+    private final boolean ignoreDelete;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -91,6 +92,7 @@ private KeyValueFileReaderFactory(
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
         this.dvFactory = dvFactory;
+        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
     public RecordReader<KeyValue> createRecordReader(
@@ -144,7 +146,8 @@ private RecordReader<KeyValue> createRecordReader(
                     new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
+        return new KeyValueDataFileRecordReader(
+                fileRecordReader, keyType, valueType, level, ignoreDelete);
     }
 
     public static Builder builder(
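The factory derives the flag once, at construction time, from the table schema's options rather than re-parsing it per file. A hedged sketch of that lookup against a plain string map; the helper below is illustrative, not Paimon's CoreOptions API, though the 'ignore-delete' key and its false default match the option exercised in the tests further down:

import java.util.Map;

public class IgnoreDeleteOptionDemo {

    /** Parse the flag once from schema options, as the factory constructor now does. */
    static boolean ignoreDelete(Map<String, String> options) {
        return Boolean.parseBoolean(options.getOrDefault("ignore-delete", "false"));
    }

    public static void main(String[] args) {
        System.out.println(ignoreDelete(Map.of("ignore-delete", "true"))); // true
        System.out.println(ignoreDelete(Map.of()));                        // false (default)
    }
}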
@@ -189,11 +189,6 @@ public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions)
         return copyInternal(dynamicOptions, false);
     }
 
-    @Override
-    public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
-        return copyInternal(dynamicOptions, true);
-    }
-
     private void checkImmutability(Map<String, String> dynamicOptions) {
         Map<String, String> options = tableSchema.options();
         // check option is not immutable
@@ -86,9 +86,6 @@ default Optional<String> comment() {
     /** Doesn't change table schema even when there exists time travel scan options. */
     FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
 
-    /** Sometimes we have to change some Immutable options to implement features. */
-    FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions);
-
     /** TODO: this method is weird, old options will overwrite new options. */
     FileStoreTable copyWithLatestSchema();
 
@@ -18,7 +18,6 @@

 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -39,7 +38,6 @@

 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -137,8 +135,6 @@ public MergeIntoAction(
                             table.getClass().getName()));
         }
 
-        changeIgnoreMergeEngine();
-
         // init primaryKeys of target table
         primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
         if (primaryKeys.isEmpty()) {
@@ -161,24 +157,6 @@
                 .collect(Collectors.toList());
     }
 
-    /**
-     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we
-     * want these records to be sunk directly. This method is a workaround which disables merge
-     * engine settings and force compaction.
-     */
-    private void changeIgnoreMergeEngine() {
-        if (CoreOptions.fromMap(table.options()).mergeEngine()
-                != CoreOptions.MergeEngine.DEDUPLICATE) {
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(
-                    CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString());
-            dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
-            // force compaction
-            dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
-            table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions);
-        }
-    }
-
     public MergeIntoAction withTargetAlias(String targetAlias) {
         this.targetAlias = targetAlias;
         return this;
@@ -57,7 +57,8 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>

     private static final long serialVersionUID = 1L;
 
-    TableSchema schema;
+    private final TableSchema schema;
+    private final boolean ignoreDelete;
 
     private transient Projection keyProjection;
     private transient RecordComparator keyComparator;
@@ -76,6 +77,7 @@ public LocalMergeOperator(TableSchema schema) {
                 schema.primaryKeys().size() > 0,
                 "LocalMergeOperator currently only support tables with primary keys");
         this.schema = schema;
+        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -137,6 +139,10 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {

         RowKind rowKind =
                 rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
+        if (ignoreDelete && rowKind.isRetract()) {
+            return;
+        }
+
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);
 
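The operator-side fix drops retract records before the row kind is normalized to INSERT for key/value splitting; without the guard, an ignored -U/-D record would be replayed as an upsert. A simplified sketch of the guard with a stand-in enum, not the actual Flink/Paimon RowKind types:

public class LocalMergeGuardDemo {

    /** Stand-in row-kind enum; -U (UPDATE_BEFORE) and -D (DELETE) are the retract kinds. */
    enum RowKind {
        INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE;

        boolean isRetract() {
            return this == UPDATE_BEFORE || this == DELETE;
        }
    }

    /** Mirrors the early return added to processElement above. */
    static boolean accept(RowKind kind, boolean ignoreDelete) {
        return !(ignoreDelete && kind.isRetract());
    }

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + (accept(kind, true) ? "kept" : "dropped"));
        }
    }
}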
@@ -18,6 +18,14 @@

 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -29,10 +37,15 @@
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -409,29 +422,68 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Exception {
         insert2.close();
     }
 
-    @Test
-    public void testIgnoreDelete() throws Exception {
+    @ParameterizedTest(name = "localMergeEnabled = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testIgnoreDelete(boolean localMerge) throws Exception {
         sql(
-                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH ("
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
-                        + " 'ignore-delete' = 'true',"
-                        + " 'fields.a.aggregate-function' = 'sum',"
-                        + " 'fields.g.sequence-group'='a')");
+                        + " 'ignore-delete' = 'true'"
+                        + ")");
+        if (localMerge) {
+            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')");
+        }
 
         String id =
                 TestValuesTableFactory.registerData(
                         Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, 10, 1),
-                                Row.ofKind(RowKind.DELETE, 1, 10, 2),
-                                Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+                                Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+                                Row.ofKind(RowKind.DELETE, 1, null, "apple"),
+                                Row.ofKind(RowKind.INSERT, 1, "A", null)));
         streamSqlIter(
-                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) "
+                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
                         + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
                        + "'changelog-mode' = 'I,D')",
                         id)
                 .close();
         sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await();
 
-        assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
     }
+
+    @Test
+    public void testIgnoreDeleteInReader() throws Exception {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+                        + " 'merge-engine' = 'deduplicate',"
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1')");
+        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", null));
+
+        // force altering merge engine and read
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.put(
+                CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+        newOptions.put(CoreOptions.BUCKET.key(), "1");
+        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+        SchemaUtils.forceCommit(
+                new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "pk", DataTypes.INT().notNull()),
+                                new DataField(1, "a", DataTypes.STRING()),
+                                new DataField(2, "b", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        newOptions,
+                        null));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+    }
 }
@@ -130,29 +130,59 @@ public void testVariousChangelogProducer(
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
// test partial update still works after action
testWorkWithPartialUpdate();
}
}

private void testWorkWithPartialUpdate() throws Exception {
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");
@Test
public void testWorkWithPartialUpdate() throws Exception {
// re-create target table with given producer
sEnv.executeSql("DROP TABLE T");
prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);

testBatchRead(
buildSimpleQuery("T"),
MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
action.withSourceTable("S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedUpsert(
"T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
.withMatchedDelete("S.v IS NULL")
.withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
.withNotMatchedBySourceUpsert(
"dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
.withNotMatchedBySourceDelete("dt >= '02-28'");

// delete records are filtered
validateActionRunResult(
action.build(),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "Test", "$", "02-29")));
changelogRow("+I", 12, "v_12", "insert", "02-29")),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 4, "v_4", "creation", "02-27"),
changelogRow("+I", 5, "v_5", "creation", "02-28"),
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "creation", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
changelogRow("+I", 10, "v_10", "creation", "02-28"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

// test partial update still works after action
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");

testBatchRead(
"SELECT * FROM T WHERE k = 12",
Collections.singletonList(changelogRow("+I", 12, "Test", "$", "02-29")));
}

@ParameterizedTest(name = "in-default = {0}")
@@ -553,7 +583,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) throws Exception {
                     {
                         put(CHANGELOG_PRODUCER.key(), producer.toString());
                         // test works with partial update normally
-                        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                        if (producer == CoreOptions.ChangelogProducer.LOOKUP) {
                             put(
                                     CoreOptions.MERGE_ENGINE.key(),
                                     CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());