Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-6923][CH] total_bytes_written is not updated in celeborn partition writers #6939

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add checks
lgbo-ustc committed Aug 20, 2024
commit e7bbd400b6d4faa81091c3856c5eb5010cc69834
Original file line number Diff line number Diff line change
@@ -407,4 +407,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

def getBroadcastThreshold: Long = {
val conf = SQLConf.get
conf
.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
.getOrElse(conf.autoBroadcastJoinThreshold)
}

}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
@@ -31,7 +32,7 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -539,9 +540,21 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect

val batches = countsAndBytes.map(_._2)
val totalBatchesBytes = batches.map(_.length).sum
// totalBatchesBytes could be larger than the shuffle written bytes, so we double the threshold
// here.
if (
totalBatchesBytes < 0 ||
totalBatchesBytes.toLong > CHBackendSettings.getBroadcastThreshold * 2
) {
throw new GlutenException(
s"Cannot broadcast the table ($totalBatchesBytes) that is larger than threshold:" +
s" ${CHBackendSettings.getBroadcastThreshold}. Ensure the shuffle written" +
s"bytes is collected properly.")
}
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new SparkException(
throw new GlutenException(
s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
}
val rowCount = countsAndBytes.map(_._1).sum