[spark] Add EvalSubqueriesForDeleteTable for quick delete with subqueries #3464

Merged (4 commits) on Jun 4, 2024

File: ExpressionHelper.scala
@@ -128,6 +128,7 @@ trait ExpressionHelper extends PredicateHelper {
      partitionColumns: Seq[String],
      resolver: Resolver
  ): Boolean = {
+   condition.references.nonEmpty &&
      condition.references.forall(r => partitionColumns.exists(resolver(r.name, _)))
  }
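
The added guard matters because forall over an empty reference set is vacuously true: a condition with no column references at all (for example, one already folded to a literal by the new rule below) would otherwise be misclassified as partition-only. A minimal Scala illustration:

    // forall on an empty collection is vacuously true
    Seq.empty[String].forall(_ => false) // evaluates to true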

New file: EvalSubqueriesForDeleteTable.scala
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.optimizer

import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{execution, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
import org.apache.spark.sql.types.BooleanType

import scala.collection.JavaConverters._

/**
 * For delete conditions with subqueries that reference only partition columns, we can evaluate
 * the subqueries in advance, so that when [[DeleteFromPaimonTableCommand]] runs it can call
 * dropPartitions directly and achieve fast deletion.
 *
 * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], because
 * [[MergePaimonScalarSubqueriers]] merges subqueries, and this rule needs to see them
 * individually.
 */
object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {

  lazy val spark: SparkSession = SparkSession.active
  lazy val resolver: Resolver = spark.sessionState.conf.resolver

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformDown {
      case d @ DeleteFromPaimonTableCommand(_, table, condition)
          if SubqueryExpression.hasSubquery(condition) &&
[Review comment from a Contributor on the guard below]: Should we consider the case where the condition contains both a partition predicate with a subquery and other data predicates? I think taking this case into account may reduce the splits we have to scan.
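
As a hypothetical illustration of the reviewer's point: DELETE FROM t1 WHERE pt IN (SELECT id FROM t2) AND name = 'a' is skipped by the guard below because name is not a partition column, even though pre-evaluating the subquery could still narrow the partitions to scan.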

            isPredicatePartitionColumnsOnly(condition, table.partitionKeys().asScala, resolver) =>
        // Best-effort: if subquery pre-evaluation fails for any reason, fall back to
        // the original command unchanged.
        try {
          d.copy(condition = evalSubquery(condition))
        } catch {
          case e: Throwable =>
            logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed for: ${e.getMessage}")
            d
        }
    }
  }

  private def evalSubquery(condition: Expression): Expression = {
    condition.transformDown {
      case InSubquery(values, listQuery) =>
        val expr = if (values.length == 1) {
          values.head
        } else {
          throw new RuntimeException("InSubquery with multiple values is not supported")
        }
        if (listQuery.isCorrelated) {
          throw new RuntimeException("Correlated InSubquery is not supported")
        }

        val executedPlan = QueryExecution.prepareExecutedPlan(spark, listQuery.plan)
        val physicalSubquery = execution.InSubqueryExec(
          expr,
          execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", executedPlan),
          listQuery.exprId)
        evalPhysicalSubquery(physicalSubquery)

        // Replace the subquery with its materialized result: an IN over literals,
        // or `false` when the subquery returned no rows.
        physicalSubquery.values() match {
          case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, expr.dataType)))
          case _ => Literal(false, BooleanType)
        }

      case s: ScalarSubquery =>
        if (s.isCorrelated) {
          throw new RuntimeException("Correlated ScalarSubquery is not supported")
        }

        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
        val physicalSubquery = execution.ScalarSubquery(
          execution.SubqueryExec
            .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", executedPlan),
          s.exprId)
        evalPhysicalSubquery(physicalSubquery)

        // Replace the scalar subquery with its single evaluated value.
        Literal(physicalSubquery.eval(), s.dataType)

      case _: SubqueryExpression =>
        throw new RuntimeException("Only InSubquery and ScalarSubquery are supported")
    }
  }

  // Evaluate the physical subquery bottom-up: any nested subqueries appearing in the
  // plan's expressions are materialized first, then this subquery's result is updated.
  private def evalPhysicalSubquery(subquery: ExecSubqueryExpression): Unit = {
    subquery.plan.foreachUp {
      plan =>
        plan.expressions.foreach(_.foreachUp {
          case s: ExecSubqueryExpression => evalPhysicalSubquery(s)
          case _ =>
        })
    }
    subquery.updateResult()
  }
}
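
To make the rule's effect concrete, a sketch of the intended rewrite (using the t1/t2 tables defined in the tests below, where pt is t1's partition column):

    // Before optimization the DELETE condition still contains an InSubquery:
    spark.sql("DELETE FROM t1 WHERE pt IN (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)")
    // EvalSubqueriesForDeleteTable executes the subquery at optimization time and
    // rewrites the condition to the literal predicate pt IN (2, 3). Since that
    // predicate references only partition columns, DeleteFromPaimonTableCommand
    // can drop partitions pt=2 and pt=3 instead of rewriting data files.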
File: PaimonSparkSessionExtensions.scala

@@ -19,7 +19,7 @@
package org.apache.paimon.spark.extensions

import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueriers}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy

@@ -52,10 +52,11 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
        PaimonTableValuedFunctions.getTableValueFunctionInjection(fnName))
    }

-   // planner extensions
-   extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
-
    // optimization rules
+   extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
    extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+
+   // planner extensions
+   extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
  }
}
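
Note the ordering: EvalSubqueriesForDeleteTable is injected before MergePaimonScalarSubqueriers, matching the scaladoc note above. For completeness, these rules only apply to sessions built with the Paimon extension class; a minimal sketch (builder settings are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[1]") // illustrative
      .config(
        "spark.sql.extensions",
        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .getOrCreate()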
File: PaimonOptimizationTestBase.scala

@@ -18,9 +18,11 @@

package org.apache.paimon.spark.sql

+import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers

+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -112,6 +114,94 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
}
}

test(s"Paimon Optimization: eval subqueries for delete table with ScalarSubquery") {
withPk.foreach(
hasPk => {
val tblProps = if (hasPk) {
s"TBLPROPERTIES ('primary-key'='id, pt')"
} else {
""
}
withTable("t1", "t2") {
spark.sql(s"""
|CREATE TABLE t1 (id INT, name STRING, pt INT)
|$tblProps
|PARTITIONED BY (pt)
|""".stripMargin)
spark.sql(
"INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")

spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")

spark.sql(s"""DELETE FROM t1 WHERE
|pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
|AND
|pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
// For partition-only predicates, drop partition is called internally.
Assertions.assertEquals(
CommitKind.OVERWRITE,
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())

checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)

// subquery eval nothing
spark.sql(s"""DELETE FROM t1 WHERE
|pt >= (SELECT min(id) FROM t2 WHERE n > 10)""".stripMargin)

checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
}
})
}
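
Working through the data: min(id) and max(id) over the t2 rows with n BETWEEN 2 AND 3 evaluate to 2 and 3, so the condition folds to pt >= 2 AND pt <= 3 and partitions pt=2 and pt=3 (rows with id 2, 3 and 4) are dropped. In the second DELETE the scalar subquery evaluates over an empty result, yielding a null literal, so the predicate matches no rows and nothing is deleted.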

test(s"Paimon Optimization: eval subqueries for delete table with InSubquery") {
withPk.foreach(
hasPk => {
val tblProps = if (hasPk) {
s"TBLPROPERTIES ('primary-key'='id, pt')"
} else {
""
}
withTable("t1", "t2") {
spark.sql(s"""
|CREATE TABLE t1 (id INT, name STRING, pt INT)
|$tblProps
|PARTITIONED BY (pt)
|""".stripMargin)
spark.sql(
"INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 'd', 3), (5, 'e', 4)")

spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")

spark.sql(s"""DELETE FROM t1 WHERE
|pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
|OR
|pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
// For partition-only predicates, drop partition is called internally.
Assertions.assertEquals(
CommitKind.OVERWRITE,
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())

checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)

// subquery eval nothing
spark.sql(s"""DELETE FROM t1 WHERE
|pt in (SELECT id FROM t2 WHERE n > 10)""".stripMargin)

checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
}
})
}
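
The empty InSubquery case exercises the other fallback in evalSubquery: values() returns no rows, the condition folds to Literal(false), and the DELETE is a no-op, as the final checkAnswer verifies.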

  private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
    CTERelationDef(plan, cteIndex, underSubquery = true)
  }