Skip to content

Commit

Permalink
[spark] Delete with deletion vectors should enable AQE (apache#4171)
Browse files · Browse the repository at this point in the history
  • Loading branch information
ulysses-you authored Sep 12, 2024
1 parent 57135be commit 2fa598d
Showing 1 changed file with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,21 @@ case class DeleteFromPaimonTableCommand(
}

def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val pathFactory = fileStore.pathFactory()
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)

if (deletionVectorsEnabled) {
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
// Step2: collect all the deletion vectors that marks the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

// Step3: update the touched deletion vectors and index files
writer.persistDeletionVectors(deletionVectors)
}
// Step2: collect all the deletion vectors that marks the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)

// Step3: update the touched deletion vectors and index files
writer.persistDeletionVectors(deletionVectors)
} else {
// Step2: extract out the exactly files, which must have at least one record to be updated.
val touchedFilePaths =
Expand Down

0 comments on commit 2fa598d

Please sign in to comment.