diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala index b513d93beb94..1a7dffaf1257 100644 --- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -18,4 +18,12 @@ package org.apache.paimon.spark.sql -class MergeIntoTableTest extends MergeIntoTableTestBase {} +import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest} + +class MergeIntoPrimaryKeyBucketedTableTest + extends MergeIntoTableTestBase + with PaimonPrimaryKeyBucketedTableTest {} + +class MergeIntoPrimaryKeyNonBucketTableTest + extends MergeIntoTableTestBase + with PaimonPrimaryKeyNonBucketTableTest {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala index b513d93beb94..1a7dffaf1257 100644 --- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -18,4 +18,12 @@ package org.apache.paimon.spark.sql -class MergeIntoTableTest extends MergeIntoTableTestBase {} +import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest} + +class MergeIntoPrimaryKeyBucketedTableTest + extends MergeIntoTableTestBase + with PaimonPrimaryKeyBucketedTableTest {} + +class MergeIntoPrimaryKeyNonBucketTableTest + extends MergeIntoTableTestBase + with PaimonPrimaryKeyNonBucketTableTest {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala index c1811107bcf7..13b79e744e8f 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -18,4 +18,14 @@ package org.apache.paimon.spark.sql -class MergeIntoTableTest extends MergeIntoTableTestBase with MergeIntoNotMatchedBySourceTest {} +import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest} + +class MergeIntoPrimaryKeyBucketedTableTest + extends MergeIntoTableTestBase + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyBucketedTableTest {} + +class MergeIntoPrimaryKeyNonBucketTableTest + extends MergeIntoTableTestBase + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyNonBucketTableTest {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala index c1811107bcf7..13b79e744e8f 100644 --- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -18,4 +18,14 @@ package org.apache.paimon.spark.sql -class MergeIntoTableTest extends MergeIntoTableTestBase with MergeIntoNotMatchedBySourceTest {} +import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest} + +class MergeIntoPrimaryKeyBucketedTableTest + extends MergeIntoTableTestBase + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyBucketedTableTest {} + +class MergeIntoPrimaryKeyNonBucketTableTest + extends MergeIntoTableTestBase + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyNonBucketTableTest {} diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala new file mode 100644 index 000000000000..53f41833f7a7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark + +import org.apache.spark.sql.test.SharedSparkSession + +import scala.collection.mutable + +trait PaimonTableTest extends SharedSparkSession { + + val bucket: Int + + def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit + + def createTable( + tableName: String, + columns: String, + primaryKeys: Seq[String], + partitionKeys: Seq[String] = Seq.empty, + props: Map[String, String] = Map.empty): Unit = { + val newProps: mutable.Map[String, String] = + mutable.Map.empty[String, String] ++ Map("bucket" -> bucket.toString) ++ props + appendPrimaryKey(primaryKeys, newProps) + createTable0(tableName, columns, partitionKeys, newProps.toMap) + } + + private def createTable0( + tableName: String, + columns: String, + partitionKeys: Seq[String], + props: Map[String, String]): Unit = { + val partitioned = if (partitionKeys.isEmpty) { + "" + } else { + s"PARTITIONED BY (${partitionKeys.mkString(", ")})" + } + val tblproperties = if (props.isEmpty) { + "" + } else { + val kvs = props.map(kv => s"'${kv._1}' = '${kv._2}'").mkString(", ") + s"TBLPROPERTIES ($kvs)" + } + sql(s""" + | CREATE TABLE $tableName ($columns) USING PAIMON + | $partitioned + | $tblproperties + |""".stripMargin) + } +} + +trait PaimonBucketedTable { + val bucket: Int = 3 +} + +trait PaimonNonBucketedTable { + val bucket: Int = -1 +} + +trait PaimonPrimaryKeyTable { + def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit = { + assert(primaryKeys.nonEmpty) + props += ("primary-key" -> primaryKeys.mkString(",")) + } +} + +trait PaimonAppendTable { + def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit = { + // nothing to do + } +} + +trait PaimonPrimaryKeyBucketedTableTest + extends PaimonTableTest + with PaimonPrimaryKeyTable + with PaimonBucketedTable + +trait PaimonPrimaryKeyNonBucketTableTest + extends PaimonTableTest + with PaimonPrimaryKeyTable + with 
PaimonNonBucketedTable + +trait PaimonAppendBucketedTableTest + extends PaimonTableTest + with PaimonAppendTable + with PaimonBucketedTable + +trait PaimonAppendNonBucketTableTest + extends PaimonTableTest + with PaimonAppendTable + with PaimonNonBucketedTable diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala index be4f077e0eb0..6a29181659e0 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala @@ -18,11 +18,11 @@ package org.apache.paimon.spark.sql -import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonTableTest} import org.apache.spark.sql.Row -trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase { +trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase with PaimonTableTest { import testImplicits._ @@ -31,10 +31,7 @@ trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (5, 50, 'c5')") spark.sql(s""" @@ -58,10 +55,7 @@ trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b 
INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -89,10 +83,7 @@ trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase { .toDF("a", "b", "c1", "c2") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRUCT<c1: STRING, c2: STRING>) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRUCT<c1: STRING, c2: STRING>", Seq("a")) spark.sql("INSERT INTO target values (1, 10, struct('x', 'y')), (2, 20, struct('x', 'y'))") spark.sql(s""" @@ -118,10 +109,7 @@ trait MergeIntoNotMatchedBySourceTest extends PaimonSparkTestBase { .toDF("a", "b", "c") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql( "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index 2fa49d1c12c5..65670ebd8db3 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -18,11 +18,11 @@ package org.apache.paimon.spark.sql -import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonTableTest} import org.apache.spark.sql.Row -abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { +abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTableTest { import testImplicits._ @@ -31,10 +31,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, 
"c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -56,10 +53,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -78,10 +72,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -103,10 +94,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -130,10 +118,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES 
('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -157,10 +142,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3')") spark.sql(s""" @@ -186,10 +168,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -213,10 +192,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -240,10 +216,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target 
values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -269,10 +242,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { .toDF("a", "b", "c") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql( "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") @@ -307,11 +277,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq.empty[(Int, Int, String)].toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) - + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql(s""" |MERGE INTO target |USING source @@ -331,10 +297,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -358,10 +321,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -385,10 +345,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", 
"c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -412,10 +369,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") val error = intercept[RuntimeException] { @@ -440,10 +394,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { .toDF("a", "b", "c") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") spark.sql(s""" @@ -469,10 +420,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { .toDF("a", "b", "c") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") val error = intercept[RuntimeException] { @@ -495,10 +443,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + 
createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") val error1 = intercept[RuntimeException] { @@ -536,10 +481,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { .toDF("a", "b", "c1", "c2") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRUCT<c1: STRING, c2: STRING>) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRUCT<c1: STRING, c2: STRING>", Seq("a")) spark.sql("INSERT INTO target values (1, 10, struct('x', 'y')), (2, 20, struct('x', 'y'))") spark.sql(s""" @@ -563,10 +505,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { .toDF("a", "b", "c") .createOrReplaceTempView("source") - spark.sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") val error = intercept[RuntimeException] { @@ -626,10 +565,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { withTable("source", "target") { Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") - sql(s""" - |CREATE TABLE target (a INT, b INT, c STRING) - |TBLPROPERTIES ('primary-key'='a') - |""".stripMargin) + createTable("target", "a INT, b INT, c STRING", Seq("a")) sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')") sql(s"""