Commit f950f9a
Zouxxyy committed Jun 3, 2024
1 parent ee1d541 commit f950f9a
Showing 3 changed files with 190 additions and 4 deletions.

EvalSubqueriesForDeleteTable.scala
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark.catalyst.optimizer

import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{execution, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
import org.apache.spark.sql.types.BooleanType

import scala.collection.JavaConverters._

/**
 * For delete conditions whose subqueries reference only partition columns, we can evaluate the
 * subqueries in advance, so that when running [[DeleteFromPaimonTableCommand]] we can directly
 * call dropPartitions to achieve fast deletion.
 */
object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {

  lazy val spark: SparkSession = SparkSession.active
  lazy val resolver: Resolver = spark.sessionState.conf.resolver

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformDown {
      case d @ DeleteFromPaimonTableCommand(_, table, condition)
          if SubqueryExpression.hasSubquery(condition) &&
            isPredicatePartitionColumnsOnly(condition, table.partitionKeys().asScala, resolver) =>
        try {
          d.copy(condition = evalSubquery(condition))
        } catch {
          case e: Throwable =>
            logInfo(s"Failed to apply EvalSubqueriesForDeleteTable rule: ${e.getMessage}")
            d
        }
    }
  }

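  // Rewrites each supported subquery in the condition to a literal result by
  // eagerly executing its plan; throws for unsupported subquery shapes.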
  private def evalSubquery(condition: Expression): Expression = {
    condition.transformDown {
      case InSubquery(values, listQuery) =>
        val expr = if (values.length == 1) {
          values.head
        } else {
          throw new RuntimeException("InSubquery with multiple values is not supported")
        }

        val executedPlan = QueryExecution.prepareExecutedPlan(spark, listQuery.plan)
        val physicalSubquery = execution.InSubqueryExec(
          expr,
          execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", executedPlan),
          listQuery.exprId)
        evalPhysicalSubquery(physicalSubquery)

        physicalSubquery.values() match {
          case Some(l) if l.nonEmpty => In(expr, l.map(Literal(_, expr.dataType)))
          case _ => Literal(false, BooleanType)
        }

      case s: ScalarSubquery =>
        if (s.isCorrelated) {
          throw new RuntimeException("Correlated ScalarSubquery is not supported")
        }

        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
        val physicalSubquery = execution.ScalarSubquery(
          execution.SubqueryExec
            .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", executedPlan),
          s.exprId)
        evalPhysicalSubquery(physicalSubquery)

        Literal(physicalSubquery.eval(), s.dataType)

      case _: SubqueryExpression =>
        throw new RuntimeException("Only InSubquery and ScalarSubquery are supported")
    }
  }

  // Evaluate the physical subquery in a bottom-up way: nested subqueries are
  // evaluated recursively before the outer subquery's result is updated.
  private def evalPhysicalSubquery(subquery: ExecSubqueryExpression): Unit = {
    subquery.plan.foreachUp {
      plan =>
        plan.expressions.foreach(_.foreachUp {
          case s: ExecSubqueryExpression => evalPhysicalSubquery(s)
          case _ =>
        })
    }
    subquery.updateResult()
  }
}
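
For orientation, here is a minimal usage sketch (not part of the commit; it assumes a SparkSession with the Paimon catalog and these session extensions configured, and it mirrors the tests added below). With the rule injected, a partition-only DELETE whose subqueries can be pre-evaluated is served by dropping partitions instead of rewriting data files:

// Hedged sketch only; table names are illustrative.
spark.sql("CREATE TABLE t1 (id INT, name STRING, pt INT) PARTITIONED BY (pt)")
spark.sql("CREATE TABLE t2 (id INT, n INT)")
spark.sql("INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2)")
spark.sql("INSERT INTO t2 VALUES (2, 2), (3, 3)")

// The subquery references only t1's partition column `pt`, so the rule
// folds it to `pt IN (2, 3)` before DeleteFromPaimonTableCommand runs;
// the delete then drops partitions pt=2 and pt=3 without rewriting files.
spark.sql("DELETE FROM t1 WHERE pt IN (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)")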

PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.extensions
 
 import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueriers}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
 
@@ -52,10 +52,11 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
           PaimonTableValuedFunctions.getTableValueFunctionInjection(fnName))
     }
 
+    // planner extensions
+    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
+
     // optimization rules
+    extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
     extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
-
-    // planner extensions
-    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
   }
 }
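
As a usage note (a minimal sketch, not part of this commit): the injected rules take effect only when the Paimon session extensions are registered with the SparkSession, e.g.:

// Hedged sketch: register the Paimon extensions so that the injected
// optimizer rules, including EvalSubqueriesForDeleteTable, are applied.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(
    "spark.sql.extensions",
    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .getOrCreate()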

PaimonOptimizationTestBase.scala
@@ -18,9 +18,11 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -112,6 +114,78 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
    }
  }

  test(s"Paimon Optimization: eval subqueries for delete table with ScalarSubquery") {
    withPk.foreach(
      hasPk => {
        val tblProps = if (hasPk) {
          s"TBLPROPERTIES ('primary-key'='id, pt')"
        } else {
          ""
        }
        withTable("t1", "t2") {
          spark.sql(s"""
                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
                       |$tblProps
                       |PARTITIONED BY (pt)
                       |""".stripMargin)
          spark.sql(
            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")

          spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")

          spark.sql(s"""DELETE FROM t1 WHERE
                       |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
                       |AND
                       |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
          // For partition-only predicates, drop partition is called internally.
          Assertions.assertEquals(
            CommitKind.OVERWRITE,
            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())

          checkAnswer(
            spark.sql("SELECT * FROM t1 ORDER BY id"),
            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
        }
      })
  }

  test(s"Paimon Optimization: eval subqueries for delete table with InSubquery") {
    withPk.foreach(
      hasPk => {
        val tblProps = if (hasPk) {
          s"TBLPROPERTIES ('primary-key'='id, pt')"
        } else {
          ""
        }
        withTable("t1", "t2") {
          spark.sql(s"""
                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
                       |$tblProps
                       |PARTITIONED BY (pt)
                       |""".stripMargin)
          spark.sql(
            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")

          spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")

          spark.sql(s"""DELETE FROM t1 WHERE
                       |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
                       |OR
                       |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
          // For partition-only predicates, drop partition is called internally.
          Assertions.assertEquals(
            CommitKind.OVERWRITE,
            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())

          checkAnswer(
            spark.sql("SELECT * FROM t1 ORDER BY id"),
            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
        }
      })
  }

  private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
    CTERelationDef(plan, cteIndex, underSubquery = true)
  }