Merge branch 'main' into celeborn-0.5.2

yikf authored Nov 27, 2024
2 parents 4d06abd + a2deffc commit e46a930

Showing 9 changed files with 37 additions and 25 deletions.
@@ -21,7 +21,6 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
-import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.PullOutProjectHelper
@@ -212,15 +211,14 @@ case class ColumnarArrowEvalPythonExec(
     child: SparkPlan,
     evalType: Int)
   extends EvalPythonExec
-  with GlutenPlan
-  with KnownChildConvention {
+  with GlutenPlan {
 
   override def batchType(): Convention.BatchType = ArrowJavaBatch
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
   override def requiredChildConvention(): Seq[ConventionReq] = List(
-    ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(ArrowJavaBatch)))
+    ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
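The call-site change above, like the matching ones in ColumnarToColumnarExec and Transitions further down, swaps the verbose ConventionReq.of(...) form for shorthand factories (ConventionReq.ofBatch and ConventionReq.row). A minimal sketch of the presumed equivalence, using stand-in types (RowReq, BatchReq, and the string tags are hypothetical, not Gluten's real API):

object ConventionReqSketch {
  sealed trait RowReq
  object RowReq {
    case object Any extends RowReq
    final case class Is(t: String) extends RowReq
  }
  sealed trait BatchReq
  object BatchReq {
    case object Any extends BatchReq
    final case class Is(t: String) extends BatchReq
  }

  // A requirement a plan node places on its child's output convention.
  final case class Req(row: RowReq, batch: BatchReq)
  object Req {
    def of(row: RowReq, batch: BatchReq): Req = Req(row, batch)
    // Shorthand for "any row type, a specific batch type" (this commit's ofBatch):
    def ofBatch(batch: BatchReq): Req = of(RowReq.Any, batch)
    // Shorthand for "vanilla Spark rows, any batch type" (this commit's row):
    val row: Req = of(RowReq.Is("VanillaRow"), BatchReq.Any)
  }

  def main(args: Array[String]): Unit = {
    // The old verbose calls and the new shorthands build the same requirements.
    assert(Req.ofBatch(BatchReq.Is("ArrowJavaBatch")) ==
      Req.of(RowReq.Any, BatchReq.Is("ArrowJavaBatch")))
    assert(Req.row == Req.of(RowReq.Is("VanillaRow"), BatchReq.Any))
  }
}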
@@ -268,7 +268,7 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
       s"from $LINEITEM_TABLE limit $LENGTH")(checkGlutenOperatorMatch[ProjectExecTransformer])
   }
 
-  ignore("locate") {
+  test("locate") {
     runQueryAndCompare(
       s"select l_orderkey, locate(l_comment, 'a', 1) " +
         s"from $LINEITEM_TABLE limit $LENGTH")(checkGlutenOperatorMatch[ProjectExecTransformer])
@@ -50,8 +50,9 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention.BatchType)
     Convention.RowType.None
   }
 
-  override def requiredChildConvention(): Seq[ConventionReq] = List(
-    ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(from)))
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
+  }
 
   override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -21,23 +21,44 @@ import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
 
 import org.apache.spark.sql.execution.SparkPlan
 
+/**
+ * Base interface for query plans defined by backends.
+ *
+ * The following Spark APIs are marked final and thus cannot be overridden:
+ *   - supportsColumnar
+ *   - supportsRowBased (Spark version >= 3.3)
+ *
+ * Instead, subclasses are expected to implement the following APIs:
+ *   - batchType
+ *   - rowType0
+ *   - requiredChildConvention (optional)
+ *
+ * Once these APIs are implemented, the Gluten query planner is able to find and insert
+ * proper transitions between different plan nodes.
+ *
+ * Implementing `requiredChildConvention` is optional; the default implementation returns
+ * a sequence of convention reqs identical to the output convention. If that does not hold
+ * for some plan type, the API should be overridden. For example, a typical row-to-columnar
+ * transition is itself a query plan node that requires row input yet produces
+ * columnar output.
+ */
 trait GlutenPlan
   extends SparkPlan
   with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
+  with Convention.KnownRowTypeForSpark33OrLater
   with GlutenPlan.SupportsRowBasedCompatible
   with ConventionReq.KnownChildConvention {
 
   final override val supportsColumnar: Boolean = {
     batchType() != Convention.BatchType.None
   }
 
-  override def batchType(): Convention.BatchType
-
   final override val supportsRowBased: Boolean = {
     rowType() != Convention.RowType.None
   }
 
+  override def batchType(): Convention.BatchType
+
   override def rowType0(): Convention.RowType
 
   override def requiredChildConvention(): Seq[ConventionReq] = {
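The new scaladoc spells out the implementor contract: declare output conventions via batchType/rowType0 and optionally constrain children via requiredChildConvention, while supportsColumnar/supportsRowBased stay final and derived. A minimal sketch of a node written against that contract, assuming only the APIs visible in this commit plus standard Spark plumbing (MyColumnarNode itself is hypothetical, not part of Gluten):

import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical backend node; `batch` stands in for a real backend batch type
// such as ArrowJavaBatch above.
case class MyColumnarNode(child: SparkPlan, batch: Convention.BatchType)
  extends UnaryExecNode
  with GlutenPlan {

  // Columnar-only output: GlutenPlan's final supportsColumnar derives to true
  // and supportsRowBased to false from these two declarations.
  override def batchType(): Convention.BatchType = batch
  override def rowType0(): Convention.RowType = Convention.RowType.None

  // Optional override: require the child in the same batch type; the planner
  // inserts a transition when the child's actual convention differs.
  override def requiredChildConvention(): Seq[ConventionReq] =
    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(batch)))

  override def output: Seq[Attribute] = child.output
  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException()
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    child.executeColumnar() // a real node would transform the batches here
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}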
@@ -45,7 +45,7 @@ object GlutenPlanModel {
       constraintSet: PropertySet[SparkPlan])
     extends LeafExecNode
     with Convention.KnownBatchType
-    with Convention.KnownRowTypeForSpark33AndLater
+    with Convention.KnownRowTypeForSpark33OrLater
     with GlutenPlan.SupportsRowBasedCompatible {
     private val req: Conv.Req = constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
 
@@ -157,9 +157,9 @@ object Convention {
     def rowType(): RowType
   }
 
-  trait KnownRowTypeForSpark33AndLater extends KnownRowType {
+  trait KnownRowTypeForSpark33OrLater extends KnownRowType {
     this: SparkPlan =>
-    import KnownRowTypeForSpark33AndLater._
+    import KnownRowTypeForSpark33OrLater._
 
     final override def rowType(): RowType = {
       if (lteSpark32) {
@@ -180,7 +180,7 @@
     def rowType0(): RowType
   }
 
-  object KnownRowTypeForSpark33AndLater {
+  object KnownRowTypeForSpark33OrLater {
     private val lteSpark32: Boolean = {
       val v = SparkVersionUtil.majorMinorVersion()
       SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
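The lteSpark32 flag gates the row-type logic on the running Spark version, since supportsRowBased only exists on Spark 3.3 and later (as the new GlutenPlan scaladoc notes). A toy re-implementation of the (major, minor) comparison, for illustration only; majorMinor and compareMajorMinor here are stand-ins for Gluten's SparkVersionUtil:

object VersionGateSketch {
  // Parse "major.minor[.patch]" into (major, minor); stand-in for
  // SparkVersionUtil.majorMinorVersion(), which reads the running Spark build.
  def majorMinor(version: String): (Int, Int) = {
    val Array(major, minor) = version.split('.').take(2).map(_.toInt)
    (major, minor)
  }

  // Lexicographic compare on (major, minor); stand-in for
  // SparkVersionUtil.compareMajorMinorVersion.
  def compareMajorMinor(a: (Int, Int), b: (Int, Int)): Int =
    if (a._1 != b._1) a._1.compareTo(b._1) else a._2.compareTo(b._2)

  def main(args: Array[String]): Unit = {
    // Spark 3.2.x and below take the legacy path in rowType() above.
    println(compareMajorMinor(majorMinor("3.2.4"), (3, 2)) <= 0) // true
    println(compareMajorMinor(majorMinor("3.3.1"), (3, 2)) <= 0) // false
  }
}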
@@ -90,17 +90,11 @@ object Transitions {
   }
 
   def toRowPlan(plan: SparkPlan): SparkPlan = {
-    enforceReq(
-      plan,
-      ConventionReq.of(
-        ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
-        ConventionReq.BatchType.Any))
+    enforceReq(plan, ConventionReq.row)
   }
 
   def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType): SparkPlan = {
-    enforceReq(
-      plan,
-      ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(toBatchType)))
+    enforceReq(plan, ConventionReq.ofBatch(ConventionReq.BatchType.Is(toBatchType)))
   }
 
   def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
@@ -18,7 +18,6 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
-import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -29,7 +28,6 @@ import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
 
 abstract class ColumnarToRowExecBase(child: SparkPlan)
   extends ColumnarToRowTransition
-  with KnownChildConvention
   with ValidatablePlan {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -177,7 +177,7 @@ case class ColumnarCollapseTransformStages(
 case class ColumnarInputAdapter(child: SparkPlan)
   extends InputAdapterGenerateTreeStringShim
   with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
+  with Convention.KnownRowTypeForSpark33OrLater
   with GlutenPlan.SupportsRowBasedCompatible
   with ConventionReq.KnownChildConvention {
   override def output: Seq[Attribute] = child.output
