diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 97c040298fd9d..7f2b94eea314e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -79,7 +79,7 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
           context.reply(
             CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
       }
-    case GlutenMergeTreeCacheLoadStatus(jobId) =>
+    case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
       context.reply(status)
     case GlutenFilesCacheLoad(files) =>
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index c2467f19b9639..e596e94fed722 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -39,7 +39,7 @@ object GlutenRpcMessages {
   case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
     extends GlutenRpcMessage
 
-  case class GlutenMergeTreeCacheLoadStatus(jobId: String)
+  case class GlutenCacheLoadStatus(jobId: String)
 
   case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
     extends GlutenRpcMessage
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 09ea6610b430c..6e9201d17336a 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.commands
 
-import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.rel.ExtensionTableBuilder
 
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
@@ -224,14 +223,6 @@ case class GlutenCHCacheDataCommand(
         })
     }
 
-    val fetchStatus: (String, String) => Future[CacheResult] =
-      (executorId: String, jobId: String) => {
-        GlutenDriverEndpoint.executorDataMap
-          .get(toExecutorId(executorId))
-          .executorEndpointRef
-          .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(jobId))
-      }
-
-    getResult(futureList, fetchStatus, asynExecute)
+    getResult(futureList, asynExecute)
   }
 }
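Reviewer note, not part of the patch: after this change, cache-load status is keyed solely by jobId in CHNativeCacheManager, which is why a single GlutenCacheLoadStatus message can replace the MergeTree-specific variant. A minimal Scala sketch of that idea, using a stubbed cache manager (StubCacheManager, its jobs map, and handleStatusProbe are hypothetical stand-ins, not Gluten code):

import scala.collection.concurrent.TrieMap

sealed trait GlutenRpcMessage
final case class GlutenCacheLoadStatus(jobId: String) extends GlutenRpcMessage

object StubCacheManager {
  // Status is keyed by jobId alone: MergeTree and plain-file cache jobs
  // share one table, which is what makes a single status message possible.
  private val jobs = TrieMap.empty[String, String]
  def record(jobId: String, status: String): Unit = jobs.update(jobId, status)
  def getCacheStatus(jobId: String): String = jobs.getOrElse(jobId, "UNKNOWN")
}

// Stand-in for the executor endpoint's receiveAndReply dispatch: one case
// now answers status probes for every cache-load flavor.
def handleStatusProbe(msg: GlutenRpcMessage): String = msg match {
  case GlutenCacheLoadStatus(jobId) => StubCacheManager.getCacheStatus(jobId)
}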
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
index bf6846afd2b16..2907febf8028e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.execution.CacheResult.Status
 
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.CacheJobInfo
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus}
 import org.apache.spark.sql.Row
 import org.apache.spark.util.ThreadUtils
 
@@ -103,14 +103,19 @@ trait GlutenCacheBase {
 
   protected def getResult(
       futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
-      fetchStatus: (String, String) => Future[CacheResult],
       async: Boolean): Seq[Row] = {
     val resultList = waitRpcResults(futureList)
     if (async) {
       val res = collectJobTriggerResult(resultList)
       Seq(Row(res._1, res._2.mkString(";")))
     } else {
-
+      val fetchStatus: (String, String) => Future[CacheResult] =
+        (executorId: String, jobId: String) => {
+          GlutenDriverEndpoint.executorDataMap
+            .get(toExecutorId(executorId))
+            .executorEndpointRef
+            .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+        }
       val res = waitAllJobFinish(resultList, fetchStatus)
       Seq(Row(res._1, res._2))
     }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
index 5c3e193563fb4..e535097ed9fcf 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.commands
 
-import org.apache.gluten.execution.CacheResult
 import org.apache.gluten.substrait.rel.LocalFilesBuilder
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad, GlutenFilesCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
@@ -146,14 +145,7 @@ case class GlutenCacheFilesCommand(
       }
     }
 
-    val fetchStatus: (String, String) => Future[CacheResult] =
-      (executorId: String, jobId: String) => {
-        GlutenDriverEndpoint.executorDataMap
-          .get(toExecutorId(executorId))
-          .executorEndpointRef
-          .ask[CacheResult](GlutenFilesCacheLoadStatus(jobId))
-      }
-    getResult(futureList, fetchStatus, async)
+    getResult(futureList, async)
   }
 
   private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem): Seq[FileStatus] = {
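Reviewer note, not part of the patch: with fetchStatus moved into the base trait, both GlutenCHCacheDataCommand and GlutenCacheFilesCommand now call getResult(futureList, async), and the status probe is built in one place. A compilable sketch of that post-patch shape under simplified types (CacheJobInfo and CacheResult are reduced, and askExecutor is a hypothetical stub for GlutenDriverEndpoint.executorDataMap(...).executorEndpointRef.ask):

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

final case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
final case class CacheResult(finished: Boolean, message: String)
final case class GlutenCacheLoadStatus(jobId: String)

// Hypothetical stub: the real code asks the executor's RPC endpoint ref.
def askExecutor(executorId: String, msg: Any): Future[CacheResult] =
  Future.successful(CacheResult(finished = true, message = s"$executorId replied to $msg"))

def getResult(
    futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
    async: Boolean): (Boolean, String) = {
  // Resolve the trigger RPCs first (mirrors waitRpcResults).
  val resultList = futureList.map { case (id, f) => (id, Await.result(f, 30.seconds)) }
  if (async) {
    // Async mode: report only whether every cache job was triggered.
    (resultList.forall { case (_, info) => info.status },
      resultList.map { case (_, info) => info.jobId }.mkString(";"))
  } else {
    // Sync mode: the status probe now lives here, so both cache commands
    // share it and poll with the unified GlutenCacheLoadStatus message
    // (mirrors waitAllJobFinish, minus retry/backoff).
    val fetchStatus: (String, String) => Future[CacheResult] =
      (executorId, jobId) => askExecutor(executorId, GlutenCacheLoadStatus(jobId))
    val statuses = resultList.map {
      case (executorId, info) => Await.result(fetchStatus(executorId, info.jobId), 30.seconds)
    }
    (statuses.forall(_.finished), statuses.map(_.message).mkString(";"))
  }
}

One design consequence: since the closure is inlined, the unified status message is referenced only in GlutenCacheBase, so a future cache-load command needs no status plumbing of its own.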