
Commit

fix comment
leaves12138 committed Apr 24, 2024
1 parent 61e4e23 commit e02085c
Showing 5 changed files with 72 additions and 9 deletions.
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1075,6 +1075,14 @@ public class CoreOptions implements Serializable {
"Whether to enable deletion vectors mode. In this mode, index files containing deletion"
+ " vectors are generated when data is written, which marks the data for deletion."
+ " During read operations, by applying these index files, merging can be avoided.");

public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG =
key("delete.force-produce-changelog")
.booleanType()
.defaultValue(false)
.withDescription(
"Force produce changelog in delete sql no matter what if changelog producer is not NONE.");

public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY =
key("sort-compaction.range-strategy")
.enumType(RangeStrategy.class)
@@ -1703,6 +1711,10 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}

public boolean deleteForceProduceChangelog() {
return options.get(DELETION_FORCE_PRODUCE_CHANGELOG);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
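For illustration, a minimal Flink SQL sketch of how the new delete.force-produce-changelog option is meant to be set, modeled on the test updated later in this commit (the table name and columns are illustrative, not part of the change):

-- Illustrative table; 'input' is one valid changelog producer, as in the Spark test below.
CREATE TABLE orders (pt INT, pk INT, v STRING, PRIMARY KEY (pt, pk) NOT ENFORCED)
PARTITIONED BY (pt)
WITH (
    'changelog-producer' = 'input',
    'delete.force-produce-changelog' = 'true'
);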
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
@@ -174,7 +175,14 @@ public Optional<Long> executeDeletion() {
commit.truncateTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
return Optional.of(TableUtils.deletePartition(table, deletePartitions()));
if (((FileStoreTable) table).coreOptions().deleteForceProduceChangelog()
&& ((FileStoreTable) table).coreOptions().changelogProducer()
!= CoreOptions.ChangelogProducer.NONE) {
return Optional.of(TableUtils.deletePartition(table, deletePartitions()));
} else {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
}
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
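The new branch above only rewrites rows for a partition-wise DELETE when delete.force-produce-changelog is enabled and the changelog producer is not NONE; otherwise the partitions are dropped directly in the commit. A hedged Flink SQL sketch of the two behaviors, mirroring the updated test (the table name is illustrative):

-- With the option enabled (here via a dynamic hint), the partition delete takes
-- the row-level path and emits DELETE changelog records for downstream consumers.
DELETE FROM orders /*+ OPTIONS('delete.force-produce-changelog' = 'true') */ WHERE pt = 1;

-- Without the option (or with 'changelog-producer' = 'none'), the same statement
-- simply drops partition pt = 1 and produces no changelog.
DELETE FROM orders WHERE pt = 1;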
@@ -446,15 +446,16 @@ public void testDeletePartitionWithChangelog(String producer) throws Exception {
"CREATE TABLE ignore_delete (pt INT, pk INT, v STRING, PRIMARY KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt) "
+ "WITH ('changelog-producer' = '"
+ producer
+ "')");
+ "', 'delete.force-produce-changelog'='true', 'bucket'='1')");
BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM ignore_delete");

sql("INSERT INTO ignore_delete VALUES (1, 1, 'A'), (2, 2, 'B')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, 1, "A"),
Row.ofKind(RowKind.INSERT, 2, 2, "B"));
sql("DELETE FROM ignore_delete WHERE pt = 1");
sql(
"DELETE FROM ignore_delete /*+ OPTIONS('delete.force-produce-changelog'='true') */ WHERE pt = 1");
assertThat(iterator.collect(1))
.containsExactlyInAnyOrder(Row.ofKind(RowKind.DELETE, 1, 1, "A"));
sql("INSERT INTO ignore_delete VALUES (1, 1, 'B')");
@@ -18,6 +18,10 @@

package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -35,7 +39,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit

import java.util.UUID
import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

case class DeleteFromPaimonTableCommand(
relation: DataSourceV2Relation,
@@ -49,16 +55,52 @@ case class DeleteFromPaimonTableCommand(
private lazy val writer = PaimonSparkWriter(table)

override def run(sparkSession: SparkSession): Seq[Row] = {

val commit = table.store.newCommit(UUID.randomUUID.toString)
if (condition == null || condition == TrueLiteral) {
commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
val commitMessages = if (withPrimaryKeys) {
performDeleteForPkTable(sparkSession)
val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
condition,
table.partitionKeys().asScala,
sparkSession.sessionState.conf.resolver)

// TODO: provide another partition visitor to support more partition predicates.
val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
val partitionPredicate = if (partitionCondition.isEmpty) {
None
} else {
convertConditionToPaimonPredicate(
partitionCondition.reduce(And),
relation.output,
rowType,
ignoreFailure = true)
}

// We do not have to scan the table if the following three requirements are met:
// 1) there is no other predicate;
// 2) the partition condition can be converted to a Paimon predicate;
// 3) the partition predicate can be visited by OnlyPartitionKeyEqualVisitor.
val forceDeleteByRows =
otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
visitor) || (table.store().options().deleteForceProduceChangelog() && table
.store()
.options()
.changelogProducer() != CoreOptions.ChangelogProducer.NONE)

if (forceDeleteByRows) {
val commitMessages = if (withPrimaryKeys) {
performDeleteForPkTable(sparkSession)
} else {
performDeleteForNonPkTable(sparkSession)
}
writer.commit(commitMessages)
} else {
performDeleteForNonPkTable(sparkSession)
val dropPartitions = visitor.partitions()
commit.dropPartitions(
Collections.singletonList(dropPartitions),
BatchWriteBuilder.COMMIT_IDENTIFIER)
}
writer.commit(commitMessages)
}

Seq.empty[Row]
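To make the three requirements above concrete, a hedged Spark SQL sketch against the test table T (id, name, dt, hh; partitioned by dt, hh) defined in the test below; the literal values are illustrative:

-- Pure equality on partition keys only: no table scan, committed as a direct partition drop
-- (unless delete.force-produce-changelog is true and a changelog producer is configured).
DELETE FROM T WHERE dt = '2024-04-24' AND hh = '12';

-- A non-partition column in the predicate forces the row-level delete path.
DELETE FROM T WHERE dt = '2024-04-24' AND name = 'a';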
@@ -317,7 +317,7 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, dt STRING, hh STRING)
|TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' = 'deduplicate', 'changelog-producer'='input')
|TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' = 'deduplicate', 'changelog-producer'='input', 'delete.force-produce-changelog'='true')
|PARTITIONED BY (dt, hh)
|""".stripMargin)

