From 940723a2f43d4f93210cc60081bbc4ba94129be3 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 18 Jul 2024 22:33:59 +0800 Subject: [PATCH 1/3] 1 --- .../optimizer/MergePaimonScalarSubqueriersBase.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala index 45a086d09221..e9f50e8b4840 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala @@ -44,7 +44,7 @@ trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateH def apply(plan: LogicalPlan): LogicalPlan = { plan match { // Subquery reuse needs to be enabled for this optimization. - case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) => plan + case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) && !containPaimonScan(plan) => plan // This rule does a whole plan traversal, no need to run on subqueries. case _: Subquery => plan @@ -56,6 +56,13 @@ trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateH } } + private def containPaimonScan(plan: LogicalPlan): Boolean = { + plan.find { + case r: DataSourceV2ScanRelation => r.scan.isInstanceOf[PaimonScan] + case _ => false + }.isDefined + } + /** * An item in the cache of merged scalar subqueries. * From 936bd116691882d3ab6cf109e0e49728f35b172f Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 18 Jul 2024 22:40:49 +0800 Subject: [PATCH 2/3] rename --- ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +- ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +- ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +- .../catalyst/optimizer/EvalSubqueriesForDeleteTable.scala | 4 ++-- ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +- ...eriersBase.scala => MergePaimonScalarSubqueriesBase.scala} | 2 +- .../spark/extensions/PaimonSparkSessionExtensions.scala | 4 ++-- .../apache/paimon/spark/sql/PaimonOptimizationTestBase.scala | 4 ++-- 8 files changed, 11 insertions(+), 11 deletions(-) rename paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/{MergePaimonScalarSubqueriers.scala => MergePaimonScalarSubqueries.scala} (97%) rename paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/{MergePaimonScalarSubqueriers.scala => MergePaimonScalarSubqueries.scala} (97%) rename paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/{MergePaimonScalarSubqueriers.scala => MergePaimonScalarSubqueries.scala} (97%) rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/{MergePaimonScalarSubqueriers.scala => MergePaimonScalarSubqueries.scala} (94%) rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/{MergePaimonScalarSubqueriersBase.scala => MergePaimonScalarSubqueriesBase.scala} (99%) diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala similarity index 97% rename from paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala rename to paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala index 8f6ad46719c6..77574a629b45 100644 --- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation -object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase { +object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase { override def tryMergeDataSourceV2ScanRelation( newV2ScanRelation: DataSourceV2ScanRelation, diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala similarity index 97% rename from paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala rename to paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala index 8a948ab8bd4b..c7cd70bb7ff2 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation -object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase { +object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase { override def tryMergeDataSourceV2ScanRelation( newV2ScanRelation: DataSourceV2ScanRelation, diff --git a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala similarity index 97% rename from paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala rename to paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala index 895a4b373cdc..2144f77f3a6c 100644 --- a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala +++ b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation -object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase { +object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase { override def tryMergeDataSourceV2ScanRelation( newV2ScanRelation: DataSourceV2ScanRelation, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala index aaaed10f1c9f..5d264370adcd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala @@ -37,8 +37,8 @@ import scala.collection.JavaConverters._ * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can directly call * dropPartitions to achieve fast deletion. * - * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], because - * [[MergePaimonScalarSubqueriers]] will merge subqueries. + * Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because + * [[MergePaimonScalarSubqueries]] will merge subqueries. */ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala similarity index 94% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala index 0c0a0f9ee2a0..3f6801fe321a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -object MergePaimonScalarSubqueriers extends Rule[LogicalPlan] { +object MergePaimonScalarSubqueries extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala similarity index 99% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala index e9f50e8b4840..b3a3d102d428 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala @@ -40,7 +40,7 @@ import scala.collection.mutable.ArrayBuffer * reused. So we extend the [[tryMergePlans]] method to check and merge * [[DataSourceV2ScanRelation]]s, thus we can merge scalar subqueries for paimon. */ -trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateHelper { +trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = { plan match { // Subquery reuse needs to be enabled for this optimization. diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index f770ac506c87..3cd2783221c9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.extensions import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable} -import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueriers} +import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries} import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions import org.apache.paimon.spark.execution.PaimonStrategy @@ -54,7 +54,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // optimization rules extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable) - extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers) + extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries) // planner extensions extensions.injectPlannerStrategy(spark => PaimonStrategy(spark)) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala index 70cb4b0c422d..78e8905fa969 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.Snapshot.CommitKind import org.apache.paimon.spark.PaimonSparkTestBase -import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers +import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression} @@ -39,7 +39,7 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase { private object Optimize extends RuleExecutor[LogicalPlan] { val batches: immutable.Seq[Batch] = - Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueriers) :: Nil + Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueries) :: Nil } test("Paimon Optimization: merge scalar subqueries") { From 926fc22b7257446f4f7002545290882faaba13dc Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 18 Jul 2024 22:47:24 +0800 Subject: [PATCH 3/3] 1 --- .../catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala index b3a3d102d428..eca8c9cdfced 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala @@ -44,7 +44,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe def apply(plan: LogicalPlan): LogicalPlan = { plan match { // Subquery reuse needs to be enabled for this optimization. - case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) && !containPaimonScan(plan) => plan + case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) && !existsPaimonScan(plan) => plan // This rule does a whole plan traversal, no need to run on subqueries. case _: Subquery => plan @@ -56,7 +56,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe } } - private def containPaimonScan(plan: LogicalPlan): Boolean = { + private def existsPaimonScan(plan: LogicalPlan): Boolean = { plan.find { case r: DataSourceV2ScanRelation => r.scan.isInstanceOf[PaimonScan] case _ => false