[newcode] partitionby, insert overwrite test
partitionBy with CustomPartitioner failed ("Task not serializable"); a possible fix is sketched after the third file.
askwang committed Jun 3, 2024
1 parent b35ea91 commit 4e674c2
Showing 3 changed files with 121 additions and 0 deletions.
@@ -0,0 +1,36 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row

class InsertOverwriteTestAskwang extends PaimonSparkTestBase {

  var hasPk: Boolean = true
  var bucket: Int = 1

  test(s"insert overwrite non-partitioned table: hasPk: $hasPk, bucket: $bucket") {
    val prop = if (hasPk) {
      s"'primary-key'='a,b', 'bucket' = '$bucket' "
    } else if (bucket != -1) {
      s"'bucket-key'='a,b', 'bucket' = '$bucket' "
    } else {
      "'write-only'='true'"
    }

    spark.sql(s"""
                 |CREATE TABLE T (a INT, b INT, c STRING)
                 |TBLPROPERTIES ($prop)
                 |""".stripMargin)

    spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
    checkAnswer(
      spark.sql("SELECT * FROM T ORDER BY a, b"),
      Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)

    spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')")
    checkAnswer(
      spark.sql("SELECT * FROM T ORDER BY a, b"),
      Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
  }
}
@@ -0,0 +1,36 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row

class InsertOverwriteTestAskwang extends PaimonSparkTestBase {

  val hasPk = true
  val bucket = 1

  test(s"insert overwrite non-partitioned table: hasPk: $hasPk, bucket: $bucket") {
    val prop = if (hasPk) {
      s"'primary-key'='a,b', 'bucket' = '$bucket' "
    } else if (bucket != -1) {
      s"'bucket-key'='a,b', 'bucket' = '$bucket' "
    } else {
      "'write-only'='true'"
    }

    spark.sql(s"""
                 |CREATE TABLE T (a INT, b INT, c STRING)
                 |TBLPROPERTIES ($prop)
                 |""".stripMargin)

    spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
    checkAnswer(
      spark.sql("SELECT * FROM T ORDER BY a, b"),
      Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)

    spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')")
    checkAnswer(
      spark.sql("SELECT * FROM T ORDER BY a, b"),
      Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
  }
}
@@ -0,0 +1,49 @@
package org.apache.paimon.spark


import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

class AskwangScalaITCase extends PaimonSparkTestBase {

test("spark partitionBy function") {

val sc = spark.sparkContext
val rdd = sc.makeRDD(Seq((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)

println("------init data-----")
getMapPartitionsResult(rdd).foreach(println)

println("-----after partitionBy-----")
// akwang-todo: 使用 CustomPartitioner 报错 Task not serializable
val partitionedRdd = rdd.partitionBy(new HashPartitioner(4))
getMapPartitionsResult(partitionedRdd).foreach(println)
}

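  // Likely cause of the "Task not serializable" failure noted above: as an inner class,
  // CustomPartitioner keeps an implicit reference to the enclosing test suite, which is
  // not serializable, and Spark tries to serialize that whole object graph when the
  // partitioner is shipped with the shuffle.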
  class CustomPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = key.asInstanceOf[Int] % partitions
  }

  // Collect the data of each RDD partition, keyed by partition name.
  def getMapPartitionsResult(rdd: RDD[(Int, String)]): Array[(String, List[(Int, String)])] = {
    val part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
    rdd.mapPartitionsWithIndex {
      (partIdx, iter) => {
        while (iter.hasNext) {
          val part_name = "part_" + partIdx
          val elem = iter.next()
          if (part_map.contains(part_name)) {
            var elems = part_map(part_name)
            elems ::= elem
            part_map(part_name) = elems
          } else {
            part_map(part_name) = List[(Int, String)](elem)
          }
        }
        part_map.iterator
      }
    }.collect()
  }
}
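
One way to make the custom partitioner usable (a sketch under the assumption above, not part of this commit) is to declare it as a top-level class so that it carries no reference to the test suite; the ModuloPartitioner name is illustrative:

class ModuloPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  // No enclosing-class reference, and Partitioner already extends Serializable,
  // so instances can be shipped to executors.
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] % partitions
}

The test could then call rdd.partitionBy(new ModuloPartitioner(4)) instead of falling back to HashPartitioner.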
