
Commit

update for comment

Zouxxyy committed Feb 2, 2024
1 parent e71716d commit 145fd2d
Showing 5 changed files with 47 additions and 164 deletions.
@@ -17,46 +17,7 @@
 */
package org.apache.paimon.spark.sql

import org.junit.jupiter.api.Assertions

class AnalyzeTableTest extends AnalyzeTableTestBase {

  test("Paimon analyze: spark use col stats") {
    spark.sql(s"""
                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')
                 |""".stripMargin)

    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    val stats = getScanStatistic("SELECT * FROM T")
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
    // spark 3.3 and below does not support col stats for v2 tables
    Assertions.assertEquals(0, stats.attributeStats.size)
  }

  test("Paimon analyze: partition filter push down hit") {
    spark.sql(s"""
                 |CREATE TABLE T (id INT, name STRING, pt INT)
                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
                 |PARTITIONED BY (pt)
                 |""".stripMargin)

    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    // partition push down hit
    var sql = "SELECT * FROM T WHERE pt < 1"
    // spark 3.3 and below does not support col stats for v2 tables
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)

    // partition push down not hit
    sql = "SELECT * FROM T WHERE id < 1"
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)
  }

  override protected def supportsColStats(): Boolean = false
}
@@ -17,46 +17,7 @@
 */
package org.apache.paimon.spark.sql

import org.junit.jupiter.api.Assertions

class AnalyzeTableTest extends AnalyzeTableTestBase {

  test("Paimon analyze: spark use col stats") {
    spark.sql(s"""
                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')
                 |""".stripMargin)

    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    val stats = getScanStatistic("SELECT * FROM T")
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
    // spark 3.3 and below does not support col stats for v2 tables
    Assertions.assertEquals(0, stats.attributeStats.size)
  }

  test("Paimon analyze: partition filter push down hit") {
    spark.sql(s"""
                 |CREATE TABLE T (id INT, name STRING, pt INT)
                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
                 |PARTITIONED BY (pt)
                 |""".stripMargin)

    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    // partition push down hit
    var sql = "SELECT * FROM T WHERE pt < 1"
    // spark 3.3 and below does not support col stats for v2 tables
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)

    // partition push down not hit
    sql = "SELECT * FROM T WHERE id < 1"
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)
  }

  override protected def supportsColStats(): Boolean = false
}
@@ -17,45 +17,4 @@
 */
package org.apache.paimon.spark.sql

import org.junit.jupiter.api.Assertions

class AnalyzeTableTest extends AnalyzeTableTestBase {

  test("Paimon analyze: spark use col stats") {
    spark.sql(s"""
                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')
                 |""".stripMargin)

    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    val stats = getScanStatistic("SELECT * FROM T")
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
    Assertions.assertEquals(4, stats.attributeStats.size)
  }

  test("Paimon analyze: partition filter push down hit") {
    spark.sql(s"""
                 |CREATE TABLE T (id INT, name STRING, pt INT)
                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
                 |PARTITIONED BY (pt)
                 |""".stripMargin)

    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    // Paimon reserves the partition filter instead of returning it to Spark, so we need to ensure stats are filtered correctly.
    // partition push down hit
    var sql = "SELECT * FROM T WHERE pt < 1"
    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)

    // partition push down not hit
    sql = "SELECT * FROM T WHERE id < 1"
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)
  }
}

class AnalyzeTableTest extends AnalyzeTableTestBase {}
@@ -17,45 +17,4 @@
 */
package org.apache.paimon.spark.sql

import org.junit.jupiter.api.Assertions

class AnalyzeTableTest extends AnalyzeTableTestBase {

  test("Paimon analyze: spark use col stats") {
    spark.sql(s"""
                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')
                 |""".stripMargin)

    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    val stats = getScanStatistic("SELECT * FROM T")
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
    Assertions.assertEquals(4, stats.attributeStats.size)
  }

  test("Paimon analyze: partition filter push down hit") {
    spark.sql(s"""
                 |CREATE TABLE T (id INT, name STRING, pt INT)
                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
                 |PARTITIONED BY (pt)
                 |""".stripMargin)

    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    // Paimon reserves the partition filter instead of returning it to Spark, so we need to ensure stats are filtered correctly.
    // partition push down hit
    var sql = "SELECT * FROM T WHERE pt < 1"
    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)

    // partition push down not hit
    sql = "SELECT * FROM T WHERE id < 1"
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)
  }
}

class AnalyzeTableTest extends AnalyzeTableTestBase {}
@@ -298,6 +298,46 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
  }

  test("Paimon analyze: spark use col stats") {
    spark.sql(s"""
                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')
                 |""".stripMargin)

    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    val stats = getScanStatistic("SELECT * FROM T")
    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
    Assertions.assertEquals(if (supportsColStats()) 4 else 0, stats.attributeStats.size)
  }

  test("Paimon analyze: partition filter push down hit") {
    spark.sql(s"""
                 |CREATE TABLE T (id INT, name STRING, pt INT)
                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
                 |PARTITIONED BY (pt)
                 |""".stripMargin)

    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

    // Paimon reserves the partition filter instead of returning it to Spark, so we need to ensure stats are filtered correctly.
    // partition push down hit
    var sql = "SELECT * FROM T WHERE pt < 1"
    Assertions.assertEquals(
      if (supportsColStats()) 0L else 4L,
      getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)

    // partition push down not hit
    sql = "SELECT * FROM T WHERE id < 1"
    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
    checkAnswer(spark.sql(sql), Nil)
  }

  protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
    fileIO.listStatus(new Path(tableLocation, "statistics")).length
  }
@@ -311,4 +351,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
      .get
    relation.computeStats()
  }

  /** Spark supports using col stats for v2 tables since 3.4. */
  protected def supportsColStats(): Boolean = true
}
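
Taken together, the commit deduplicates the two tests into AnalyzeTableTestBase and parameterizes them through the protected supportsColStats() hook, which only the pre-3.4 Spark modules override. A minimal, self-contained sketch of this template-method pattern follows; the class and method names (VersionedSuiteBase, Spark33Suite, Spark34Suite, expectedColStatCount, HookDemo) are illustrative only and not part of the commit:

// Illustrative sketch (not from the commit): shared logic lives in a base
// class and branches on a protected flag; version modules override the flag.
abstract class VersionedSuiteBase {
  // Default matches the newest Spark modules (3.4+ supports v2 col stats).
  protected def supportsColStats(): Boolean = true

  // Shared assertion logic branches on the hook instead of being duplicated.
  def expectedColStatCount(columns: Int): Int =
    if (supportsColStats()) columns else 0
}

// Hypothetical module for Spark 3.4+: inherits the default, no override needed.
class Spark34Suite extends VersionedSuiteBase

// Hypothetical module for Spark 3.3 and below: opts out of col-stat checks.
class Spark33Suite extends VersionedSuiteBase {
  override protected def supportsColStats(): Boolean = false
}

object HookDemo extends App {
  // For a 4-column table, the 3.4+ suite expects 4 column stats, older ones 0.
  println(new Spark34Suite().expectedColStatCount(4)) // 4
  println(new Spark33Suite().expectedColStatCount(4)) // 0
}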
