From d85a1b87904ba813423b43910105ce693c01aa42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Wed, 20 Nov 2024 16:00:59 +0800 Subject: [PATCH 1/2] hive engine add real yarn queue print --- .../hive/conf/HiveEngineConfiguration.scala | 6 ++++++ .../hive/creation/HiveEngineConnFactory.scala | 10 +++------- .../executor/HiveEngineConcurrentConnExecutor.scala | 4 ++++ .../hive/executor/HiveEngineConnExecutor.scala | 2 ++ 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala index 4de8f02f52..ba6be619b6 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala @@ -46,4 +46,10 @@ object HiveEngineConfiguration { val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue + val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" + + val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" + + val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" + } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala index a9b217074d..43d845e6f3 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala @@ -59,10 +59,6 @@ import scala.collection.JavaConverters._ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory with Logging { - private val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" - private val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" - private val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" - override protected def newExecutor( id: Int, engineCreationContext: EngineCreationContext, @@ -188,10 +184,10 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w } .foreach { case (k, v) => logger.info(s"key is $k, value is $v") - if (BDP_QUEUE_NAME.equals(k)) { - hiveConf.set(HIVE_QUEUE_NAME, v) + if (HiveEngineConfiguration.BDP_QUEUE_NAME.equals(k)) { + hiveConf.set(HiveEngineConfiguration.HIVE_QUEUE_NAME, v) if ("tez".equals(HiveEngineConfiguration.HIVE_ENGINE_TYPE)) { - hiveConf.set(HIVE_TEZ_QUEUE_NAME, v) + hiveConf.set(HiveEngineConfiguration.HIVE_TEZ_QUEUE_NAME, v) } } else hiveConf.set(k, v) } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17a..45a4d44c7c 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -241,6 +241,10 @@ class HiveEngineConcurrentConnExecutor( engineExecutorContext.appendStdout( s"Your hive taskId: $taskId has $numberOfJobs MR jobs to do" ) + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout( + s"Your task will be submitted to $queueName queue" + ) } logger.info(s"there are ${numberOfJobs} jobs.") diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 5499cb3d62..b7b832db77 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -273,6 +273,8 @@ class HiveEngineConnExecutor( } if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout(s"Your task will be submitted to $queueName queue") } if (thread.isInterrupted) { logger.error( From a9c5c23c645bda230405955f48bfd18a3325afbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Tue, 3 Dec 2024 09:43:54 +0800 Subject: [PATCH 2/2] Optimization of Hive Log Printing --- .../executor/conf/ComputationExecutorConf.scala | 2 +- .../computation/executor/execute/ComputationExecutor.scala | 7 +++++-- .../hive/executor/HiveEngineConcurrentConnExecutor.scala | 2 +- .../hive/executor/HiveEngineConnExecutor.scala | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index 575f0165fa..41dc4b9783 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -35,7 +35,7 @@ object ComputationExecutorConf { val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars( "linkis.engineconn.print.task.params.skip.keys", - "jobId", + "jobId,wds.linkis.rm.yarnqueue", "skip to print params key at job logs" ) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index b156ea9aa0..dc35385276 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -414,10 +414,13 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = { val sb = new StringBuilder - EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) => // skip log jobId because it corresponding jobid when the ec created - if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) { + if ( + !ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue + .split(",") + .exists(_.equals(key)) + ) { sb.append(s"${key}=${value}\n") } }) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 45a4d44c7c..b5abb36530 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -243,7 +243,7 @@ class HiveEngineConcurrentConnExecutor( ) val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) engineExecutorContext.appendStdout( - s"Your task will be submitted to $queueName queue" + s"Your task will be submitted to the $queueName queue" ) } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index b7b832db77..e8eccd35a7 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -274,7 +274,7 @@ class HiveEngineConnExecutor( if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) - engineExecutorContext.appendStdout(s"Your task will be submitted to $queueName queue") + engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue") } if (thread.isInterrupted) { logger.error(