[core] Fix that ignore-delete option is not compatible with old delete records and LocalMergeOperator #3139

Merged · 6 commits · Apr 3, 2024
@@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
+    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+            RecordReader<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            int level,
+            boolean ignoreDelete) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -50,11 +56,15 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
             return null;
         }
 
-        return iterator.transform(
-                internalRow ->
-                        internalRow == null
-                                ? null
-                                : serializer.fromRow(internalRow).setLevel(level));
+        RecordIterator<KeyValue> transformed =
+                iterator.transform(
+                        internalRow ->
+                                internalRow == null
+                                        ? null
+                                        : serializer.fromRow(internalRow).setLevel(level));
+        // In 0.7- versions, delete records might have been written into data files even when
+        // ignore-delete was configured, so the reader must also filter out delete records.
+        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
     }
 
     @Override
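The hunk above is the read-path half of the fix: data files written by 0.7- releases may already contain delete records, so an ignore-delete table has to drop them again at read time. Below is a minimal, self-contained sketch of the same filtering idea over a plain java.util.Iterator; the Kv class and the filter helper are hypothetical stand-ins, not Paimon API, and only the isAdd() check mirrors the patch:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class IgnoreDeleteReaderSketch {

    // Hypothetical stand-in for Paimon's KeyValue; only the isAdd() check mirrors the patch.
    static final class Kv {
        final int key;
        final boolean add; // true for +I/+U records, false for -U/-D records

        Kv(int key, boolean add) {
            this.key = key;
            this.add = add;
        }

        boolean isAdd() {
            return add;
        }
    }

    // Same idea as RecordIterator#filter in the hunk above: lazily skip
    // records that fail the predicate while iterating a batch.
    static Iterator<Kv> filter(Iterator<Kv> in, Predicate<Kv> keep) {
        return new Iterator<Kv>() {
            private Kv buffered;

            @Override
            public boolean hasNext() {
                while (buffered == null && in.hasNext()) {
                    Kv candidate = in.next();
                    if (keep.test(candidate)) {
                        buffered = candidate;
                    }
                }
                return buffered != null;
            }

            @Override
            public Kv next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                Kv result = buffered;
                buffered = null;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        // A data file written by a 0.7- release may still contain a delete record.
        List<Kv> batch = Arrays.asList(new Kv(1, true), new Kv(1, false), new Kv(2, true));
        boolean ignoreDelete = true;

        Iterator<Kv> raw = batch.iterator();
        // Same shape as the patch: only wrap with the filter when ignore-delete is set.
        Iterator<Kv> reader = ignoreDelete ? filter(raw, Kv::isAdd) : raw;
        while (reader.hasNext()) {
            System.out.println("key=" + reader.next().key);
        }
        // Prints key=1 and key=2 once each; the delete record for key 1 is dropped.
    }
}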
@@ -68,6 +68,7 @@ public class KeyValueFileReaderFactory {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
+    private final boolean ignoreDelete;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -91,6 +92,7 @@ private KeyValueFileReaderFactory(
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
         this.dvFactory = dvFactory;
+        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
     public RecordReader<KeyValue> createRecordReader(
@@ -144,7 +146,8 @@ private RecordReader<KeyValue> createRecordReader(
                     new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
+        return new KeyValueDataFileRecordReader(
+                fileRecordReader, keyType, valueType, level, ignoreDelete);
     }
 
     public static Builder builder(
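The factory change is plumbing: the flag is resolved once from the table schema's options when the factory is built, and every per-file reader it creates inherits it. A small sketch of that pattern, assuming only the documented 'ignore-delete' option key; the ReaderFactorySketch class itself is hypothetical:

import java.util.HashMap;
import java.util.Map;

public class ReaderFactorySketch {

    private final boolean ignoreDelete;

    ReaderFactorySketch(Map<String, String> schemaOptions) {
        // Resolved once from the table schema's options at construction time,
        // mirroring CoreOptions.fromMap(schema.options()).ignoreDelete() above.
        this.ignoreDelete =
                Boolean.parseBoolean(schemaOptions.getOrDefault("ignore-delete", "false"));
    }

    // Every per-file reader the factory creates inherits the same flag.
    String createRecordReader(String dataFile) {
        return dataFile + " (ignoreDelete=" + ignoreDelete + ")";
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("ignore-delete", "true"); // the documented Paimon option key
        ReaderFactorySketch factory = new ReaderFactorySketch(options);
        System.out.println(factory.createRecordReader("data-file-0.orc"));
        // Prints: data-file-0.orc (ignoreDelete=true)
    }
}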
@@ -189,11 +189,6 @@ public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions)
         return copyInternal(dynamicOptions, false);
     }
 
-    @Override
-    public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
-        return copyInternal(dynamicOptions, true);
-    }
-
     private void checkImmutability(Map<String, String> dynamicOptions) {
         Map<String, String> options = tableSchema.options();
         // check option is not immutable
@@ -86,9 +86,6 @@ default Optional<String> comment() {
     /** Doesn't change table schema even when there exists time travel scan options. */
     FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
 
-    /** Sometimes we have to change some Immutable options to implement features. */
-    FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions);
-
     /** TODO: this method is weird, old options will overwrite new options. */
     FileStoreTable copyWithLatestSchema();

@@ -18,7 +18,6 @@

 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -39,7 +38,6 @@

 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -137,8 +135,6 @@ public MergeIntoAction(
                             table.getClass().getName()));
         }
 
-        changeIgnoreMergeEngine();
-
         // init primaryKeys of target table
         primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
         if (primaryKeys.isEmpty()) {
@@ -161,24 +157,6 @@
                         .collect(Collectors.toList());
     }
 
-    /**
-     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in different ways, but we
-     * want these records to be sunk directly. This method is a workaround which disables merge
-     * engine settings and force compaction.
-     */
-    private void changeIgnoreMergeEngine() {
-        if (CoreOptions.fromMap(table.options()).mergeEngine()
-                != CoreOptions.MergeEngine.DEDUPLICATE) {
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(
-                    CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString());
-            dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
-            // force compaction
-            dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
-            table = ((FileStoreTable) table).internalCopyWithoutCheck(dynamicOptions);
-        }
-    }
-
     public MergeIntoAction withTargetAlias(String targetAlias) {
         this.targetAlias = targetAlias;
         return this;
@@ -57,7 +57,8 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>

     private static final long serialVersionUID = 1L;
 
-    TableSchema schema;
+    private final TableSchema schema;
+    private final boolean ignoreDelete;
 
     private transient Projection keyProjection;
     private transient RecordComparator keyComparator;
@@ -76,6 +77,7 @@ public LocalMergeOperator(TableSchema schema) {
                 schema.primaryKeys().size() > 0,
                 "LocalMergeOperator currently only support tables with primary keys");
         this.schema = schema;
+        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }

@@ -137,6 +139,10 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {

         RowKind rowKind =
                 rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
+        if (ignoreDelete && rowKind.isRetract()) {
+            return;
+        }
+
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);

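This is the write-path half of the fix. LocalMergeOperator pre-merges records per key before they reach the writers, so without this check a -D could cancel an earlier +I inside the buffer before the table's ignore-delete setting was ever consulted downstream. A toy sketch of the short-circuit, assuming the usual four changelog row kinds where UPDATE_BEFORE and DELETE are the retract kinds; the RowKind enum, Row class, and HashMap buffer below are hypothetical stand-ins for Paimon's classes:

import java.util.HashMap;
import java.util.Map;

public class LocalMergeSketch {

    // The four changelog row kinds; UPDATE_BEFORE and DELETE are the retract kinds.
    enum RowKind {
        INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE;

        boolean isRetract() {
            return this == UPDATE_BEFORE || this == DELETE;
        }
    }

    static final class Row {
        final int key;
        final String value;
        final RowKind kind;

        Row(int key, String value, RowKind kind) {
            this.key = key;
            this.value = value;
            this.kind = kind;
        }
    }

    private final boolean ignoreDelete;
    // Per-key buffer keeping the latest row, standing in for the local merge buffer.
    final Map<Integer, Row> buffer = new HashMap<>();

    LocalMergeSketch(boolean ignoreDelete) {
        this.ignoreDelete = ignoreDelete;
    }

    void processElement(Row row) {
        // Same shape as the patch: retract rows are dropped up front when
        // ignore-delete is set, so they never reach the merge logic below.
        if (ignoreDelete && row.kind.isRetract()) {
            return;
        }
        if (row.kind.isRetract()) {
            buffer.remove(row.key); // a -D merged locally erases the earlier +I
        } else {
            buffer.put(row.key, row);
        }
    }

    public static void main(String[] args) {
        // Input mirrors the regression test: +I, then -D, then another +I for key 1.
        LocalMergeSketch op = new LocalMergeSketch(true);
        op.processElement(new Row(1, "apple", RowKind.INSERT));
        op.processElement(new Row(1, "apple", RowKind.DELETE)); // dropped by the check
        op.processElement(new Row(1, "A", RowKind.INSERT));
        op.buffer.values().forEach(r -> System.out.println(r.key + " -> " + r.value));
        // Prints "1 -> A". With ignoreDelete=false, the DELETE would have been merged
        // locally, hiding it from the table's own ignore-delete handling downstream.
    }
}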
@@ -18,6 +18,14 @@

 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -29,10 +37,15 @@
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -409,29 +422,68 @@ public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Exception {
         insert2.close();
     }
 
-    @Test
-    public void testIgnoreDelete() throws Exception {
+    @ParameterizedTest(name = "localMergeEnabled = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testIgnoreDelete(boolean localMerge) throws Exception {
         sql(
-                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) WITH ("
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
-                        + " 'ignore-delete' = 'true',"
-                        + " 'fields.a.aggregate-function' = 'sum',"
-                        + " 'fields.g.sequence-group'='a')");
+                        + " 'ignore-delete' = 'true'"
+                        + ")");
+        if (localMerge) {
+            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')");
+        }
 
         String id =
                 TestValuesTableFactory.registerData(
                         Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, 10, 1),
-                                Row.ofKind(RowKind.DELETE, 1, 10, 2),
-                                Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+                                Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+                                Row.ofKind(RowKind.DELETE, 1, null, "apple"),
+                                Row.ofKind(RowKind.INSERT, 1, "A", null)));
         streamSqlIter(
-                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a INT, g INT) "
+                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
                                 + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
                                 + "'changelog-mode' = 'I,D')",
                         id)
                 .close();
         sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await();
 
-        assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
     }
+
+    @Test
+    public void testIgnoreDeleteInReader() throws Exception {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+                        + " 'merge-engine' = 'deduplicate',"
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1')");
+        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", null));
+
+        // force altering merge engine and read
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.put(
+                CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+        newOptions.put(CoreOptions.BUCKET.key(), "1");
+        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+        SchemaUtils.forceCommit(
+                new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "pk", DataTypes.INT().notNull()),
+                                new DataField(1, "a", DataTypes.STRING()),
+                                new DataField(2, "b", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        newOptions,
+                        null));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+    }
 }
@@ -130,29 +130,59 @@ public void testVariousChangelogProducer(
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
// test partial update still works after action
testWorkWithPartialUpdate();
}
}

private void testWorkWithPartialUpdate() throws Exception {
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");
@Test
public void testWorkWithPartialUpdate() throws Exception {
// re-create target table with given producer
sEnv.executeSql("DROP TABLE T");
prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);

testBatchRead(
buildSimpleQuery("T"),
MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
action.withSourceTable("S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedUpsert(
"T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
.withMatchedDelete("S.v IS NULL")
.withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
.withNotMatchedBySourceUpsert(
"dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
.withNotMatchedBySourceDelete("dt >= '02-28'");

// delete records are filtered
validateActionRunResult(
action.build(),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "Test", "$", "02-29")));
changelogRow("+I", 12, "v_12", "insert", "02-29")),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
changelogRow("+I", 4, "v_4", "creation", "02-27"),
changelogRow("+I", 5, "v_5", "creation", "02-28"),
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"),
changelogRow("+I", 8, "v_8", "creation", "02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
changelogRow("+I", 10, "v_10", "creation", "02-28"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));

// test partial update still works after action
insertInto(
"T",
"(12, CAST (NULL AS STRING), '$', '02-29')",
"(12, 'Test', CAST (NULL AS STRING), '02-29')");

testBatchRead(
"SELECT * FROM T WHERE k = 12",
Collections.singletonList(changelogRow("+I", 12, "Test", "$", "02-29")));
}

@ParameterizedTest(name = "in-default = {0}")
@@ -553,7 +583,7 @@ private void prepareTargetTable(CoreOptions.ChangelogProducer producer) throws Exception {
                     {
                         put(CHANGELOG_PRODUCER.key(), producer.toString());
                         // test works with partial update normally
-                        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                        if (producer == CoreOptions.ChangelogProducer.LOOKUP) {
                             put(
                                     CoreOptions.MERGE_ENGINE.key(),
                                     CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());