[flink] Delete in flink should produce changelog no matter what #2594

Merged · 1 commit · May 4, 2024
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,6 +176,12 @@
<td>Duration</td>
<td>The TTL in the RocksDB index for cross-partition upsert (when the primary key does not contain all partition fields). This avoids maintaining too many indexes, which would progressively degrade performance, but note that it may also cause data duplication.</td>
</tr>
<tr>
<td><h5>delete.force-produce-changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Force producing changelog in DELETE SQL, regardless of the delete pattern, when the changelog producer is not NONE.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
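For context on how the new option is meant to be used end to end, here is a minimal sketch assuming Flink's batch mode for DELETE, a local Paimon warehouse path, and illustrative catalog/table names; only the option keys ('changelog-producer', 'delete.force-produce-changelog') come from this PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ForceDeleteChangelogExample {
    public static void main(String[] args) throws Exception {
        // DELETE statements run in Flink's batch mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register and switch to a Paimon catalog (warehouse path is an assumption).
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");

        // Enable the new option together with a non-NONE changelog producer.
        tEnv.executeSql(
                "CREATE TABLE orders (pt INT, id INT, v STRING, PRIMARY KEY (pt, id) NOT ENFORCED) "
                        + "PARTITIONED BY (pt) WITH ("
                        + " 'changelog-producer' = 'input',"
                        + " 'delete.force-produce-changelog' = 'true')");

        // This partition-wide delete now flows through the write path and emits -D
        // changelog records instead of being committed as a bare drop-partition.
        tEnv.executeSql("DELETE FROM orders WHERE pt = 1").await();
    }
}
```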
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1104,6 +1104,14 @@ public class CoreOptions implements Serializable {
"Whether to enable deletion vectors mode. In this mode, index files containing deletion"
+ " vectors are generated when data is written, which marks the data for deletion."
+ " During read operations, by applying these index files, merging can be avoided.");

public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG =
key("delete.force-produce-changelog")
.booleanType()
.defaultValue(false)
.withDescription(
"Force produce changelog in delete sql no matter what if changelog producer is not NONE.");

public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY =
key("sort-compaction.range-strategy")
.enumType(RangeStrategy.class)
@@ -1749,6 +1757,11 @@ public boolean fileIndexReadEnabled() {
return options.get(FILE_INDEX_READ_ENABLED);
}

public boolean deleteForceProduceChangelog() {
return options.get(DELETION_FORCE_PRODUCE_CHANGELOG)
&& changelogProducer() != CoreOptions.ChangelogProducer.NONE;
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
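A small standalone sketch of the new accessor's semantics (the map-based construction is an assumption about usage, not part of the PR): the flag only takes effect when the table actually has a changelog producer other than NONE.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

public class DeleteChangelogOptionCheck {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("delete.force-produce-changelog", "true");

        // 'changelog-producer' defaults to 'none', so the combined check is still false.
        System.out.println(CoreOptions.fromMap(conf).deleteForceProduceChangelog()); // false

        conf.put("changelog-producer", "input");
        System.out.println(CoreOptions.fromMap(conf).deleteForceProduceChangelog()); // true
    }
}
```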
@@ -58,7 +58,6 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -193,7 +192,9 @@ private void validateDeletable() {
}

private boolean canPushDownDeleteFilter() {
return -1 != Options.fromMap(table.options()).get(BUCKET)
CoreOptions coreOptions = CoreOptions.fromMap(table.options());
return -1 != coreOptions.bucket()
&& !coreOptions.deleteForceProduceChangelog()
&& (deletePredicate == null || deleteIsDropPartition() || deleteInSingleNode());
}

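The new condition above is what keeps a forced-changelog delete from being swallowed by the pushdown fast path. Below is a simplified, self-contained restatement of that decision; it is not the actual sink class, and the helper name and boolean parameter are illustrative.

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.Table;

public final class DeletePushdownSketch {

    /**
     * A delete can be handled as a metadata-only operation (e.g. dropping a partition)
     * only for fixed-bucket tables, and only when the user has not asked for the delete
     * to be recorded in the changelog.
     */
    static boolean canHandleWithoutWriting(Table table, boolean deleteIsDropPartition) {
        CoreOptions options = CoreOptions.fromMap(table.options());
        return options.bucket() != -1                     // fixed-bucket table
                && !options.deleteForceProduceChangelog() // new guard from this PR
                && deleteIsDropPartition;                 // e.g. DELETE FROM t WHERE pt = ...
    }

    private DeletePushdownSketch() {}
}
```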
@@ -28,6 +28,8 @@
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.HashMap;
@@ -437,6 +439,31 @@ public void testDeleteWithPkLookup() throws Exception {
iterator.close();
}

@ParameterizedTest
@ValueSource(strings = {"lookup", "input"})
public void testDeletePartitionWithChangelog(String producer) throws Exception {
sql(
"CREATE TABLE delete_table (pt INT, pk INT, v STRING, PRIMARY KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt) "
+ "WITH ('changelog-producer' = '"
+ producer
+ "', 'delete.force-produce-changelog'='true', 'bucket'='1')");
BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM delete_table");

sql("INSERT INTO delete_table VALUES (1, 1, 'A'), (2, 2, 'B')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, 1, "A"),
Row.ofKind(RowKind.INSERT, 2, 2, "B"));
sql("DELETE FROM delete_table WHERE pt = 1");
assertThat(iterator.collect(1))
.containsExactlyInAnyOrder(Row.ofKind(RowKind.DELETE, 1, 1, "A"));
sql("INSERT INTO delete_table VALUES (1, 1, 'B')");

assertThat(iterator.collect(1))
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "B"));
iterator.close();
}

@Test
public void testScanFromOldSchema() throws InterruptedException {
sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
@@ -19,7 +19,10 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -74,7 +77,12 @@ case class DeleteFromPaimonTableCommand(
ignoreFailure = true)
}

if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
if (
otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
.store()
.options()
.deleteForceProduceChangelog()
) {
val matchedPartitions =
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
val rowDataPartitionComputer = new RowDataPartitionComputer(
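The Spark side mirrors the Flink behavior: when the option is on, the drop-partition shortcut above is skipped and the delete is rewritten so the removed rows reach the changelog. A hedged sketch, assuming a local session with the Paimon Spark catalog and extensions configured; the catalog name, warehouse path, and table are illustrative.

```java
import org.apache.spark.sql.SparkSession;

public class SparkForceDeleteChangelogExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .config("spark.sql.extensions",
                        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                .config("spark.sql.catalog.paimon.warehouse", "file:///tmp/paimon")
                .getOrCreate();
        spark.sql("USE paimon.default");

        spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING) "
                + "TBLPROPERTIES ("
                + " 'primary-key' = 'id, dt',"
                + " 'changelog-producer' = 'input',"
                + " 'delete.force-produce-changelog' = 'true') "
                + "PARTITIONED BY (dt)");
        spark.sql("INSERT INTO T VALUES (1, 'a', '2023-10-01'), (2, 'b', '2023-10-02')");

        // Drops everything in the '2023-10-01' partition, yet the deleted row is still
        // visible as a '-D' record in the audit_log system table.
        spark.sql("DELETE FROM T WHERE dt = '2023-10-01'");
        spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind = '-D'").show();
    }
}
```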
@@ -318,6 +318,35 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
val rows4 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
assertThat(rows4.toString).isEqualTo("[]")
}

test(s"test delete producer changelog") {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, dt STRING, hh STRING)
|TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' = 'deduplicate', 'changelog-producer'='input', 'delete.force-produce-changelog'='true')
|PARTITIONED BY (dt, hh)
|""".stripMargin)

spark.sql(
"INSERT INTO T VALUES " +
"(1, 'a', '2023-10-01', '12')," +
"(2, 'b', '2023-10-01', '12')," +
"(3, 'c', '2023-10-02', '12')," +
"(4, 'd', '2023-10-02', '13')," +
"(5, 'e', '2023-10-02', '14')," +
"(6, 'f', '2023-10-02', '15')")

// this delete does not drop a whole partition
spark.sql("DELETE FROM T WHERE name = 'a' and hh = '12'")
assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
.isEqualTo(1)

// this delete drops a whole partition
spark.sql("DELETE FROM T WHERE hh = '12'")
assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
.isEqualTo(3)

}
}

class DeleteFromTableTest extends DeleteFromTableTestBase {}