Skip to content

Commit

Permalink
[spak] unify the writer logic of deletion vector (#3441)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored May 31, 2024
1 parent e77ea33 commit 98b7598
Showing 1 changed file with 11 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,27 +243,18 @@ case class PaimonSparkWriter(table: FileStoreTable) {
serializer.serialize(commitMessage)
}

val serializedCommits = fileStore.bucketMode() match {
case BucketMode.BUCKET_UNAWARE =>
deletionVectors.mapPartitions {
iter: Iterator[SparkDeletionVectors] =>
val serializer = new CommitMessageSerializer
iter.map(commitDeletionVector(_, serializer))
}
case _ =>
deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {
case (_, iter: Iterator[SparkDeletionVectors]) =>
val serializer = new CommitMessageSerializer
val grouped = iter
.reduce(
(sdv1, sdv2) =>
sdv1.copy(dataFileAndDeletionVector =
sdv1.dataFileAndDeletionVector ++ sdv2.dataFileAndDeletionVector))
commitDeletionVector(grouped, serializer)
val serializedCommits = deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {
case (_, iter: Iterator[SparkDeletionVectors]) =>
val serializer = new CommitMessageSerializer
val grouped = iter.reduce {
(sdv1, sdv2) =>
sdv1.copy(dataFileAndDeletionVector =
sdv1.dataFileAndDeletionVector ++ sdv2.dataFileAndDeletionVector)
}
}
commitDeletionVector(grouped, serializer)
}
serializedCommits
.collect()
.map(deserializeCommitMessage(serializer, _))
Expand Down

0 comments on commit 98b7598

Please sign in to comment.