[spark] Apply MergePaimonScalarSubqueries only when paimon scan exists #3783

Merged · 3 commits · Jul 19, 2024

This PR renames the misspelled optimizer rule MergePaimonScalarSubqueriers (and its base trait) to MergePaimonScalarSubqueries, and adds an early exit so the rule only does work when the logical plan actually contains a Paimon scan.
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {

   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {

   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {

   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
@@ -37,8 +37,8 @@ import scala.collection.JavaConverters._
  * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can directly call
  * dropPartitions to achieve fast deletion.
  *
- * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], because
- * [[MergePaimonScalarSubqueriers]] will merge subqueries.
+ * Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because
+ * [[MergePaimonScalarSubqueries]] will merge subqueries.
  */
 object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {

@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

-object MergePaimonScalarSubqueriers extends Rule[LogicalPlan] {
+object MergePaimonScalarSubqueries extends Rule[LogicalPlan] {

   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan
@@ -40,11 +40,11 @@ import scala.collection.mutable.ArrayBuffer
  * reused. So we extend the [[tryMergePlans]] method to check and merge
  * [[DataSourceV2ScanRelation]]s, thus we can merge scalar subqueries for paimon.
  */
-trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateHelper {
+trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
       // Subquery reuse needs to be enabled for this optimization.
-      case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) => plan
+      case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) || !existsPaimonScan(plan) => plan

       // This rule does a whole plan traversal, no need to run on subqueries.
       case _: Subquery => plan

@@ -56,6 +56,13 @@ trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateH
     }
   }

+  private def existsPaimonScan(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case r: DataSourceV2ScanRelation => r.scan.isInstanceOf[PaimonScan]
+      case _ => false
+    }.isDefined
+  }
+
   /**
    * An item in the cache of merged scalar subqueries.
    *
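The guard added above relies on Catalyst's TreeNode.find, which walks the plan tree and short-circuits at the first match, so the check is cheap next to the whole-plan traversal and merge bookkeeping the rule otherwise performs. Below is a minimal sketch of that gating pattern under the same assumptions as the diff (PaimonScan from org.apache.paimon.spark); the object name GatedPaimonRule and its no-op body are hypothetical stand-ins, not the PR's code.

import org.apache.paimon.spark.PaimonScan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Hypothetical rule illustrating the early-exit pattern the diff introduces.
object GatedPaimonRule extends Rule[LogicalPlan] {

  // Same shape as the PR's existsPaimonScan: stop at the first
  // DataSourceV2ScanRelation whose underlying scan is a PaimonScan.
  private def existsPaimonScan(plan: LogicalPlan): Boolean =
    plan.find {
      case r: DataSourceV2ScanRelation => r.scan.isInstanceOf[PaimonScan]
      case _ => false
    }.isDefined

  override def apply(plan: LogicalPlan): LogicalPlan =
    if (!existsPaimonScan(plan)) plan // non-Paimon query: skip all work
    else plan // the real rule would merge scalar subqueries here
}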
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.extensions

 import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueriers}
+import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy

@@ -54,7 +54,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
   // optimization rules
   extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
-  extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+  extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)

   // planner extensions
   extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
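These injections only take effect when the extension class is registered with the Spark session. A minimal sketch of the wiring, using Spark's standard spark.sql.extensions mechanism; the local[2] master is illustrative and catalog configuration is omitted:

import org.apache.spark.sql.SparkSession

// Register the Paimon extensions so EvalSubqueriesForDeleteTable and
// MergePaimonScalarSubqueries are injected into the optimizer.
val spark = SparkSession
  .builder()
  .master("local[2]") // illustrative only
  .config(
    "spark.sql.extensions",
    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .getOrCreate()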
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql

 import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}

@@ -39,7 +39,7 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
   private object Optimize extends RuleExecutor[LogicalPlan] {
     val batches: immutable.Seq[Batch] =
-      Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueriers) :: Nil
+      Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueries) :: Nil
   }

   test("Paimon Optimization: merge scalar subqueries") {
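For intuition about what the test batch exercises, here is a hedged sketch of a query shape the rule targets: two scalar subqueries scanning the same Paimon table, which can be merged so both aggregates come from one shared scan. The table t, its columns, and the CREATE TABLE ... USING paimon syntax are assumptions and presume a session with a Paimon catalog configured.

// Hypothetical table, for illustration only.
spark.sql("CREATE TABLE t (a INT, b INT) USING paimon")
spark.sql("INSERT INTO t VALUES (1, 2), (3, 4)")

// Candidates for merging: after optimization, one subquery computing both
// MAX(a) and SUM(b) can replace the two separate scans of t.
spark.sql("SELECT (SELECT MAX(a) FROM t) AS max_a, (SELECT SUM(b) FROM t) AS sum_b").show()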