From 2fa598dee9fa54d3324be8fa5b8fa3eff669b0c1 Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Thu, 12 Sep 2024 10:18:34 +0800 Subject: [PATCH] [spark] Delete with deletion vectors should enable AQE (#4171) --- .../DeleteFromPaimonTableCommand.scala | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index cc440dd5c16b..2b3888911226 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -117,24 +117,21 @@ case class DeleteFromPaimonTableCommand( } def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = { - val pathFactory = fileStore.pathFactory() // Step1: the candidate data splits which are filtered by Paimon Predicate. val candidateDataSplits = findCandidateDataSplits(condition, relation.output) val dataFilePathToMeta = candidateFileMap(candidateDataSplits) if (deletionVectorsEnabled) { - withSQLConf("spark.sql.adaptive.enabled" -> "false") { - // Step2: collect all the deletion vectors that marks the deleted rows. - val deletionVectors = collectDeletionVectors( - candidateDataSplits, - dataFilePathToMeta, - condition, - relation, - sparkSession) - - // Step3: update the touched deletion vectors and index files - writer.persistDeletionVectors(deletionVectors) - } + // Step2: collect all the deletion vectors that marks the deleted rows. + val deletionVectors = collectDeletionVectors( + candidateDataSplits, + dataFilePathToMeta, + condition, + relation, + sparkSession) + + // Step3: update the touched deletion vectors and index files + writer.persistDeletionVectors(deletionVectors) } else { // Step2: extract out the exactly files, which must have at least one record to be updated. val touchedFilePaths =