Zouxxyy committed Jul 28, 2024
1 parent 6f4aa51 commit 6f98752
Showing 3 changed files with 65 additions and 11 deletions.
@@ -127,7 +127,9 @@ abstract class PaimonBaseScan(

   override def estimateStatistics(): Statistics = {
     val stats = PaimonStatistics(this)
-    if (reservedFilters.nonEmpty) {
+    // When statistics does not exist, there is no need to perform FilterEstimation on stats,
+    // because in that case stats are already derived from the splits and partition pruning has been done.
+    if (statistics.isPresent && reservedFilters.nonEmpty) {
       filterStatistics(stats, reservedFilters)
     } else {
       stats
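
For readability, here is how the whole method presumably reads after this hunk; the diff cuts off at the else branch, so only the closing braces are filled in here, the rest is taken from the hunk itself:

  override def estimateStatistics(): Statistics = {
    val stats = PaimonStatistics(this)
    // When statistics does not exist, there is no need to perform FilterEstimation on stats,
    // because in that case stats are already derived from the splits and partition pruning has been done.
    if (statistics.isPresent && reservedFilters.nonEmpty) {
      filterStatistics(stats, reservedFilters)
    } else {
      stats
    }
  }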
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.connector.read.Statistics
 import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
@@ -38,9 +39,12 @@ trait StatisticsHelperBase extends SQLConfHelper {

   val requiredStatsSchema: StructType
 
+  private lazy val replacedStatsSchema =
+    CharVarcharUtils.replaceCharVarcharWithStringInSchema(requiredStatsSchema)
+
   def filterStatistics(v2Stats: Statistics, filters: Seq[Filter]): Statistics = {
     val attrs: Seq[AttributeReference] =
-      requiredStatsSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+      replacedStatsSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
     val condition = filterToCondition(filters, attrs)
 
     if (condition.isDefined && v2Stats.numRows().isPresent) {
@@ -57,14 +61,14 @@ trait StatisticsHelperBase extends SQLConfHelper {
       expression =>
         expression.transform {
           case ref: BoundReference =>
-            attrs.find(_.name == requiredStatsSchema(ref.ordinal).name).get
+            attrs.find(_.name == replacedStatsSchema(ref.ordinal).name).get
         }
     }
   }
 
   private def toRef(attr: String): Option[BoundReference] = {
-    val index = requiredStatsSchema.fieldIndex(attr)
-    val field = requiredStatsSchema(index)
+    val index = replacedStatsSchema.fieldIndex(attr)
+    val field = replacedStatsSchema(index)
     Option.apply(BoundReference(index, field.dataType, field.nullable))
   }
 
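
Some background on the new replacedStatsSchema (not part of the patch): Spark represents CHAR(n)/VARCHAR(n) columns as plain StringType internally, so the schema used to build the bound references for FilterEstimation is presumably rewritten to match that representation. A minimal, self-contained sketch of what replaceCharVarcharWithStringInSchema does; the declared schema below is made up for illustration:

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, VarcharType}

// A declared schema containing a VARCHAR column, as a table might define it.
val declared = StructType(Seq(
  StructField("id", IntegerType),
  StructField("pt", VarcharType(10))))

// CHAR(n)/VARCHAR(n) fields are rewritten to StringType, matching Spark's internal
// representation; other fields pass through unchanged.
val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(declared)
// replaced("pt").dataType is now StringType, replaced("id").dataType stays IntegerType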
@@ -95,19 +95,19 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
s"""
|CREATE TABLE T (id STRING, name STRING, byte_col BYTE, short_col SHORT, int_col INT, long_col LONG,
|float_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 5), boolean_col BOOLEAN, date_col DATE,
|timestamp_col TIMESTAMP, binary BINARY)
|timestamp_col TIMESTAMP, binary BINARY, char_col CHAR(20), varchar_col VARCHAR(20))
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id')
|""".stripMargin)

spark.sql(
s"INSERT INTO T VALUES ('1', 'a', 1, 1, 1, 1, 1.0, 1.0, 12.12345, true, cast('2020-01-01' as date), cast('2020-01-01 00:00:00' as timestamp), binary('example binary1'))")
s"INSERT INTO T VALUES ('1', 'a', 1, 1, 1, 1, 1.0, 1.0, 12.12345, true, cast('2020-01-01' as date), cast('2020-01-01 00:00:00' as timestamp), binary('example binary1'), 'a', 'a')")
spark.sql(
s"INSERT INTO T VALUES ('2', 'aaa', 1, null, 1, 1, 1.0, 1.0, 12.12345, true, cast('2020-01-02' as date), cast('2020-01-02 00:00:00' as timestamp), binary('example binary1'))")
s"INSERT INTO T VALUES ('2', 'aaa', 1, null, 1, 1, 1.0, 1.0, 12.12345, true, cast('2020-01-02' as date), cast('2020-01-02 00:00:00' as timestamp), binary('example binary1'), 'aaa', 'aaa')")
spark.sql(
s"INSERT INTO T VALUES ('3', 'bbbb', 2, 1, 1, 1, 1.0, 1.0, 22.12345, true, cast('2020-01-02' as date), cast('2020-01-02 00:00:00' as timestamp), null)")
s"INSERT INTO T VALUES ('3', 'bbbb', 2, 1, 1, 1, 1.0, 1.0, 22.12345, true, cast('2020-01-02' as date), cast('2020-01-02 00:00:00' as timestamp), null, 'bbbb', 'bbbb')")
spark.sql(
s"INSERT INTO T VALUES ('4', 'bbbbbbbb', 2, 2, 2, 2, 2.0, 2.0, 22.12345, false, cast('2020-01-01' as date), cast('2020-01-01 00:00:00' as timestamp), binary('example binary2'))")
s"INSERT INTO T VALUES ('4', 'bbbbbbbb', 2, 2, 2, 2, 2.0, 2.0, 22.12345, false, cast('2020-01-01' as date), cast('2020-01-01 00:00:00' as timestamp), binary('example binary2'), 'bbbbbbbb', 'bbbbbbbb')")

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

@@ -163,9 +163,22 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     Assertions.assertEquals(
       ColStats.newColStats(12, 2, null, null, 1, 15, 15),
       colStats.get("binary"))
+    // From Spark 3.4, the written char col is padded
+    if (gteqSpark3_4) {
+      Assertions.assertEquals(
+        ColStats.newColStats(13, 4, null, null, 0, 20, 20),
+        colStats.get("char_col"))
+    } else {
+      Assertions.assertEquals(
+        ColStats.newColStats(13, 4, null, null, 0, 4, 8),
+        colStats.get("char_col"))
+    }
+    Assertions.assertEquals(
+      ColStats.newColStats(14, 4, null, null, 0, 4, 8),
+      colStats.get("varchar_col"))
 
     spark.sql(
-      s"INSERT INTO T VALUES ('5', 'bbbbbbbbbbbbbbbb', 3, 3, 3, 3, 3.0, 3.0, 32.12345, false, cast('2020-01-03' as date), cast('2020-01-03 00:00:00' as timestamp), binary('binary3'))")
+      s"INSERT INTO T VALUES ('5', 'bbbbbbbbbbbbbbbb', 3, 3, 3, 3, 3.0, 3.0, 32.12345, false, cast('2020-01-03' as date), cast('2020-01-03 00:00:00' as timestamp), binary('binary3'), 'bbbbbbbbbbbbbbbb', 'bbbbbbbbbbbbbbbb')")
 
     spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")

@@ -221,6 +234,18 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     Assertions.assertEquals(
       ColStats.newColStats(12, 3, null, null, 1, 13, 15),
       colStats.get("binary"))
+    if (gteqSpark3_4) {
+      Assertions.assertEquals(
+        ColStats.newColStats(13, 5, null, null, 0, 20, 20),
+        colStats.get("char_col"))
+    } else {
+      Assertions.assertEquals(
+        ColStats.newColStats(13, 5, null, null, 0, 7, 16),
+        colStats.get("char_col"))
+    }
+    Assertions.assertEquals(
+      ColStats.newColStats(14, 5, null, null, 0, 7, 16),
+      colStats.get("varchar_col"))
   }
 
   test("Paimon analyze: analyze unsupported cols") {
@@ -374,6 +399,29 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     checkAnswer(spark.sql(sql), Nil)
   }
 
+  test("Paimon analyze: partition filter push down hit with char/varchar") {
+    Seq("char(10)", "varchar(10)").foreach(
+      partitionType => {
+        withTable("T") {
+          sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt $partitionType)
+                 |TBLPROPERTIES ('primary-key'='id, pt')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+          sql("INSERT INTO T VALUES (1, 'a', '1'), (2, 'b', '1'), (3, 'c', '2'), (4, 'd', '3')")
+          sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
+
+          // For column types such as char/varchar that have no min and max stats, filter estimation on stats has no effect.
+          var sqlText = "SELECT * FROM T WHERE pt < '1'"
+          Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue())
+
+          sqlText = "SELECT id FROM T WHERE pt < '1'"
+          Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue())
+        }
+      })
+  }
+
   protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
     fileIO.listStatus(new Path(tableLocation, "statistics")).length
   }
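
The new test relies on a getScanStatistic helper that is not shown in this diff. Assuming it extracts the statistics reported by the V2 scan from the optimized plan (a guess for illustration, not the repository's actual implementation; spark is the test session provided by the test base), it could look roughly like this:

import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Hypothetical helper: optimize the query and return the statistics reported by the
// V2 scan relation, which for Paimon are produced by estimateStatistics above.
def getScanStatistic(sqlText: String): Statistics = {
  val plan = spark.sql(sqlText).queryExecution.optimizedPlan
  val scan = plan.collectFirst { case r: DataSourceV2ScanRelation => r }.get
  scan.computeStats()
}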
