From 3ebea2be1116da5a805725f75de52d7147d76ac4 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:36:09 +0800 Subject: [PATCH] [1.10.0] hive engine add yarn log (#656) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * hive engine add real yarn queue print * Optimization of Hive Log Printing --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../executor/conf/ComputationExecutorConf.scala | 2 +- .../executor/execute/ComputationExecutor.scala | 7 +++++-- .../hive/conf/HiveEngineConfiguration.scala | 6 ++++++ .../hive/creation/HiveEngineConnFactory.scala | 10 +++------- .../executor/HiveEngineConcurrentConnExecutor.scala | 4 ++++ .../hive/executor/HiveEngineConnExecutor.scala | 2 ++ 6 files changed, 21 insertions(+), 10 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index 575f0165fa..41dc4b9783 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -35,7 +35,7 @@ object ComputationExecutorConf { val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars( "linkis.engineconn.print.task.params.skip.keys", - "jobId", + "jobId,wds.linkis.rm.yarnqueue", "skip to print params key at job logs" ) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index b156ea9aa0..dc35385276 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -414,10 +414,13 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = { val sb = new StringBuilder - EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) => // skip log jobId because it corresponding jobid when the ec created - if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) { + if ( + !ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue + .split(",") + .exists(_.equals(key)) + ) { sb.append(s"${key}=${value}\n") } }) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala index 4de8f02f52..ba6be619b6 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala @@ -46,4 +46,10 @@ object HiveEngineConfiguration { val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue + val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" + + val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" + + val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" + } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala index a9b217074d..43d845e6f3 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala @@ -59,10 +59,6 @@ import scala.collection.JavaConverters._ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory with Logging { - private val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" - private val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" - private val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" - override protected def newExecutor( id: Int, engineCreationContext: EngineCreationContext, @@ -188,10 +184,10 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w } .foreach { case (k, v) => logger.info(s"key is $k, value is $v") - if (BDP_QUEUE_NAME.equals(k)) { - hiveConf.set(HIVE_QUEUE_NAME, v) + if (HiveEngineConfiguration.BDP_QUEUE_NAME.equals(k)) { + hiveConf.set(HiveEngineConfiguration.HIVE_QUEUE_NAME, v) if ("tez".equals(HiveEngineConfiguration.HIVE_ENGINE_TYPE)) { - hiveConf.set(HIVE_TEZ_QUEUE_NAME, v) + hiveConf.set(HiveEngineConfiguration.HIVE_TEZ_QUEUE_NAME, v) } } else hiveConf.set(k, v) } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17a..b5abb36530 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -241,6 +241,10 @@ class HiveEngineConcurrentConnExecutor( engineExecutorContext.appendStdout( s"Your hive taskId: $taskId has $numberOfJobs MR jobs to do" ) + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout( + s"Your task will be submitted to the $queueName queue" + ) } logger.info(s"there are ${numberOfJobs} jobs.") diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 5499cb3d62..e8eccd35a7 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -273,6 +273,8 @@ class HiveEngineConnExecutor( } if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue") } if (thread.isInterrupted) { logger.error(