
Commit

update
Zouxxyy committed Nov 23, 2023
1 parent 23f98e5 commit 7ccf6c5
Showing 2 changed files with 63 additions and 3 deletions.
@@ -21,7 +21,7 @@ import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode, SparkConnectorOptions, SparkRow}
+import org.apache.paimon.spark._
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
@@ -102,7 +102,12 @@ case class WriteIntoPaimonTable(
       case BucketMode.DYNAMIC =>
         val partitioned = if (primaryKeyCols.nonEmpty) {
           // Make sure that records with the same bucket values are within a task.
-          withBucketCol.repartition(primaryKeyCols: _*)
+          val assignerParallelism = table.coreOptions.dynamicBucketAssignerParallelism
+          if (assignerParallelism != null) {
+            withBucketCol.repartition(assignerParallelism, primaryKeyCols: _*)
+          } else {
+            withBucketCol.repartition(primaryKeyCols: _*)
+          }
         } else {
           withBucketCol
         }
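The new branch above pins the primary-key shuffle to a fixed task count whenever 'dynamic-bucket.assigner-parallelism' is set, instead of letting Spark fall back to spark.sql.shuffle.partitions. A minimal sketch of the two repartition paths in plain Spark (hypothetical data and column names, not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    val df = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("pk", "v")

    // Explicit parallelism: the shuffle yields exactly 3 partitions.
    println(df.repartition(3, col("pk")).rdd.getNumPartitions) // 3

    // No explicit parallelism: the partition count comes from
    // spark.sql.shuffle.partitions (200 by default; AQE may coalesce it).
    println(df.repartition(col("pk")).rdd.getNumPartitions)

    spark.stop()
  }
}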
@@ -230,7 +235,7 @@ object WriteIntoPaimonTable {
       fileStoreTable: FileStoreTable,
       rowType: RowType,
       bucketColIndex: Int,
-      numSparkPartitions: Long,
+      numSparkPartitions: Int,
       toRow: ExpressionEncoder.Serializer[Row],
       fromRow: ExpressionEncoder.Deserializer[Row]
   ) extends BucketProcessor {
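A hedged aside on the Long to Int change in this hunk: Spark reports partition counts as Int (for example RDD.getNumPartitions), so an Int field matches the value's source and avoids a lossy cast. A runnable illustration in plain Spark (not part of this commit):

import org.apache.spark.sql.SparkSession

object PartitionCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // getNumPartitions is Int-valued, so an Int field is the natural fit.
    val numSparkPartitions: Int =
      Seq(1, 2, 3).toDF("id").repartition(4).rdd.getNumPartitions
    println(numSparkPartitions) // 4

    spark.stop()
  }
}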
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

class DynamicBucketTableTest extends PaimonSparkTestBase {

  test("Paimon dynamic bucket table: write with assign parallelism") {
    spark.sql("""
                |CREATE TABLE T (
                |  pk STRING,
                |  v STRING,
                |  pt STRING)
                |TBLPROPERTIES (
                |  'primary-key' = 'pk, pt',
                |  'bucket' = '-1',
                |  'dynamic-bucket.target-row-num' = '3',
                |  'dynamic-bucket.assigner-parallelism' = '3'
                |)
                |PARTITIONED BY (pt)
                |""".stripMargin)

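    // 'bucket' = '-1' turns on dynamic bucketing; each bucket targets roughly
    // 'dynamic-bucket.target-row-num' (3) rows, and bucket assignment runs
    // with a fixed parallelism of 3.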
    spark.sql(
      "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'), ('4', 'd', 'p'), ('5', 'e', 'p')")

    checkAnswer(
      spark.sql("SELECT * FROM T ORDER BY pk"),
      Row("1", "a", "p") :: Row("2", "b", "p") :: Row("3", "c", "p") ::
        Row("4", "d", "p") :: Row("5", "e", "p") :: Nil)

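    // With assigner parallelism 3 and five records, writes fan out over three
    // assigner tasks, so the files metadata shows exactly buckets 0, 1 and 2.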
    checkAnswer(
      spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"),
      Row(0) :: Row(1) :: Row(2) :: Nil)
  }
}
