Skip to content

Commit

Permalink
Default parallelism also considers numShufflePartitions (#3218)
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget authored Apr 16, 2024
1 parent c05cbca commit e27ceb4
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
// Topology: input -> shuffle by special key & partition hash -> bucket-assigner -> shuffle by partition & bucket
val numParallelism = Option(table.coreOptions.dynamicBucketAssignerParallelism)
.map(_.toInt)
.getOrElse(sparkSession.sparkContext.defaultParallelism)
.getOrElse {
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
val numShufflePartitions = sparkSession.sessionState.conf.numShufflePartitions
Math.max(defaultParallelism, numShufflePartitions)
}
val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets)
.map(initialBuckets => Math.min(initialBuckets.toInt, numParallelism))
.getOrElse(numParallelism)
Expand Down

0 comments on commit e27ceb4

Please sign in to comment.