From 7ccf6c5e6ce5eff5447a5c5a18f6979cadec7e32 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Thu, 23 Nov 2023 21:56:46 +0800
Subject: [PATCH] update

---
 .../spark/commands/WriteIntoPaimonTable.scala | 11 +++-
 .../spark/sql/DynamicBucketTableTest.scala    | 55 +++++++++++++++++++
 2 files changed, 63 insertions(+), 3 deletions(-)
 create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 24af3406269c..f9f9c3230c59 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,7 +21,7 @@ import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode, SparkConnectorOptions, SparkRow}
+import org.apache.paimon.spark._
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
@@ -102,7 +102,12 @@ case class WriteIntoPaimonTable(
       case BucketMode.DYNAMIC =>
         val partitioned = if (primaryKeyCols.nonEmpty) {
           // Make sure that the records with the same bucket values is within a task.
-          withBucketCol.repartition(primaryKeyCols: _*)
+          val assignerParallelism = table.coreOptions.dynamicBucketAssignerParallelism
+          if (assignerParallelism != null) {
+            withBucketCol.repartition(assignerParallelism, primaryKeyCols: _*)
+          } else {
+            withBucketCol.repartition(primaryKeyCols: _*)
+          }
         } else {
           withBucketCol
         }
@@ -230,7 +235,7 @@ object WriteIntoPaimonTable {
       fileStoreTable: FileStoreTable,
       rowType: RowType,
       bucketColIndex: Int,
-      numSparkPartitions: Long,
+      numSparkPartitions: Int,
       toRow: ExpressionEncoder.Serializer[Row],
       fromRow: ExpressionEncoder.Deserializer[Row]
   ) extends BucketProcessor {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
new file mode 100644
index 000000000000..853bac387e33
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class DynamicBucketTableTest extends PaimonSparkTestBase {
+
+  test(s"Paimon dynamic bucket table: write with assign parallelism") {
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |  pk STRING,
+                 |  v STRING,
+                 |  pt STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'pk, pt',
+                 |  'bucket' = '-1',
+                 |  'dynamic-bucket.target-row-num'='3',
+                 |  'dynamic-bucket.assigner-parallelism'='3'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql(
+      "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'), ('4', 'd', 'p'), ('5', 'e', 'p')")
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY pk"),
+      Row("1", "a", "p") :: Row("2", "b", "p") :: Row("3", "c", "p") :: Row("4", "d", "p") :: Row(
+        "5",
+        "e",
+        "p") :: Nil)
+
+    checkAnswer(
+      spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"),
+      Row(0) :: Row(1) :: Row(2) :: Nil)
+  }
+}