Skip to content

Commit

Permalink
fix map to json error
Browse files Browse the repository at this point in the history
  • Loading branch information
aiceflower committed Dec 23, 2024
1 parent e7f4ff1 commit ab849c7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ import java.net.InetAddress

import scala.beans.BeanProperty
import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.gson.reflect.TypeToken

class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging {

@BeanProperty var ioClient: IOClient = _

private val properties: mutable.HashMap[String, String] = mutable.HashMap[String, String]()
private var properties: java.util.Map[String, String] = new java.util.HashMap[String, String]()

private var inited = false

Expand All @@ -69,7 +68,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
label.setJobGroupId(IOClientUtils.generateJobGrupID())
}

def getProxyUser: String = StorageConfiguration.PROXY_USER.getValue(properties.asJava)
def getProxyUser: String = StorageConfiguration.PROXY_USER.getValue(properties)

def getCreatorUser: String = StorageUtils.getJvmUser

Expand Down Expand Up @@ -103,7 +102,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
}

def initFS(methodName: String = "init"): Unit = {
if (!properties.contains(StorageConfiguration.PROXY_USER.key)) {
if (!properties.containsKey(StorageConfiguration.PROXY_USER.key)) {
throw new StorageErrorException(NO_PROXY_USER.getErrorCode, NO_PROXY_USER.getErrorDesc)
}
bindEngineLabel.setIsJobGroupHead("true")
Expand All @@ -117,7 +116,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
getProxyUser,
getLocalIP,
methodName,
Array(properties.toMap)
Array(properties)
),
bindEngineLabel
)
Expand Down Expand Up @@ -172,7 +171,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
case "init" =>
case "storageName" => return fsType
case "setUser" =>
properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]);
return Unit
case _ =>
if (inited) {
Expand All @@ -185,23 +184,23 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging
method.getName match {
case "init" =>
val user =
if (properties.contains(StorageConfiguration.PROXY_USER.key)) {
StorageConfiguration.PROXY_USER.getValue(properties.toMap)
if (properties.containsKey(StorageConfiguration.PROXY_USER.key)) {
StorageConfiguration.PROXY_USER.getValue(properties)
} else {
null
}
if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) {
properties ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala
properties = args(0).asInstanceOf[java.util.Map[String, String]]
}
if (StringUtils.isNoneBlank(user)) {
properties += StorageConfiguration.PROXY_USER.key -> user
properties.put(StorageConfiguration.PROXY_USER.key, user)
}
initFS()
logger.warn(s"For user($user)inited a $fsType storage($id) .")
Unit
case "fsName" => fsType
case "setUser" =>
properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit
properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]); Unit
case "read" =>
if (!inited) throw new IllegalAccessException("storage has not been inited.")
new IOInputStream(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.{ExecutionNodeStatus, NodeExistStatus}
import org.apache.linkis.governance.common.protocol.engineconn.{RequestEngineStatusBatch, ResponseEngineStatusBatch}
import org.apache.linkis.governance.common.protocol.engineconn.{
RequestEngineStatusBatch,
ResponseEngineStatusBatch
}
import org.apache.linkis.governance.common.utils.GovernanceConstant
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, ResponseNodeStatus}
Expand All @@ -31,12 +34,21 @@ import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
import org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, EngineConnTaskInfo}
import org.apache.linkis.orchestrator.listener.task.{TaskErrorResponseEvent, TaskLogEvent, TaskStatusEvent}
import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq}
import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
import org.apache.linkis.orchestrator.listener.task.{
TaskErrorResponseEvent,
TaskLogEvent,
TaskStatusEvent
}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.server.{BDPJettyServerHelper, toJavaMap}
import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper}

import org.apache.commons.lang3.StringUtils

import java.util
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand All @@ -46,6 +58,8 @@ object EngineConnMonitor extends Logging {
private val ENGINECONN_LASTUPDATE_TIMEOUT =
ComputationOrchestratorConf.ENGINECONN_LASTUPDATE_TIMEOUT.getValue.toLong

private val engineTypeKey = "engineType"

private[linkis] def addEngineExecutorStatusMonitor(
engineConnExecutorCache: util.Map[EngineConnTaskInfo, CodeExecTaskExecutor]
): Unit = {
Expand Down Expand Up @@ -196,11 +210,19 @@ object EngineConnMonitor extends Logging {
val execTask = executor.getExecTask
Utils.tryAndError {
val labels: Array[Label[_]] = executor.getEngineConnExecutor.getLabels()
val engineTypeKey = "engineType"
val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey))
var engineType = ""
if (labelArray != null && labelArray.size > 0) {
engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType
val mark: Mark = executor.getMark
if (mark != null) {
val req: MarkReq = mark.getMarkReq
if (req != null) {
val engineTypeRef: AnyRef = req.labels.get(engineTypeKey)
if (engineTypeRef != null && engineTypeRef.toString.contains("-")) {
engineType = engineTypeRef.toString.split("-")(0)
}
}
}
if (StringUtils.isEmpty(engineType)) {
engineType = getEngineType(labels)
}
logger.warn(
s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly."
Expand All @@ -223,6 +245,15 @@ object EngineConnMonitor extends Logging {
}
}

private def getEngineType(labels: Array[Label[_]]): String = {
val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey))
var engineType = ""
if (labelArray != null && labelArray.size > 0) {
engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType
}
engineType
}

private def updateExecutorActivityTime(
serviceInstance: ServiceInstance,
engineConnExecutorCache: mutable.HashMap[ServiceInstance, ArrayBuffer[CodeExecTaskExecutor]]
Expand Down

0 comments on commit ab849c7

Please sign in to comment.