From f950f9afb5a06b915770131f801b6cf22d343f69 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Mon, 3 Jun 2024 16:10:35 +0800
Subject: [PATCH] [spark] Evaluate partition-only subqueries in DELETE conditions in advance

---
 .../EvalSubqueriesForDeleteTable.scala   | 111 ++++++++++++++++++
 .../PaimonSparkSessionExtensions.scala   |   9 +-
 .../sql/PaimonOptimizationTestBase.scala |  74 ++++++++++++
 3 files changed, 190 insertions(+), 4 deletions(-)
 create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
new file mode 100644
index 000000000000..14832042eeff
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.optimizer
+
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{execution, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.types.BooleanType
+
+import scala.collection.JavaConverters._
+
+/**
+ * For delete conditions containing subqueries that involve only partition columns, evaluate the
+ * subqueries in advance, so that [[DeleteFromPaimonTableCommand]] can directly call dropPartitions
+ * and achieve fast deletion.
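+ *
+ * For example (table and column names here are only illustrative), with a table partitioned by
+ * `pt`, a condition such as
+ * {{{
+ *   DELETE FROM t WHERE pt IN (SELECT id FROM dim WHERE flag = 1)
+ * }}}
+ * is rewritten into a plain `pt IN (<literals>)` predicate before the command runs.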
+ */
+object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {
+
+  lazy val spark: SparkSession = SparkSession.active
+  lazy val resolver: Resolver = spark.sessionState.conf.resolver
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDown {
+      case d @ DeleteFromPaimonTableCommand(_, table, condition)
+          if SubqueryExpression.hasSubquery(condition) &&
+            isPredicatePartitionColumnsOnly(condition, table.partitionKeys().asScala, resolver) =>
+        // Fall back to the original command if subquery evaluation fails for any reason.
+        try {
+          d.copy(condition = evalSubquery(condition))
+        } catch {
+          case e: Throwable =>
+            logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed: ${e.getMessage}")
+            d
+        }
+    }
+  }
+
+  private def evalSubquery(condition: Expression): Expression = {
+    condition.transformDown {
+      // Execute the IN-subquery's plan and inline its result as a literal IN list.
+      case InSubquery(values, listQuery) =>
+        val expr = if (values.length == 1) {
+          values.head
+        } else {
+          throw new RuntimeException("InSubquery with multiple values is not supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, listQuery.plan)
+        val physicalSubquery = execution.InSubqueryExec(
+          expr,
+          execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", executedPlan),
+          listQuery.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        physicalSubquery.values() match {
+          case Some(l) if l.nonEmpty => In(expr, l.map(Literal(_, expr.dataType)))
+          case _ => Literal(false, BooleanType)
+        }
+
+      // Execute the uncorrelated scalar subquery and inline its single result value.
+      case s: ScalarSubquery =>
+        if (s.isCorrelated) {
+          throw new RuntimeException("Correlated ScalarSubquery is not supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
+        val physicalSubquery = execution.ScalarSubquery(
+          execution.SubqueryExec
+            .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", executedPlan),
+          s.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        Literal(physicalSubquery.eval(), s.dataType)
+
+      case _: SubqueryExpression =>
+        throw new RuntimeException("Only InSubquery and ScalarSubquery are supported")
+    }
+  }
+
+  // Evaluate physicalSubquery in a bottom-up way.
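+  // Nested ExecSubqueryExpressions inside the subquery's plan must have their results filled in
+  // (via updateResult) before the enclosing subquery itself can be evaluated, hence the recursion.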
+  private def evalPhysicalSubquery(subquery: ExecSubqueryExpression): Unit = {
+    subquery.plan.foreachUp {
+      plan =>
+        plan.expressions.foreach(_.foreachUp {
+          case s: ExecSubqueryExpression => evalPhysicalSubquery(s)
+          case _ =>
+        })
+    }
+    subquery.updateResult()
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 3fc151621948..f770ac506c87 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.extensions
 
 import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueriers}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
 
@@ -52,10 +52,11 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
       PaimonTableValuedFunctions.getTableValueFunctionInjection(fnName))
     }
 
-    // planner extensions
-    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
-
     // optimization rules
+    extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
     extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+
+    // planner extensions
+    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 1f437004537f..6ab248022844 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -112,6 +114,78 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon Optimization: eval subqueries for delete table with ScalarSubquery") {
+    withPk.foreach(
+      hasPk => {
+        val tblProps = if (hasPk) {
+          "TBLPROPERTIES ('primary-key'='id, pt')"
+        } else {
+          ""
+        }
+        withTable("t1", "t2") {
+          spark.sql(s"""
+                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
+                       |$tblProps
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          spark.sql(
+            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")
+
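+          // t2 serves as the helper table for the subqueries below; the min/max of its matching
+          // ids form the partition range that the DELETE is expected to drop.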
+          spark.sql("CREATE TABLE t2 (id INT, n INT)")
+          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+          spark.sql("""DELETE FROM t1 WHERE
+                      |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
+                      |AND
+                      |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+          // For partition-only predicates, the delete is executed as an internal drop-partition,
+          // which produces an OVERWRITE commit.
+          Assertions.assertEquals(
+            CommitKind.OVERWRITE,
+            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+        }
+      })
+  }
+
+  test("Paimon Optimization: eval subqueries for delete table with InSubquery") {
+    withPk.foreach(
+      hasPk => {
+        val tblProps = if (hasPk) {
+          "TBLPROPERTIES ('primary-key'='id, pt')"
+        } else {
+          ""
+        }
+        withTable("t1", "t2") {
+          spark.sql(s"""
+                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
+                       |$tblProps
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          spark.sql(
+            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")
+
+          spark.sql("CREATE TABLE t2 (id INT, n INT)")
+          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+          spark.sql("""DELETE FROM t1 WHERE
+                      |pt IN (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
+                      |OR
+                      |pt IN (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+          // For partition-only predicates, the delete is executed as an internal drop-partition,
+          // which produces an OVERWRITE commit.
+          Assertions.assertEquals(
+            CommitKind.OVERWRITE,
+            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+        }
+      })
+  }
+
   private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
     CTERelationDef(plan, cteIndex, underSubquery = true)
   }