
Commit

fix
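
Summary (from the diff below): unifies the cache-load status RPC. GlutenMergeTreeCacheLoadStatus is renamed to the generic GlutenCacheLoadStatus, and the per-executor status-polling callback that GlutenCHCacheDataCommand (via GlutenMergeTreeCacheLoadStatus) and GlutenCacheFilesCommand (via GlutenFilesCacheLoadStatus) each built separately is consolidated into GlutenCacheBase.getResult, which now asks executors with the single unified message.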
loneylee committed Aug 15, 2024
1 parent 215adb8 commit 25cacc5
Showing 5 changed files with 14 additions and 26 deletions.
GlutenExecutorEndpoint.scala
@@ -79,7 +79,7 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
         context.reply(
           CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
       }
-    case GlutenMergeTreeCacheLoadStatus(jobId) =>
+    case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
       context.reply(status)
     case GlutenFilesCacheLoad(files) =>
GlutenRpcMessages.scala
@@ -39,7 +39,7 @@ object GlutenRpcMessages {
   case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
     extends GlutenRpcMessage
 
-  case class GlutenMergeTreeCacheLoadStatus(jobId: String)
+  case class GlutenCacheLoadStatus(jobId: String)
 
   case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
     extends GlutenRpcMessage
GlutenCHCacheDataCommand.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.commands
 
-import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.rel.ExtensionTableBuilder
 
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
@@ -224,14 +223,6 @@ case class GlutenCHCacheDataCommand(
       })
   }
 
-    val fetchStatus: (String, String) => Future[CacheResult] =
-      (executorId: String, jobId: String) => {
-        GlutenDriverEndpoint.executorDataMap
-          .get(toExecutorId(executorId))
-          .executorEndpointRef
-          .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(jobId))
-      }
-
-    getResult(futureList, fetchStatus, asynExecute)
+    getResult(futureList, asynExecute)
   }
 }
GlutenCacheBase.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.execution.CacheResult.Status
 
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.CacheJobInfo
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus}
 import org.apache.spark.sql.Row
 import org.apache.spark.util.ThreadUtils
@@ -103,14 +103,19 @@ trait GlutenCacheBase {

 protected def getResult(
     futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
-    fetchStatus: (String, String) => Future[CacheResult],
     async: Boolean): Seq[Row] = {
   val resultList = waitRpcResults(futureList)
   if (async) {
     val res = collectJobTriggerResult(resultList)
     Seq(Row(res._1, res._2.mkString(";")))
   } else {
+
+    val fetchStatus: (String, String) => Future[CacheResult] =
+      (executorId: String, jobId: String) => {
+        GlutenDriverEndpoint.executorDataMap
+          .get(toExecutorId(executorId))
+          .executorEndpointRef
+          .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+      }
     val res = waitAllJobFinish(resultList, fetchStatus)
     Seq(Row(res._1, res._2))
   }
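For reference, here is how the consolidated method reads after this patch, assembled from the hunk above. The closing brace and indentation are assumed; waitRpcResults, collectJobTriggerResult, waitAllJobFinish, and toExecutorId are pre-existing helpers visible in the surrounding context, not part of this change:

  protected def getResult(
      futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
      async: Boolean): Seq[Row] = {
    // Wait for every executor to acknowledge that its cache-load job was triggered.
    val resultList = waitRpcResults(futureList)
    if (async) {
      // Async mode: report only whether each job started, without waiting for completion.
      val res = collectJobTriggerResult(resultList)
      Seq(Row(res._1, res._2.mkString(";")))
    } else {
      // Sync mode: poll each executor's endpoint for its cache-load status
      // using the unified GlutenCacheLoadStatus message.
      val fetchStatus: (String, String) => Future[CacheResult] =
        (executorId: String, jobId: String) => {
          GlutenDriverEndpoint.executorDataMap
            .get(toExecutorId(executorId))
            .executorEndpointRef
            .ask[CacheResult](GlutenCacheLoadStatus(jobId))
        }
      val res = waitAllJobFinish(resultList, fetchStatus)
      Seq(Row(res._1, res._2))
    }
  }

Defining fetchStatus inside getResult means both cache commands share one status-polling path, so the executor side only needs the single GlutenCacheLoadStatus handler added in GlutenExecutorEndpoint above.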
GlutenCacheFilesCommand.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.commands
 
-import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.substrait.rel.LocalFilesBuilder
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad, GlutenFilesCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
@@ -146,14 +145,7 @@ case class GlutenCacheFilesCommand(
     }
   }
 
-    val fetchStatus: (String, String) => Future[CacheResult] =
-      (executorId: String, jobId: String) => {
-        GlutenDriverEndpoint.executorDataMap
-          .get(toExecutorId(executorId))
-          .executorEndpointRef
-          .ask[CacheResult](GlutenFilesCacheLoadStatus(jobId))
-      }
-    getResult(futureList, fetchStatus, async)
+    getResult(futureList, async)
   }
 
   private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem): Seq[FileStatus] = {
