From ab849c71862e72cdaf9396e81c28a6d497503e30 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Mon, 23 Dec 2024 23:29:21 +0800 Subject: [PATCH] fix map to json error --- .../io/iteraceptor/IOMethodInterceptor.scala | 21 +++++---- .../monitor/EngineConnMonitor.scala | 45 ++++++++++++++++--- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala index 92feb8a561..b0581967f1 100644 --- a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala +++ b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala @@ -40,7 +40,6 @@ import java.net.InetAddress import scala.beans.BeanProperty import scala.collection.JavaConverters._ -import scala.collection.mutable import com.google.gson.reflect.TypeToken @@ -48,7 +47,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging @BeanProperty var ioClient: IOClient = _ - private val properties: mutable.HashMap[String, String] = mutable.HashMap[String, String]() + private var properties: java.util.Map[String, String] = new java.util.HashMap[String, String]() private var inited = false @@ -69,7 +68,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging label.setJobGroupId(IOClientUtils.generateJobGrupID()) } - def getProxyUser: String = StorageConfiguration.PROXY_USER.getValue(properties.asJava) + def getProxyUser: String = StorageConfiguration.PROXY_USER.getValue(properties) def getCreatorUser: String = StorageUtils.getJvmUser @@ -103,7 +102,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging } def initFS(methodName: String = "init"): Unit = { - if (!properties.contains(StorageConfiguration.PROXY_USER.key)) { + if (!properties.containsKey(StorageConfiguration.PROXY_USER.key)) { throw new StorageErrorException(NO_PROXY_USER.getErrorCode, NO_PROXY_USER.getErrorDesc) } bindEngineLabel.setIsJobGroupHead("true") @@ -117,7 +116,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging getProxyUser, getLocalIP, methodName, - Array(properties.toMap) + Array(properties) ), bindEngineLabel ) @@ -172,7 +171,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging case "init" => case "storageName" => return fsType case "setUser" => - properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; + properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]); return Unit case _ => if (inited) { @@ -185,23 +184,23 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging method.getName match { case "init" => val user = - if (properties.contains(StorageConfiguration.PROXY_USER.key)) { - StorageConfiguration.PROXY_USER.getValue(properties.toMap) + if (properties.containsKey(StorageConfiguration.PROXY_USER.key)) { + StorageConfiguration.PROXY_USER.getValue(properties) } else { null } if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) { - properties ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala + properties = args(0).asInstanceOf[java.util.Map[String, String]] } if (StringUtils.isNoneBlank(user)) { - properties += StorageConfiguration.PROXY_USER.key -> user + properties.put(StorageConfiguration.PROXY_USER.key, user) } initFS() logger.warn(s"For user($user)inited a $fsType storage($id) .") Unit case "fsName" => fsType case "setUser" => - properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit + properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]); Unit case "read" => if (!inited) throw new IllegalAccessException("storage has not been inited.") new IOInputStream(args) diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala index 751bb2d1b2..33e57a7c22 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala @@ -22,7 +22,10 @@ import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.entity.{ExecutionNodeStatus, NodeExistStatus} -import org.apache.linkis.governance.common.protocol.engineconn.{RequestEngineStatusBatch, ResponseEngineStatusBatch} +import org.apache.linkis.governance.common.protocol.engineconn.{ + RequestEngineStatusBatch, + ResponseEngineStatusBatch +} import org.apache.linkis.governance.common.utils.GovernanceConstant import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, ResponseNodeStatus} @@ -31,12 +34,21 @@ import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf import org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, EngineConnTaskInfo} -import org.apache.linkis.orchestrator.listener.task.{TaskErrorResponseEvent, TaskLogEvent, TaskStatusEvent} +import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq} +import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor +import org.apache.linkis.orchestrator.listener.task.{ + TaskErrorResponseEvent, + TaskLogEvent, + TaskStatusEvent +} import org.apache.linkis.rpc.Sender -import org.apache.linkis.server.{BDPJettyServerHelper, toJavaMap} +import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper} + +import org.apache.commons.lang3.StringUtils import java.util import java.util.concurrent.TimeUnit + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -46,6 +58,8 @@ object EngineConnMonitor extends Logging { private val ENGINECONN_LASTUPDATE_TIMEOUT = ComputationOrchestratorConf.ENGINECONN_LASTUPDATE_TIMEOUT.getValue.toLong + private val engineTypeKey = "engineType" + private[linkis] def addEngineExecutorStatusMonitor( engineConnExecutorCache: util.Map[EngineConnTaskInfo, CodeExecTaskExecutor] ): Unit = { @@ -196,11 +210,19 @@ object EngineConnMonitor extends Logging { val execTask = executor.getExecTask Utils.tryAndError { val labels: Array[Label[_]] = executor.getEngineConnExecutor.getLabels() - val engineTypeKey = "engineType" - val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey)) var engineType = "" - if (labelArray != null && labelArray.size > 0) { - engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType + val mark: Mark = executor.getMark + if (mark != null) { + val req: MarkReq = mark.getMarkReq + if (req != null) { + val engineTypeRef: AnyRef = req.labels.get(engineTypeKey) + if (engineTypeRef != null && engineTypeRef.toString.contains("-")) { + engineType = engineTypeRef.toString.split("-")(0) + } + } + } + if (StringUtils.isEmpty(engineType)) { + engineType = getEngineType(labels) } logger.warn( s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly." @@ -223,6 +245,15 @@ object EngineConnMonitor extends Logging { } } + private def getEngineType(labels: Array[Label[_]]): String = { + val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey)) + var engineType = "" + if (labelArray != null && labelArray.size > 0) { + engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType + } + engineType + } + private def updateExecutorActivityTime( serviceInstance: ServiceInstance, engineConnExecutorCache: mutable.HashMap[ServiceInstance, ArrayBuffer[CodeExecTaskExecutor]]