diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 0ac0b14bbe6c..c955acbdfa26 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -27,6 +27,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
+import org.apache.paimon.utils.RowDataPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
-import java.util.{Collections, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -62,38 +63,35 @@ case class DeleteFromPaimonTableCommand(
         table.partitionKeys().asScala,
         sparkSession.sessionState.conf.resolver)
 
-      // TODO: provide another partition visitor to support more partition predicate.
-      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
       val partitionPredicate = if (partitionCondition.isEmpty) {
         None
       } else {
         convertConditionToPaimonPredicate(
           partitionCondition.reduce(And),
           relation.output,
-          rowType,
+          table.schema.logicalPartitionType(),
           ignoreFailure = true)
       }
 
-      // We do not have to scan table if the following three requirements are met:
-      // 1) no other predicate;
-      // 2) partition condition can convert to paimon predicate;
-      // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
-      val forceDeleteByRows =
-        otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
-          visitor)
-
-      if (forceDeleteByRows) {
+      if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
+        val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
+        val matchedPartitions = allPartitions.filter(partitionPredicate.get.test)
+        val rowDataPartitionComputer = new RowDataPartitionComputer(
+          CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+          table.schema().logicalPartitionType(),
+          table.partitionKeys.asScala.toArray
+        )
+        val dropPartitions = matchedPartitions.map {
+          partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
+        }
+        commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
+      } else {
        val commitMessages = if (withPrimaryKeys) {
           performDeleteForPkTable(sparkSession)
         } else {
           performDeleteForNonPkTable(sparkSession)
         }
         writer.commit(commitMessages)
-      } else {
-        val dropPartitions = visitor.partitions()
-        commit.dropPartitions(
-          Collections.singletonList(dropPartitions),
-          BatchWriteBuilder.COMMIT_IDENTIFIER)
       }
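
For readers skimming the patch: the new branch replaces the old equality-only, single-partition drop (via `OnlyPartitionKeyEqualVisitor` and `Collections.singletonList`) with a metadata-only drop of every partition matched by the partition predicate. Below is a condensed sketch of that flow. It reuses only calls that appear in the diff (`newReadBuilder.newScan.listPartitions`, `RowDataPartitionComputer.generatePartValues`, `dropPartitions`); the object and method names, and the assumption that `commit` is a `FileStoreCommit`, are illustrative rather than taken from the patch.

```scala
import org.apache.paimon.CoreOptions
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
import org.apache.paimon.utils.RowDataPartitionComputer

import scala.collection.JavaConverters._

// Illustrative sketch only; names (PartitionDropSketch, dropMatchedPartitions) are hypothetical.
object PartitionDropSketch {

  /** Drop every partition accepted by a partition-only predicate in a single commit. */
  def dropMatchedPartitions(
      table: FileStoreTable,
      commit: FileStoreCommit,
      partitionPredicate: Predicate): Unit = {
    // List the table's partitions (BinaryRow values) and keep those the predicate accepts.
    val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
    val matchedPartitions = allPartitions.filter(p => partitionPredicate.test(p))

    // Turn each matched partition row into a partition-spec map (partition column -> string value),
    // using the configured default partition name for null values.
    val partitionComputer = new RowDataPartitionComputer(
      CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
      table.schema().logicalPartitionType(),
      table.partitionKeys.asScala.toArray)
    val dropPartitions =
      matchedPartitions.map(p => partitionComputer.generatePartValues(p).asScala.asJava)

    // Drop all matched partitions in one metadata-only commit.
    commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
  }
}
```

Because the matched partitions are dropped through the commit rather than by rewriting data files, a DELETE whose condition touches only partition columns stays a metadata operation even when it matches many partitions; all other cases still fall back to the row-level delete paths (`performDeleteForPkTable` / `performDeleteForNonPkTable`).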