
Commit

todo list
askwang committed Oct 21, 2024
1 parent 87321e1 commit d5dea8c
Showing 2 changed files with 7 additions and 0 deletions.
@@ -230,6 +230,7 @@ private boolean execute(
.getOrElse(null);
if (orderType.equals(TableSorter.OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
// askwang-todo: neither bucket-aware nor bucket-unaware mode can compact just a subset of buckets or a subset of tasks
switch (bucketMode) {
case HASH_FIXED:
case HASH_DYNAMIC:
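Note on the TODO above: the procedure currently plans work for every (partition, bucket) pair it finds. A minimal Scala sketch of what a bucket-subset filter could look like; the `targetBuckets` parameter and `filterBuckets` helper are invented here for illustration and exist neither in Paimon nor in this commit:

// Hypothetical sketch: restrict compaction to an explicit subset of buckets
// before tasks are planned. Nothing like `targetBuckets` exists in this code.
def filterBuckets(
    partitionBuckets: Seq[(Array[Byte], Int)], // mirrors the List<Pair<byte[], Integer>> built below
    targetBuckets: Option[Set[Int]]): Seq[(Array[Byte], Int)] =
  targetBuckets match {
    case Some(wanted) => partitionBuckets.filter { case (_, bucket) => wanted(bucket) }
    case None => partitionBuckets // current behavior: every bucket is compacted
  }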
@@ -268,6 +269,8 @@ private void compactAwareBucketTable(
}
Set<BinaryRow> partitionToBeCompacted =
getHistoryPartition(snapshotReader, partitionIdleTime);

// Previously, bucket info was obtained by reading data splits, which tended to OOM when there were many splits
List<Pair<byte[], Integer>> partitionBuckets =
snapshotReader.bucketEntries().stream()
.map(entry -> Pair.of(entry.partition(), entry.bucket()))
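Why bucket entries help here: a data split carries per-file metadata, while a bucket entry is just a (partition, bucket) key, so the driver-side footprint drops from roughly O(files) to O(buckets). A toy Scala sketch of the dedup the stream above performs, with invented numbers and simplified types (not Paimon API):

// Toy model: many file-level records per bucket collapse into one key, so
// collecting distinct (partition, bucket) pairs stays small even when the
// table has a huge number of files.
case class BucketEntry(partition: String, bucket: Int)

val entries = for {
  p <- Seq("dt=2024-10-20", "dt=2024-10-21")
  b <- 0 until 4
  _ <- 0 until 1000 // stand-in for many files per bucket
} yield BucketEntry(p, b)

val partitionBuckets = entries.map(e => (e.partition, e.bucket)).distinct
println(partitionBuckets.size) // 8 keys instead of 8000 file-level records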
@@ -325,6 +328,7 @@ private void compactAwareBucketTable(

try (BatchTableCommit commit = writeBuilder.newCommit()) {
CommitMessageSerializer serializer = new CommitMessageSerializer();
// askwang-todo: when the commit messages are large, collect() is prone to OOM on the driver; no real fix for this yet.
List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
List<CommitMessage> messages = new ArrayList<>(serializedMessages.size());
for (byte[] serializedMessage : serializedMessages) {
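On the collect() OOM noted above: one partial mitigation with Spark is RDD.toLocalIterator, which fetches one partition at a time, so the driver never holds the full serialized list and the full deserialized list simultaneously. A hedged Scala sketch; the import paths and the deserialize(version, bytes) signature are assumptions inferred from the surrounding code, and the deserialized messages still accumulate, so this only lowers the peak rather than removing the risk, consistent with the TODO saying there is no real fix yet:

import scala.collection.mutable.ArrayBuffer
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer} // paths assumed
import org.apache.spark.api.java.JavaRDD

// Sketch only: stream serialized commit messages to the driver partition by
// partition instead of collect()-ing them all at once. The CommitMessage list
// still grows to full size, so this mitigates rather than solves the OOM.
def deserializeIncrementally(
    serialized: JavaRDD[Array[Byte]],
    version: Int): Seq[CommitMessage] = {
  val serializer = new CommitMessageSerializer() // as constructed in the code above
  val messages = ArrayBuffer[CommitMessage]()
  val it = serialized.rdd.toLocalIterator // one partition in driver memory at a time
  it.foreach(bytes => messages += serializer.deserialize(version, bytes)) // signature assumed
  messages.toSeq
}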
@@ -398,6 +398,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {

private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq
// askwang-todo: a hash is already computed once when the bucket column is added to the table,
// and bucket values fall in the small range [0, bucket - 1]; Spark's RepartitionByExpression
// then hashes the partition + bucket columns again, which causes data skew:
// a single Spark partition ends up handling writes for multiple buckets
df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
}
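The skew described in the TODO can be reproduced directly. A hypothetical Spark shell demo (not from this commit; the column names, bucket count, and partition count are made up, with "_bucket_" standing in for BUCKET_COL): with only a handful of distinct (pt, bucket) keys, at most that many shuffle partitions receive any data, and colliding hashes can map several buckets onto the same one.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Hypothetical demo: repartitioning by a small-range bucket column leaves most
// shuffle partitions empty and can put multiple buckets in one partition.
val spark = SparkSession.builder().master("local[4]").appName("bucket-skew").getOrCreate()
import spark.implicits._

val numBuckets = 4 // bucket values fall in [0, numBuckets - 1]
val df = (0 until 100000).map(i => (i % 2, i % numBuckets, i)).toDF("pt", "_bucket_", "v")

df.repartition(200, $"pt", $"_bucket_") // same shape as the writer's repartition above
  .groupBy(spark_partition_id().as("spark_partition"))
  .agg(countDistinct($"pt", $"_bucket_").as("pt_buckets"), count("*").as("rows"))
  .orderBy(desc("rows"))
  .show()
// Only 2 * 4 = 8 distinct keys exist, so at most 8 of the 200 partitions get
// data, and a partition with pt_buckets > 1 is handling several buckets at once.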
