
Commit

Fix thread contention issue
AdalbertMemSQL committed Jun 12, 2024
1 parent 86c0ea5 commit 770a571
Showing 2 changed files with 15 additions and 6 deletions.
@@ -82,6 +82,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListener
 connectionsMap += (tableName -> conn)
 )
 
+log.info(s"Creating result table '$tableName'")
 try {
 // Create result table
 JdbcHelpers.createResultTable(
@@ -94,9 +95,13 @@
 singleStoreRDDInfo.needsRepartition,
 singleStoreRDDInfo.repartitionColumns
 )
+log.info(s"Successfully created result table '$tableName'")
 } catch {
 // Cancel execution if we failed to create a result table
-case _: SQLException => singleStoreRDDInfo.sc.cancelStage(stageId)
+case e: SQLException => {
+singleStoreRDDInfo.sc.cancelStage(stageId)
+throw e
+}
 }
 })
 }
@@ -117,7 +122,9 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListener
 .get(tableName)
 .foreach(conn => {
 // Drop result table
+log.info(s"Dropping result table '$tableName'")
 JdbcHelpers.dropResultTable(conn, tableName)
+log.info(s"Successfully dropped result table '$tableName'")
 // Close connection
 conn.close()
 // Delete connection from map
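The catch block in the second hunk above is the behavioral change in this file: the stage is still cancelled on a SQLException, but the exception is now re-thrown instead of being swallowed. Below is a minimal, self-contained sketch of that control flow only; cancelStage and createResultTable are hypothetical local stand-ins for SparkContext.cancelStage and JdbcHelpers.createResultTable, so the sketch runs without Spark or a database.

import java.sql.SQLException

object CancelAndRethrowSketch {
  // Hypothetical stand-ins, kept local so the sketch is runnable on its own.
  def cancelStage(stageId: Int): Unit = println(s"cancelling stage $stageId")
  def createResultTable(): Unit = throw new SQLException("simulated DDL failure")

  def main(args: Array[String]): Unit = {
    val stageId = 7
    try {
      println("Creating result table")
      try {
        createResultTable()
        println("Successfully created result table")
      } catch {
        // As in the commit: cancel the stage *and* re-throw, so the failure
        // reaches the caller instead of being silently dropped.
        case e: SQLException =>
          cancelStage(stageId)
          throw e
      }
    } catch {
      case e: SQLException => println(s"caller still sees: ${e.getMessage}")
    }
  }
}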
src/main/scala/com/singlestore/spark/SinglestoreRDD.scala (12 changes: 7 additions & 5 deletions)
@@ -1,8 +1,7 @@
 package com.singlestore.spark
 
-import java.sql.{Connection, PreparedStatement, ResultSet, SQLTransientConnectionException}
-import java.util.concurrent.Executors
-
+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.concurrent.{Executors, ForkJoinPool}
 import com.singlestore.spark.SQLGen.VariableList
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
@@ -54,7 +53,7 @@ case class SinglestoreRDD(query: String,
 override def compute(rawPartition: Partition, context: TaskContext): Iterator[Row] = {
 val multiPartition: SinglestoreMultiPartition =
 rawPartition.asInstanceOf[SinglestoreMultiPartition]
-val threadPool = Executors.newFixedThreadPool(multiPartition.partitions.size)
+val threadPool = new ForkJoinPool(multiPartition.partitions.size)
 try {
 val executionContext =
 ExecutionContext.fromExecutor(threadPool)
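The one-line change above swaps Executors.newFixedThreadPool for a ForkJoinPool sized to the number of sub-partitions; presumably this is the thread-contention fix named in the commit title, since a ForkJoinPool can compensate for blocked workers (for example via ForkJoinPool.managedBlock) where a fixed pool cannot. A minimal, self-contained sketch of the pattern, with the per-partition blocking JDBC reads replaced by a sleep and assuming nothing else about SinglestoreRDD:

import java.util.concurrent.ForkJoinPool
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ForkJoinPoolSketch {
  def main(args: Array[String]): Unit = {
    val partitions = 4
    // One worker per sub-partition, mirroring the changed line above.
    val threadPool = new ForkJoinPool(partitions)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)

    // Stand-in for the per-partition blocking reads.
    val work = Future.sequence((1 to partitions).map { i =>
      Future {
        Thread.sleep(100)
        s"partition $i done"
      }
    })

    try println(Await.result(work, Duration.Inf).mkString(", "))
    finally threadPool.shutdown()
  }
}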
@@ -121,13 +120,16 @@ case class SinglestoreRDD(query: String,
 }
 
 var lastError: java.sql.SQLException = null
+var delay = 50
+val maxDelay = 10000
 while (rs == null && (timeout == 0 || System.currentTimeMillis() - startTime < timeout)) {
 try {
 rs = stmt.executeQuery()
 } catch {
 case e: java.sql.SQLException if e.getErrorCode == ErrResultTableNotExistCode =>
 lastError = e
-Thread.sleep(10)
+delay = Math.min(maxDelay, delay * 2)
+Thread.sleep(delay)
 }
 }
 
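This last hunk replaces the fixed 10 ms sleep with exponential backoff while polling for the result table: the delay starts at 50 ms, doubles on every retry, and is capped at maxDelay = 10000 ms. A rough, runnable sketch of the same loop shape, with stmt.executeQuery() replaced by a simulated "table not ready" failure:

object BackoffPollSketch {
  def main(args: Array[String]): Unit = {
    val timeoutMs  = 5000L
    val maxDelayMs = 10000L
    val startTime  = System.currentTimeMillis()
    var delayMs    = 50L
    var attempts   = 0
    var result: Option[String] = None
    var lastError: Exception   = null

    // Same loop shape as SinglestoreRDD.compute: retry the (simulated) query
    // until it succeeds or the timeout elapses, sleeping with an
    // exponentially growing, capped delay between attempts.
    while (result.isEmpty && System.currentTimeMillis() - startTime < timeoutMs) {
      try {
        attempts += 1
        if (attempts < 4) throw new IllegalStateException("result table not ready")
        result = Some("ready")
      } catch {
        case e: IllegalStateException =>
          lastError = e
          delayMs = math.min(maxDelayMs, delayMs * 2)
          Thread.sleep(delayMs)
      }
    }

    result match {
      case Some(r) => println(s"$r after $attempts attempts")
      case None    => throw lastError
    }
  }
}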
