Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
casionone committed Oct 9, 2023
1 parent 76a33ad commit bfff23b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ object TaskUtils {
}
} else params.put(key, waitToAdd)

private def clearMap(params: util.Map[String, AnyRef], key: String): Unit =
if (params != null && params.containsKey(key)) {
params.get(key) match {
case map: util.Map[String, AnyRef] => map.clear()
case _ => params.put(key, new util.HashMap[String, AnyRef]())
}
}

private def getConfigurationMap(
params: util.Map[String, AnyRef],
key: String
Expand Down Expand Up @@ -84,13 +92,20 @@ object TaskUtils {
def addStartupMap(params: util.Map[String, AnyRef], startupMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, startupMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)

def clearStartupMap(params: util.Map[String, AnyRef]): Unit = {
val configurationMap = getMap(params, TaskConstant.PARAMS_CONFIGURATION)
if (!configurationMap.isEmpty) {
clearMap(configurationMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)
}
}

def addRuntimeMap(params: util.Map[String, AnyRef], runtimeMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, runtimeMap, TaskConstant.PARAMS_CONFIGURATION_RUNTIME)

def addSpecialMap(params: util.Map[String, AnyRef], specialMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, specialMap, TaskConstant.PARAMS_CONFIGURATION_SPECIAL)

// tdoo
// todo
def getLabelsMap(params: util.Map[String, AnyRef]): util.Map[String, AnyRef] =
getMap(params, TaskConstant.LABELS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ import org.apache.linkis.governance.common.paser.CodeParser
import org.apache.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineType,
EngineTypeLabel,
RunType,
UserCreatorLabel
}
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer._

Expand Down Expand Up @@ -183,9 +189,11 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
Utils.tryFinally {
transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
val engineExecutionContext = createEngineExecutionContext(engineConnTask)

val engineCreationContext = EngineConnObject.getEngineCreationContext

var hookedCode = engineConnTask.getCode
Utils.tryCatch {
val engineCreationContext = EngineConnObject.getEngineCreationContext
ComputationExecutorHook.getComputationExecutorHooks.foreach(hook => {
hookedCode =
hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)
Expand All @@ -196,12 +204,28 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
} else {
logger.info(s"hooked after code: $hookedCode ")
}

// task params log
// spark engine: at org.apache.linkis.engineplugin.spark.executor.SparkEngineConnExecutor.executeLine log special conf
Utils.tryAndWarn {
var engineType = ""
engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match {
case Some(engineTypeLabel) =>
engineType = engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType
}
EngineType.mapStringToEngineType(engineType) match {
case EngineType.HIVE | EngineType.TRINO => printTaskParamsLog(engineExecutionContext)
case _ =>
}
}

val localPath = EngineConnConf.getLogDir
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"
)
)

var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,8 @@ object TemplateConfUtils extends Logging {
s"Can not get any template conf data with template uid:$templateUuid\n"
)
)
} else {
val onceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
classOf[ExecuteOnceLabel]
)
logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
requestPersistTask.getLabels.add(onceLabel)
}
}

} else {
logger.info("Try to get template conf list with template name:{} ", templateName)
logAppender.append(
Expand All @@ -246,6 +238,16 @@ object TemplateConfUtils extends Logging {
s"Can not get any template conf data with template name:$templateName\n"
)
)
} else {
// to remove metedata start param
TaskUtils.clearStartupMap(params)

val onceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
classOf[ExecuteOnceLabel]
)
logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
requestPersistTask.getLabels.add(onceLabel)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ class HiveEngineConnExecutor(
hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
}

printTaskParamsLog(engineExecutorContext)

if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
code.trim
}

printTaskParamsLog(engineExecutorContext)

TrinoCode.checkCode(realCode)
logger.info(s"trino client begins to run psql code:\n $realCode")
val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
Expand Down

0 comments on commit bfff23b

Please sign in to comment.