Skip to content

Commit

Permalink
sort system table query result
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 10, 2024
1 parent 77c7dec commit 390c77f
Showing 1 changed file with 60 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
import org.apache.spark.sql.Row

import scala.jdk.CollectionConverters._

/** paimon spark test. */
class AskwangPaimonSparkTest extends PaimonSparkTestBase {
test(s"Partition for partitioned table: tmp") {
test(s"custom pk and bucket prop") {
val hasPk = true
val bucket = 4
val prop =
Expand Down Expand Up @@ -94,6 +95,64 @@ class AskwangPaimonSparkTest extends PaimonSparkTestBase {
}
}

test("system table: sort tags table") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
|""".stripMargin)

spark.sql(s"INSERT INTO T VALUES(1, 'a')")

spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-02')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-01')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-04')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-03')")

checkAnswer(
spark.sql("select tag_name from `T$tags`"),
Row("2024-10-01") :: Row("2024-10-02") :: Row("2024-10-03") :: Row("2024-10-04") :: Nil)
}

test("system table: sort partitions table") {
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING,dt STRING,hh STRING)
|PARTITIONED BY (dt, hh)
|TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '23')")
spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '12')")
spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-09', '02')")
spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-09', '01')")

checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(5) :: Nil)
checkAnswer(
spark.sql("select partition from `T$partitions`"),
Row("[2024-10-09, 01]") :: Row("[2024-10-09, 02]") :: Row("[2024-10-10, 01]") :: Row(
"[2024-10-10, 12]") :: Row("[2024-10-10, 23]") :: Nil
)
}

test("system table: sort buckets table") {
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING,dt STRING,hh STRING)
|PARTITIONED BY (dt, hh)
|TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-10', '01')")

checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(1) :: Nil)
checkAnswer(
spark.sql("select partition,bucket from `T$buckets`"),
Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil)
}

test("tmp: xxx") {
println("version: " + sparkVersion)
}
Expand Down

0 comments on commit 390c77f

Please sign in to comment.