Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Aug 9, 2024
1 parent ed972f9 commit 60e82a4
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}

class MergeIntoPrimaryKeyBucketedTableTest
extends MergeIntoTableTestBase
Expand All @@ -27,3 +27,11 @@ class MergeIntoPrimaryKeyBucketedTableTest
class MergeIntoPrimaryKeyNonBucketTableTest
extends MergeIntoTableTestBase
with PaimonPrimaryKeyNonBucketTableTest {}

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}

class MergeIntoPrimaryKeyBucketedTableTest
extends MergeIntoTableTestBase
Expand All @@ -27,3 +27,11 @@ class MergeIntoPrimaryKeyBucketedTableTest
class MergeIntoPrimaryKeyNonBucketTableTest
extends MergeIntoTableTestBase
with PaimonPrimaryKeyNonBucketTableTest {}

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}

class MergeIntoPrimaryKeyBucketedTableTest
extends MergeIntoTableTestBase
Expand All @@ -29,3 +29,13 @@ class MergeIntoPrimaryKeyNonBucketTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonPrimaryKeyNonBucketTableTest {}

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.{PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}

class MergeIntoPrimaryKeyBucketedTableTest
extends MergeIntoTableTestBase
Expand All @@ -29,3 +29,13 @@ class MergeIntoPrimaryKeyNonBucketTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonPrimaryKeyNonBucketTableTest {}

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ import scala.collection.mutable

trait PaimonTableTest extends SharedSparkSession {

val isPrimaryKeyTable: Boolean

val bucket: Int

def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit
def initProps(primaryOrBucketKeys: Seq[String], partitionKeys: Seq[String]): Map[String, String]

def createTable(
tableName: String,
columns: String,
primaryKeys: Seq[String],
primaryOrBucketKeys: Seq[String],
partitionKeys: Seq[String] = Seq.empty,
props: Map[String, String] = Map.empty): Unit = {
val newProps: mutable.Map[String, String] =
mutable.Map.empty[String, String] ++ Map("bucket" -> bucket.toString) ++ props
appendPrimaryKey(primaryKeys, newProps)
createTable0(tableName, columns, partitionKeys, newProps.toMap)
extraProps: Map[String, String] = Map.empty): Unit = {
createTable0(
tableName,
columns,
partitionKeys,
initProps(primaryOrBucketKeys, partitionKeys) ++ extraProps)
}

private def createTable0(
Expand Down Expand Up @@ -72,35 +75,38 @@ trait PaimonNonBucketedTable {
val bucket: Int = -1
}

trait PaimonPrimaryKeyTable {
def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit = {
assert(primaryKeys.nonEmpty)
props += ("primary-key" -> primaryKeys.mkString(","))
trait PaimonPrimaryKeyTable extends PaimonTableTest {
val isPrimaryKeyTable: Boolean = true

def initProps(
primaryOrBucketKeys: Seq[String],
partitionKeys: Seq[String]): Map[String, String] = {
assert(primaryOrBucketKeys.nonEmpty)
Map("primary-key" -> primaryOrBucketKeys.mkString(","), "bucket" -> bucket.toString)
}
}

trait PaimonAppendTable {
def appendPrimaryKey(primaryKeys: Seq[String], props: mutable.Map[String, String]): Unit = {
// nothing to do
trait PaimonAppendTable extends PaimonTableTest {
val isPrimaryKeyTable: Boolean = false

def initProps(
primaryOrBucketKeys: Seq[String],
partitionKeys: Seq[String]): Map[String, String] = {
if (bucket == -1) {
Map("bucket" -> bucket.toString)
} else {
// Bucket keys should not involved partition keys
val bucketKeys = primaryOrBucketKeys.filterNot(partitionKeys.contains(_))
assert(bucketKeys.nonEmpty)
Map("bucket-key" -> bucketKeys.mkString(","), "bucket" -> bucket.toString)
}
}
}

trait PaimonPrimaryKeyBucketedTableTest
extends PaimonTableTest
with PaimonPrimaryKeyTable
with PaimonBucketedTable

trait PaimonPrimaryKeyNonBucketTableTest
extends PaimonTableTest
with PaimonPrimaryKeyTable
with PaimonNonBucketedTable

trait PaimonAppendBucketedTableTest
extends PaimonTableTest
with PaimonAppendTable
with PaimonBucketedTable

trait PaimonAppendNonBucketTableTest
extends PaimonTableTest
with PaimonAppendTable
with PaimonNonBucketedTable
trait PaimonPrimaryKeyBucketedTableTest extends PaimonPrimaryKeyTable with PaimonBucketedTable

trait PaimonPrimaryKeyNonBucketTableTest extends PaimonPrimaryKeyTable with PaimonNonBucketedTable

trait PaimonAppendBucketedTableTest extends PaimonAppendTable with PaimonBucketedTable

trait PaimonAppendNonBucketTableTest extends PaimonAppendTable with PaimonNonBucketedTable
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,8 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
}

test("Paimon MergeInto: fail in case that maybe update primary key column") {
assume(isPrimaryKeyTable)

withTable("source", "target") {

Seq((101, 10, "c111"), (103, 30, "c333"))
Expand Down Expand Up @@ -536,31 +538,6 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
}
}

test("Paimon MergeInto: not support in table without primary keys") {
withTable("source", "target") {

Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")

spark.sql(s"""
|CREATE TABLE target (a INT, b INT, c STRING)
|""".stripMargin)
spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")

val error = intercept[RuntimeException] {
spark.sql(s"""
|MERGE INTO target
|USING source
|ON target.a = source.a
|WHEN MATCHED THEN
|UPDATE SET a = source.a, b = source.b, c = source.c
|WHEN NOT MATCHED
|THEN INSERT (a, b, c) values (a, b, c)
|""".stripMargin)
}.getMessage
assert(error.contains("Only support to MergeInto table with primary keys."))
}
}

test(s"Paimon MergeInto: update on source eq target condition") {
withTable("source", "target") {
Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")
Expand Down

0 comments on commit 60e82a4

Please sign in to comment.