SPARK-50386 - Improve SparkFatalException Propagation when OutOfMemoryError occurs on BroadcastExchangeExec building small table to broadcast
erenavsarogullari committed Nov 21, 2024
1 parent b61411d commit d6d1653
Showing 2 changed files with 86 additions and 14 deletions.
@@ -2067,7 +2067,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
"autoBroadcastJoinThreshold" -> SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key,
"driverMemory" -> SparkLauncher.DRIVER_MEMORY,
"analyzeTblMsg" -> analyzeTblMsg),
cause = oe.getCause)
cause = oe)
}

def executeCodePathUnsupportedError(execName: String): SparkUnsupportedOperationException = {
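
The essence of the change: a raw OutOfMemoryError usually has no cause of its own, so passing oe.getCause left the wrapping SparkException with a null cause and dropped the original error, while passing oe preserves it. A minimal self-contained sketch of the difference, with RuntimeException standing in for the Spark-internal SparkException:

// Sketch only: RuntimeException stands in for the SparkException built by
// QueryExecutionErrors.notEnoughMemoryToBuildAndBroadcastTableError.
object CausePropagationSketch extends App {
  val oe = new OutOfMemoryError("Java heap space")

  // Before the fix (cause = oe.getCause): the OOM has no cause, so the
  // wrapper's cause was null and the original error was unreachable.
  val before = new RuntimeException("Not enough memory to broadcast", oe.getCause)
  assert(before.getCause == null)

  // After the fix (cause = oe): the OutOfMemoryError itself is preserved.
  val after = new RuntimeException("Not enough memory to broadcast", oe)
  assert(after.getCause eq oe)
  assert(after.getCause.getMessage == "Java heap space")
}
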
@@ -926,6 +926,9 @@ class AdaptiveQueryExecSuite
})

try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new RuntimeException("coalesce test error")
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -3086,6 +3089,69 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-50386: Check SparkFatalException message details when OutOfMemoryError " +
"occurs on BroadcastExchange execution") {
withTempView("t1", "t2") {
try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new OutOfMemoryError("Java heap space")
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val joinedDF = createJoinedDF()

val error = intercept[SparkException] {
joinedDF.collect()
}

// error.getMessage was already non-null (default behavior)
assert(error.getMessage contains
"Not enough memory to build and broadcast the table to all worker nodes.")
// error.getCause was null before this change (default behavior)
assert(error.getCause != null, "SparkException's cause property must be non-null")
assert(error.getCause.toString == "java.lang.OutOfMemoryError: Java heap space")
assert(error.getCause.getMessage == "Java heap space")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
}

test("SPARK-50386: Check SparkFatalException message details when InterruptedException " +
"occurs on BroadcastExchange execution") {
withTempView("t1", "t2") {
try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new InterruptedException()
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val joinedDF = createJoinedDF()
val error = intercept[java.util.concurrent.ExecutionException] {
joinedDF.collect()
}
val errorMessage =
"org.apache.spark.util.SparkFatalException: java.lang.InterruptedException"
// All of the following properties were already non-null (default behavior)
assert(error.getMessage == errorMessage)
assert(error.getCause != null, "SparkFatalException's cause must be non-null")
assert(error.getCause.toString == errorMessage)
assert(error.getCause.getMessage == "java.lang.InterruptedException")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
}
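
The two tests surface the failure through different wrappers: with adaptive execution enabled the caller gets a SparkException whose cause chain now bottoms out at the original OutOfMemoryError, while with it disabled the caller gets a java.util.concurrent.ExecutionException wrapping SparkFatalException. A caller-side sketch (the rootCause helper is hypothetical, not part of this change) for walking either chain:

import scala.annotation.tailrec

object CauseChainSketch {
  // Hypothetical helper: follows getCause links to the innermost throwable,
  // guarding against self-referential causes.
  @tailrec
  def rootCause(t: Throwable): Throwable =
    if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)
}

Here rootCause(error) reaches the InterruptedException, and, with the fix above, it reaches the original OutOfMemoryError in the preceding test rather than stopping at a wrapper with a null cause.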

private def createJoinedDF(): DataFrame = {
spark.range(10).toDF("col1").createTempView("t1")
spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
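// coalesce(2) plans a Repartition(shuffle = false) node on t2, which the test
// strategy replaces with TestProblematicCoalesceExec; the BROADCAST hint makes
// t2 the build side, so the failure hits the broadcast exchange.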
sql("SELECT /*+ BROADCAST(t2) */ * FROM t1 INNER JOIN t2 ON t1.col1 = t2.col2;")
}

}

/**
@@ -3109,25 +3175,31 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
/**
* Helps to simulate ExchangeQueryStageExec materialization failure.
*/
private object TestProblematicCoalesceStrategy extends Strategy {
private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { _ =>
throw new RuntimeException("coalesce test error")
}
}
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
copy(child = newChild)
}
private trait BaseTestProblematicCoalesceStrategy extends Strategy {

val throwable: Throwable

override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case org.apache.spark.sql.catalyst.plans.logical.Repartition(
numPartitions, false, child) =>
TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
TestProblematicCoalesceExec(numPartitions, planLater(child), throwable) :: Nil
case _ => Nil
}
}
}

private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan, throwable: Throwable)
extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] = {
throwable match {
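// A RuntimeException is thrown lazily inside the task, so it surfaces while
// partitions are being processed (preserving the original coalesce test behavior).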
case _: RuntimeException => child.execute().mapPartitions { _ =>
throw throwable
}
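// Any other throwable (e.g. OutOfMemoryError, InterruptedException) is thrown
// eagerly from doExecute, so it surfaces on the driver while the broadcast
// exchange materializes this child plan.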
case _ => throw throwable
}
}
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
copy(child = newChild)
}
