diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 71cf04cf5ef55..74ebe356b2053 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -230,6 +230,7 @@ private boolean execute( .getOrElse(null); if (orderType.equals(TableSorter.OrderType.NONE)) { JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext()); + // askwang-todo: neither the bucket-aware nor the bucket-unaware path can compact only a subset of buckets or a subset of tasks. switch (bucketMode) { case HASH_FIXED: case HASH_DYNAMIC: @@ -268,6 +269,8 @@ private void compactAwareBucketTable( } Set partitionToBeCompacted = getHistoryPartition(snapshotReader, partitionIdleTime); + + // Previously the bucket info was obtained by reading data splits, which easily caused OOM when there were many splits. List> partitionBuckets = snapshotReader.bucketEntries().stream() .map(entry -> Pair.of(entry.partition(), entry.bucket())) @@ -325,6 +328,7 @@ private void compactAwareBucketTable( try (BatchTableCommit commit = writeBuilder.newCommit()) { CommitMessageSerializer serializer = new CommitMessageSerializer(); + // askwang-todo: when the commit messages are large, collect() can easily run out of memory (OOM); there is currently no solution for this. List serializedMessages = commitMessageJavaRDD.collect(); List messages = new ArrayList<>(serializedMessages.size()); for (byte[] serializedMessage : serializedMessages) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 7d56fe867a1b3..ded002bc7638e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -398,6 +398,9 @@ case class PaimonSparkWriter(table: FileStoreTable) { private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = { val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq + // askwang-todo: adding the bucket column to the table already computes one hash, so bucket values fall in the small range [0, bucket-1]. + // Spark's RepartitionByExpression then hashes the partition + bucket columns again, + // which causes data skew: a single Spark partition ends up handling the writes of multiple buckets. df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*) }