From 2d711251dac83254885c5b2fd8a15cb0dca10f1f Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Thu, 23 Nov 2023 18:11:02 +0800
Subject: [PATCH] update

---
Reviewer note (text in this section is not part of the commit message):
  * PaimonSparkTestBase: drops the `explain(sql)` helper that returned the
    EXPLAIN output as a single string.
  * SparkPushDownFilterTest: deleted; it matched regexes such as
    "Filter.*pt.*=.*p1" against that EXPLAIN string.
  * SparkPushDownTest: added; it runs the same queries but inspects
    `queryExecution.optimizedPlan` for `Filter` nodes instead of matching
    text, and its `checkAnswer` queries no longer carry `ORDER BY id`.

 .../paimon/spark/PaimonSparkTestBase.scala    |   3 -
 .../spark/SparkPushDownFilterTest.scala       |  84 --------------
 .../paimon/spark/SparkPushDownTest.scala      | 105 ++++++++++++++++++
 3 files changed, 105 insertions(+), 87 deletions(-)
 delete mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownFilterTest.scala
 create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala

diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 429ee068779c..85fc7ac43ca3 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -91,7 +91,4 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab
     catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
   }
 
-  def explain(sql: String): String = {
-    spark.sql(s"EXPLAIN $sql").collect().map(_.mkString).mkString
-  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownFilterTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownFilterTest.scala
deleted file mode 100644
index db5a2400df91..000000000000
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownFilterTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.paimon.spark
-
-import org.apache.spark.sql.Row
-import org.assertj.core.api.Assertions.assertThat
-
-class SparkPushDownFilterTest extends PaimonSparkTestBase {
-
-  test(s"Paimon push down filter: apply partition filter push down with non-partitioned table") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id INT, name STRING, pt STRING)
-                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
-                 |""".stripMargin)
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
-    assertThat(
-      "Filter.*pt.*=.*p1".r
-        .findFirstIn(explain("SELECT * FROM T WHERE pt = 'p1'"))
-        .isDefined).isTrue
-  }
-
-  test(s"Paimon push down filter: apply partition filter push down with partitioned table") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id INT, name STRING, pt STRING)
-                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
-                 |PARTITIONED BY (pt)
-                 |""".stripMargin)
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2'), (4, 'd', 'p3')")
-
-    assertThat(
-      "Filter.*id.*=.*1".r.findFirstIn(explain("SELECT * FROM T WHERE id = '1'")).isDefined).isTrue
-    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1' ORDER BY id"), Row(1, "a", "p1") :: Nil)
-
-    assertThat(
-      "Filter.*pt.*=.*p1".r
-        .findFirstIn(explain("SELECT * FROM T WHERE pt = 'p1'"))
-        .isDefined).isFalse
-    checkAnswer(
-      spark.sql("SELECT * FROM T WHERE pt = 'p1' ORDER BY id"),
-      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
-
-    assertThat(
-      "Filter.*pt.*=.*p1".r
-        .findFirstIn(explain("SELECT * FROM T WHERE id = '1' and pt = 'p1'"))
-        .isDefined).isFalse
-    checkAnswer(
-      spark.sql("SELECT * FROM T WHERE id = '1' and pt = 'p1' ORDER BY id"),
-      Row(1, "a", "p1") :: Nil)
-
-    assertThat(
-      "Filter.*pt.*=.*p1".r
-        .findFirstIn(explain("SELECT * FROM T WHERE id = '1' or pt = 'p1'"))
-        .isDefined).isTrue
-    checkAnswer(
-      spark.sql("SELECT * FROM T WHERE id = '1' or pt = 'p1' ORDER BY id"),
-      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
-
-    assertThat(
-      "Filter.*pt.*<.*p3".r
-        .findFirstIn(explain("SELECT * FROM T WHERE pt < 'p3'"))
-        .isDefined).isFalse
-    checkAnswer(
-      spark.sql("SELECT * FROM T WHERE pt < 'p3' ORDER BY id"),
-      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
-  }
-
-}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
new file mode 100644
index 000000000000..b9f75ba4dfc9
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal, Or}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.assertj.core.api.Assertions.assertThat
+
+class SparkPushDownTest extends PaimonSparkTestBase {
+
+  test(s"Paimon push down: apply partition filter push down with non-partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {
+      case Filter(c: Expression, _) =>
+        c.exists {
+          case EqualTo(a: AttributeReference, l: Literal) =>
+            a.name.equals("pt") && l.value.toString.equals("p1")
+          case _ => false
+        }
+      case _ => false
+    }).isTrue
+  }
+
+  test(s"Paimon push down: apply partition filter push down with partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2'), (4, 'd', 'p3')")
+
+    // partition filter push down did not hit
+    assertThat(spark.sql("SELECT * FROM T WHERE id = '1'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isTrue
+    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1'"), Row(1, "a", "p1") :: Nil)
+
+    assertThat(
+      spark.sql("SELECT * FROM T WHERE id = '1' or pt = 'p1'").queryExecution.optimizedPlan.exists {
+        case Filter(_: Or, _) => true
+        case _ => false
+      }).isTrue
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE id = '1' or pt = 'p1'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    // partition filter push down hit
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isFalse
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt = 'p1'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    assertThat(
+      spark
+        .sql("SELECT * FROM T WHERE id = '1' and pt = 'p1'")
+        .queryExecution
+        .optimizedPlan
+        .exists {
+          case Filter(c: Expression, _) =>
+            c.exists {
+              case EqualTo(a: AttributeReference, l: Literal) =>
+                a.name.equals("pt") && l.value.toString.equals("p1")
+              case _ => false
+            }
+          case _ => false
+        }).isFalse
+    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1' and pt = 'p1'"), Row(1, "a", "p1") :: Nil)
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt < 'p3'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isFalse
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt < 'p3'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
+  }
+
+}