[SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
### What changes were proposed in this pull request?

* The DAGScheduler can currently deadlock with another thread if both access the partitions of the same RDD at the same time.
* To make progress in `getCacheLocs`, we need exclusive access to both the RDD's partitions and the location cache. The scheduler first locks the location cache and then the RDD.
* When the partitions of an RDD are computed, the RDD first acquires exclusive access to its partitions and may then acquire exclusive access to the location cache.
* If thread 1 acquires the lock on the RDD while thread 2 holds the lock on the location cache, each thread then waits for the lock the other holds and neither can make progress.
* To fix this, acquire the locks in the same order everywhere: change the DAGScheduler to first acquire the lock on the RDD, and then the lock on the location cache (see the sketch after this list).
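
To illustrate, here is a minimal standalone sketch of the two lock orders (not Spark code: `rddLock` and `cacheLock` are illustrative stand-ins for `RDD.stateLock` and the DAGScheduler's `cacheLocs` map).

```scala
// Hypothetical stand-ins for the two resources involved; not the actual Spark implementation.
object LockOrderSketch {
  private val rddLock = new Object   // stands in for RDD.stateLock
  private val cacheLock = new Object // stands in for the cacheLocs map

  // Before the fix: getCacheLocs locked the location cache first and then touched the RDD.
  def getCacheLocsBefore(): Unit = cacheLock.synchronized {
    rddLock.synchronized {
      // compute partition block ids and fill the cache
    }
  }

  // A thread computing partitions locks the RDD first and may then consult the cache,
  // e.g. a CoalescedRDD asking the scheduler for preferred locations.
  def computePartitions(): Unit = rddLock.synchronized {
    cacheLock.synchronized {
      // read cached locations
    }
  }

  // After the fix: both paths take the RDD lock first and the cache lock second, so two
  // threads can no longer each hold the lock the other one is waiting for.
  def getCacheLocsAfter(): Unit = rddLock.synchronized {
    cacheLock.synchronized {
      // compute partition block ids and fill the cache
    }
  }
}
```

With the old ordering, `getCacheLocsBefore` on one thread and `computePartitions` on another can each acquire their first lock and then block forever waiting for the second; with the consistent ordering that interleaving cannot occur.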

### Why are the changes needed?

* This deadlock can occur in practice and blocks any further progress on the cluster.

### Does this PR introduce _any_ user-facing change?

* No

### How was this patch tested?

* Unit test that reproduces the issue.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44882 from fred-db/fix-deadlock.

Authored-by: fred-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
fred-db authored and dongjoon-hyun committed Jan 25, 2024
1 parent 1bee07e commit 617014c
Showing 3 changed files with 62 additions and 18 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -224,14 +224,17 @@ abstract class RDD[T: ClassTag](
* not use `this` because RDDs are user-visible, so users might have added their own locking on
* RDDs; sharing that could lead to a deadlock.
*
* One thread might hold the lock on many of these, for a chain of RDD dependencies; but
* because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
* chance of deadlock.
* One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
* are possible if we try to lock another resource while holding the stateLock,
* and the lock acquisition sequence of these locks is not guaranteed to be the same.
* This can lead to a deadlock, as one thread might first acquire the stateLock and then the
* resource, while another thread might first acquire the resource and then the stateLock.
*
* Executors may reference the shared fields (though they should never mutate them,
* that only happens on the driver).
*/
private val stateLock = new Serializable {}
private[spark] val stateLock = new Serializable {}

// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
31 changes: 18 additions & 13 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -181,6 +181,9 @@ private[spark] class DAGScheduler(
* locations where that RDD partition is cached.
*
* All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
* If you need to access any RDD while synchronizing on the cache locations, first synchronize
* on the RDD, and then synchronize on this map to avoid deadlocks: an RDD may itself access
* the cache locations while holding its own lock.
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

@@ -435,22 +438,24 @@ private[spark] class DAGScheduler(
}

private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
if (!cacheLocs.contains(rdd.id)) {
// Note: if the storage level is NONE, we don't need to get locations from block manager.
val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
IndexedSeq.fill(rdd.partitions.length)(Nil)
} else {
val blockIds =
rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
blockManagerMaster.getLocations(blockIds).map { bms =>
bms.map(bm => TaskLocation(bm.host, bm.executorId))
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
if (!cacheLocs.contains(rdd.id)) {
// Note: if the storage level is NONE, we don't need to get locations from block manager.
val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
IndexedSeq.fill(rdd.partitions.length)(Nil)
} else {
val blockIds =
rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
blockManagerMaster.getLocations(blockIds).map { bms =>
bms.map(bm => TaskLocation(bm.host, bm.executorId))
}
}
cacheLocs(rdd.id) = locs
}
cacheLocs(rdd.id) = locs
cacheLocs(rdd.id)
}
cacheLocs(rdd.id)
}

private def clearCacheLocs(): Unit = cacheLocs.synchronized {
38 changes: 37 additions & 1 deletion core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, Utils}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
@@ -612,6 +612,42 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assertDataStructuresEmpty()
}

// Note that this test does NOT reproduce the deadlock deterministically, as it relies on
// Thread.sleep, but it should never fail or flake when there is no deadlock.
// If this test starts to flake, that indicates a deadlock!
test("No Deadlock between getCacheLocs and CoalescedRDD") {
val rdd = sc.parallelize(1 to 10, numSlices = 10)
val coalescedRDD = rdd.coalesce(2)
val executionContext = ThreadUtils.newDaemonFixedThreadPool(
nThreads = 2, "test-getCacheLocs")
// Used to make progress in getCacheLocs only after we have acquired the lock on the RDD.
val rddLock = new java.util.concurrent.Semaphore(0)
val partitionsFuture = executionContext.submit(new Runnable {
override def run(): Unit = {
coalescedRDD.stateLock.synchronized {
rddLock.release(1)
// Try to access the partitions of the coalescedRDD. This will cause a call to
// getCacheLocs internally.
Thread.sleep(5000)
coalescedRDD.partitions
}
}
})
val getCacheLocsFuture = executionContext.submit(new Runnable {
override def run(): Unit = {
rddLock.acquire()
// Access the cache locations.
// If the partition location cache is locked before the stateLock is locked,
// we'll run into a deadlock.
sc.dagScheduler.getCacheLocs(coalescedRDD)
}
})
// If either future throws a TimeoutException, there is a deadlock between
// getCacheLocs and accessing the partitions of an RDD.
getCacheLocsFuture.get(120, TimeUnit.SECONDS)
partitionsFuture.get(120, TimeUnit.SECONDS)
}

test("All shuffle files on the storage endpoint should be cleaned up when it is lost") {
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
Expand Down
