[note] PaimonSparkWriter
askwang committed Jun 13, 2024
1 parent e9ec743 commit 7ee8be9
Showing 4 changed files with 16 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Notes
@@ -1,3 +1,5 @@
code:
between 0.8 and 0.9; switch to the master branch if testing a new feature.

principles:
- Learn from the project and do your own work based on it.
@@ -38,6 +38,7 @@ public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
public FixedBucketRowKeyExtractor(TableSchema schema) {
super(schema);
numBuckets = new CoreOptions(schema.options()).bucket();
// bucketKeys(): if the bucket keys are empty, it returns the result of trimmedPrimaryKeys()
sameBucketKeyAndTrimmedPrimaryKey = schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
bucketKeyProjection =
CodeGenUtils.newProjection(
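// A minimal, hedged sketch (not Paimon's actual code) of the fallback noted in the comment
// above: when no separate bucket keys are declared, the trimmed primary keys serve as the
// bucket keys, which is the case where sameBucketKeyAndTrimmedPrimaryKey ends up true.
def effectiveBucketKeys(bucketKeys: Seq[String], trimmedPrimaryKeys: Seq[String]): Seq[String] =
  if (bucketKeys.isEmpty) trimmedPrimaryKeys else bucketKeys

// e.g. primary key (dt, id), partition key dt, no declared bucket keys => bucket key is "id"
assert(effectiveBucketKeys(Seq.empty, Seq("id")) == Seq("id"))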
@@ -30,15 +30,14 @@ import org.apache.paimon.spark.util.SparkRowUtils
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor}
import org.apache.paimon.utils.SerializationUtils

import org.apache.spark.{Partitioner, TaskContext}
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._

import java.io.IOException
import java.util.Collections.singletonMap

import scala.collection.JavaConverters._
import scala.collection.mutable

case class PaimonSparkWriter(table: FileStoreTable) {

@@ -69,6 +68,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {

// append _bucket_ column as placeholder
val withInitBucketCol = data.withColumn(BUCKET_COL, lit(-1))
// Index of the BUCKET_COL appended by withColumn within the schema.
// e.g. <id int, name string, age int, _bucket_ ?>: schema size = 4, idx(_bucket_) = 3
val bucketColIdx = withInitBucketCol.schema.size - 1
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
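// Hedged, self-contained illustration of the index bookkeeping above (column names are
// made up for the example; `spark` is an assumed, existing SparkSession):
import spark.implicits._
import org.apache.spark.sql.functions.lit

val sample = Seq((1, "a", 20), (2, "b", 30)).toDF("id", "name", "age")
val withPlaceholder = sample.withColumn("_bucket_", lit(-1))
val placeholderIdx = withPlaceholder.schema.size - 1 // schema size = 4, idx(_bucket_) = 3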

@@ -92,6 +93,10 @@ case class PaimonSparkWriter(table: FileStoreTable) {
def writeWithBucketProcessor(
dataFrame: DataFrame,
processor: BucketProcessor): Dataset[Array[Byte]] = {
// processor.processPartition is a function of type Iterator[T] => Iterator[U];
// in essence this is mapPartitions(iter => newIter).
// Repartition by the partition key and the bucket key.
// todo-askwang: bucket-key computation for append tables
val repartitioned = repartitionByPartitionsAndBucket(
dataFrame.mapPartitions(processor.processPartition)(encoderGroupWithBucketCol.encoder))
repartitioned.mapPartitions {
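// Hedged sketch of the mapPartitions pattern described above: a function of type
// Iterator[T] => Iterator[U] is applied once per partition. Names are illustrative and
// assume an existing SparkSession `spark`.
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 5).toDS()
val perPartition: Iterator[Int] => Iterator[String] =
  iter => iter.map(i => s"row-$i")
val processed = ints.mapPartitions(perPartition)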
@@ -181,6 +186,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
encoderGroupWithBucketCol))
}
case BucketMode.BUCKET_UNAWARE =>
// BUCKET_UNAWARE only supports append-only tables; data is written to the bucket-0 directory by default
// Topology: input ->
writeWithoutBucket()
case BucketMode.HASH_FIXED =>
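// Hedged sketch, not part of this commit: with 'bucket' = '-1' the table is an append-only,
// BUCKET_UNAWARE table, so its data files are expected under the table's bucket-0 directory.
// Assumes a SparkSession `spark` already configured with a Paimon catalog and database.
spark.sql("CREATE TABLE t_unaware (id INT, name STRING) TBLPROPERTIES ('bucket' = '-1')")
spark.sql("INSERT INTO t_unaware VALUES (1, 'a'), (2, 'b')")
// expected layout: <warehouse>/<db>.db/t_unaware/bucket-0/<data files>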
@@ -320,7 +326,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
}

private def repartitionByPartitionsAndBucket(ds: Dataset[Row]): Dataset[Row] = {
-    val partitionCols = tableSchema.partitionKeys().asScala.map(col)
+    val partitionCols: mutable.Buffer[Column] = tableSchema.partitionKeys().asScala.map(col)
ds.toDF().repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
}
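// Hedged usage sketch of the repartition call above, with made-up column names and an
// assumed SparkSession `spark`: repartition takes Column varargs, hence the `: _*` expansion.
import spark.implicits._
import org.apache.spark.sql.functions.col

val demo = Seq(("2024-06-13", 0, 1L), ("2024-06-14", 1, 2L)).toDF("dt", "_bucket_", "id")
val shuffled = demo.repartition(Seq(col("dt"), col("_bucket_")): _*)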

@@ -6,6 +6,9 @@ import org.apache.spark.rdd.RDD

class AskwangScalaITCase extends PaimonSparkTestBase {

/**
 * Read the elements within each partition.
 */
test("spark partitionBy function") {

val sc = spark.sparkContext
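// A hedged sketch of what this test could do (not the author's actual body): hash-partition
// a pair RDD, then read back the elements that land inside each partition.
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")))
val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned
  .mapPartitionsWithIndex((idx, iter) => Iterator(idx -> iter.toList))
  .collect()
  .foreach { case (idx, elems) => println(s"partition $idx: $elems") }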
