Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 15, 2024
1 parent 8081b62 commit efe62a7
Showing 1 changed file with 6 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ case class CHShuffledHashJoinExecTransformer(
override def genJoinParameters(): Any = {
val (isBHJ, isNullAwareAntiJoin, buildHashTableId): (Int, Int, String) = (0, 0, "")

// Don't use lef/right directly, they may be reordered in `HashJoinLikeExecTransformer`
val leftStats = getShuffleStageStatistics(streamedPlan)
val rightStats = getShuffleStageStatistics(buildPlan)
// Start with "JoinParameters:"
val joinParametersStr = new StringBuffer("JoinParameters:")
// isBHJ: 0 for SHJ, 1 for BHJ
Expand All @@ -138,14 +135,12 @@ case class CHShuffledHashJoinExecTransformer(
.append("\n")
logicalLink match {
case Some(join: Join) =>
val leftRowCount =
if (needSwitchChildren) join.left.stats.rowCount else join.right.stats.rowCount
val rightRowCount =
if (needSwitchChildren) join.right.stats.rowCount else join.left.stats.rowCount
val leftSizeInBytes =
if (needSwitchChildren) join.left.stats.sizeInBytes else join.right.stats.sizeInBytes
val rightSizeInBytes =
if (needSwitchChildren) join.right.stats.sizeInBytes else join.left.stats.sizeInBytes
val left = if (!needSwitchChildren) join.left else join.right
val right = if (!needSwitchChildren) join.right else join.left
val leftRowCount = left.stats.rowCount
val rightRowCount = right.stats.rowCount
val leftSizeInBytes = left.stats.sizeInBytes
val rightSizeInBytes = right.stats.sizeInBytes
val numPartitions = outputPartitioning.numPartitions
joinParametersStr
.append("leftRowCount=")
Expand All @@ -171,26 +166,6 @@ case class CHShuffledHashJoinExecTransformer(
.build()
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}

private def getShuffleStageStatistics(plan: SparkPlan): ShuffleStageStaticstics = {
plan match {
case queryStage: ShuffleQueryStageExec =>
ShuffleStageStaticstics(
queryStage.shuffle.numPartitions,
queryStage.shuffle.numMappers,
queryStage.getRuntimeStatistics.rowCount)
case shuffle: ColumnarShuffleExchangeExec =>
// FIXEME: We cannot access shuffle.numPartitions and shuffle.numMappers here.
// Otherwise it will cause an exception `ProjectExecTransformer has column support mismatch`
ShuffleStageStaticstics(-1, -1, None)
case _ =>
if (plan.children.length == 1) {
getShuffleStageStatistics(plan.children.head)
} else {
ShuffleStageStaticstics(-1, -1, None)
}
}
}
}

case class CHBroadcastBuildSideRDD(
Expand Down

0 comments on commit efe62a7

Please sign in to comment.