[spark] drop partition when all the conditions are partition-related (a…
YannByron authored Apr 26, 2024
1 parent 6521678 commit 68355b5
Showing 1 changed file with 17 additions and 19 deletions.
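
For context, a rough sketch of the effect at the SQL level (the table, columns, values, and catalog setup below are assumptions for illustration, not part of the commit): a DELETE whose condition touches only partition columns can now be committed as a partition drop, and the condition no longer has to be a plain partition-key equality; any non-partition predicate still goes through the row-level delete path.

// Hypothetical spark-shell session with the Paimon catalog configured (assumed).
spark.sql("CREATE TABLE t (id INT, v STRING, dt STRING) USING paimon PARTITIONED BY (dt)")
spark.sql("INSERT INTO t VALUES (1, 'a', '2024-04-25'), (2, 'b', '2024-04-26')")

// All predicates reference only the partition column dt, so the delete can be
// committed as dropPartitions without rewriting data files:
spark.sql("DELETE FROM t WHERE dt < '2024-04-26'")

// A non-partition column is involved, so the existing row-level delete path
// (performDeleteForPkTable / performDeleteForNonPkTable) is still used:
spark.sql("DELETE FROM t WHERE dt = '2024-04-26' AND id = 2")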
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -27,6 +27,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
+import org.apache.paimon.utils.RowDataPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
-import java.util.{Collections, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._

@@ -62,38 +63,35 @@ case class DeleteFromPaimonTableCommand(
         table.partitionKeys().asScala,
         sparkSession.sessionState.conf.resolver)
 
-      // TODO: provide another partition visitor to support more partition predicate.
-      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
       val partitionPredicate = if (partitionCondition.isEmpty) {
         None
       } else {
         convertConditionToPaimonPredicate(
           partitionCondition.reduce(And),
           relation.output,
-          rowType,
+          table.schema.logicalPartitionType(),
           ignoreFailure = true)
       }
 
-      // We do not have to scan table if the following three requirements are met:
-      // 1) no other predicate;
-      // 2) partition condition can convert to paimon predicate;
-      // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
-      val forceDeleteByRows =
-        otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
-          visitor)
-
-      if (forceDeleteByRows) {
+      if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
+        val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
+        val matchedPartitions = allPartitions.filter(partitionPredicate.get.test)
+        val rowDataPartitionComputer = new RowDataPartitionComputer(
+          CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+          table.schema().logicalPartitionType(),
+          table.partitionKeys.asScala.toArray
+        )
+        val dropPartitions = matchedPartitions.map {
+          partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
+        }
+        commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
+      } else {
         val commitMessages = if (withPrimaryKeys) {
          performDeleteForPkTable(sparkSession)
         } else {
           performDeleteForNonPkTable(sparkSession)
         }
         writer.commit(commitMessages)
-      } else {
-        val dropPartitions = visitor.partitions()
-        commit.dropPartitions(
-          Collections.singletonList(dropPartitions),
-          BatchWriteBuilder.COMMIT_IDENTIFIER)
       }
     }
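
After the matched partitions are found, each partition row is converted into a String-to-String partition spec before being handed to commit.dropPartitions. Below is a minimal, self-contained sketch of that conversion step only; the single dt partition column, its value, and the RowType construction helper are assumptions for illustration, not taken from the commit.

import org.apache.paimon.CoreOptions
import org.apache.paimon.data.{BinaryString, GenericRow}
import org.apache.paimon.types.{DataType, DataTypes, RowType}
import org.apache.paimon.utils.RowDataPartitionComputer

object DropPartitionSpecSketch {
  def main(args: Array[String]): Unit = {
    // Assumed partition schema: a single STRING column "dt".
    val partitionType = RowType.of(Array[DataType](DataTypes.STRING()), Array("dt"))

    val computer = new RowDataPartitionComputer(
      CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
      partitionType,
      Array("dt"))

    // Stand-in for one row returned by newScan.listPartitions and kept by the
    // partition predicate filter.
    val matchedPartition = GenericRow.of(BinaryString.fromString("2024-04-25"))

    // Prints a LinkedHashMap like {dt=2024-04-25}, i.e. the partition spec
    // that commit.dropPartitions receives for this partition.
    println(computer.generatePartValues(matchedPartition))
  }
}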
