Skip to content

Commit

Permalink
Merge pull request #304 from WeDataSphere/dev-1.1.16-webank-xc
Browse files Browse the repository at this point in the history
support code set conf template name
  • Loading branch information
casionone authored Oct 8, 2023
2 parents 5950d61 + dd6c0c4 commit e565f30
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ class ConfigurationTest {
Assertions.assertFalse(Configuration.isAdmin("HaDooop"))
}


@Test private[conf] def testFormatValue(): Unit = {
val confvalue = CommonVars[Int]("linkis.test.error.conf", 456).getValue
Assertions.assertTrue(123 == confvalue)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ public class TemplateConfRequest implements RequestProtocol {

private String templateUuid;

private String templateName;

public TemplateConfRequest(String templateUuid, String templateName) {
this.templateUuid = templateUuid;
this.templateName = templateName;
}

public TemplateConfRequest(String templateUuid) {
this.templateUuid = templateUuid;
}
Expand All @@ -34,4 +41,12 @@ public String getTemplateUuid() {
public void setTemplateUuid(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getTemplateName() {
return templateName;
}

public void setTemplateName(String templateName) {
this.templateName = templateName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import com.google.common.cache.{Cache, CacheBuilder}

abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
Expand Down Expand Up @@ -354,6 +356,27 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
}
}

/**
* job task log print task params info
*
* @param engineExecutorContext
* @return
* Unit
*/

def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = {
val sb = new StringBuilder

EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
sb.append(s"${key}=${value.toString}\n")
})

sb.append("\n")
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s" Your job exec with configs:\n${sb.toString()}\n")
)
}

def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
val oriStatus = task.getStatus
logger.info(s"task ${task.getTaskId} from status $oriStatus to new status $newStatus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TemplateConfInterceptor extends EntranceInterceptor {

override def apply(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
if (EntranceConfiguration.TEMPLATE_CONF_SWITCH.getValue) {
TemplateConfUtils.dealWithStartParams(jobRequest, logAppender)
TemplateConfUtils.dealWithTemplateConf(jobRequest, logAppender)
} else {
jobRequest
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
package org.apache.linkis.entrance.interceptor.impl

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.common.exception.LinkisCommonErrorException
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{CodeAndRunTypeUtils, Logging, Utils}
import org.apache.linkis.governance.common.entity.TemplateConfKey
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.conf.{TemplateConfRequest, TemplateConfResponse}
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.rpc.Sender

Expand All @@ -37,6 +42,8 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object TemplateConfUtils extends Logging {

val confTemplateNameKey = "ec.resource.name"

private val templateCache: LoadingCache[String, util.List[TemplateConfKey]] = CacheBuilder
.newBuilder()
.maximumSize(1000)
Expand Down Expand Up @@ -69,42 +76,198 @@ object TemplateConfUtils extends Logging {

})

def dealWithStartParams(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
private val templateCacheName: LoadingCache[String, util.List[TemplateConfKey]] = CacheBuilder
.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(new CacheLoader[String, util.List[TemplateConfKey]]() {

override def load(templateName: String): util.List[TemplateConfKey] = {
var templateList = Utils.tryAndWarn {
val sender: Sender = Sender
.getSender(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue)

logger.info(s"load template configuration data templateName:$templateName")
val res = sender.ask(new TemplateConfRequest(null, templateName)) match {
case response: TemplateConfResponse =>
logger
.debug(s"${response.getList()}")
response.getList
case _ =>
logger
.warn(s"load template configuration data templateName:$templateName loading failed")
new util.ArrayList[TemplateConfKey](0)
}
res
}

if (templateList.size() == 0) {
logger.warn(s"template configuration data loading failed, plaese check warn log")
}
templateList
}

})

/**
* Get user-defined template conf name value
*
* @param code
* :code
* @param languageType
* :SQL,PYTHON
* @return
*/
def getCustomTemplateConfName(code: String, languageType: String): String = {
var templateConfName = "";

var varString: String = null
var errString: String = null
var rightVarString: String = null

languageType match {
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL =>
varString = s"""\\s*---@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*---@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*---@.*"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_PYTHON | CodeAndRunTypeUtils.LANGUAGE_TYPE_SHELL =>
varString = s"""\\s*##@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*##@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*##@"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SCALA =>
varString = s"""\\s*///@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*///@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*///@.+"""
case _ =>
return templateConfName
}

val customRegex = varString.r.unanchored
val customRightRegex = rightVarString.r.unanchored
val errRegex = errString.r.unanchored
code.split("\n").foreach { str =>
{

if (customRightRegex.unapplySeq(str).size < customRegex.unapplySeq(str).size) {
logger.warn(s"code:$str is wrong custom template conf name variable format!!!")
}
str match {
case customRegex() =>
val clearStr = if (str.endsWith(";")) str.substring(0, str.length - 1) else str
val res: Array[String] = clearStr.split("=")
if (res != null && res.length == 2) {
templateConfName = res(1).trim
logger.info(s"get template conf name $templateConfName")
} else {
if (res.length > 2) {
throw new LinkisCommonErrorException(
20044,
s"$str template conf name var defined uncorrectly"
)
} else {
throw new LinkisCommonErrorException(
20045,
s"template conf name var was defined uncorrectly:$str"
)
}
}
case errRegex() =>
logger.warn(
s"The template conf name var definition is incorrect:$str,if it is not used, it will not run the error, but it is recommended to use the correct specification to define"
)
case _ =>
}
}
}
templateConfName
}

def dealWithTemplateConf(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
jobRequest match {
case requestPersistTask: JobRequest =>
val params = requestPersistTask.getParams
val startMap = TaskUtils.getStartupMap(params)
logger.info("jobRequest startMap params :{} ", startMap)
val templateUuid = startMap.getOrDefault(LabelKeyConstant.TEMPLATE_CONF_KEY, "").toString
if (StringUtils.isBlank(templateUuid)) {
logger.debug("jobRequest startMap param template id is empty")
} else {
logger.info("try to get template conf list with templateUid:{} ", templateUuid)
logAppender.append(s"try to get template conf list with templateUid:$templateUuid")
val templateConflist = templateCache.get(templateUuid)
if (templateConflist != null && templateConflist.size() > 0) {
val keyList = new util.HashMap[String, AnyRef]()
templateConflist.asScala.foreach(ele => {
val key = ele.getKey
val oldValue = startMap.get(key)
if (oldValue != null && StringUtils.isNotBlank(oldValue.toString)) {
logger.info(s"key:$key value:$oldValue not empty, skip to deal")
} else {
val newValue = ele.getConfigValue
logger.info(s"key:$key value:$newValue will add to startMap params")
if (TaskUtils.isWithDebugInfo(params)) {
logAppender.append(s"add $key=$newValue\n")
}
keyList.put(key, newValue)
}

})
if (keyList.size() > 0) {
TaskUtils.addStartupMap(params, keyList)
var templateConflist: util.List[TemplateConfKey] = new util.ArrayList[TemplateConfKey]()
var templateName: String = ""
// only for Creator:IDE, try to get template conf name from code string. eg:---@set ec.resource.name=xxxx
val (user, creator) = LabelUtil.getUserCreator(jobRequest.getLabels)
if ("IDE".equals(creator)) {
val codeType = LabelUtil.getCodeType(jobRequest.getLabels)
templateName =
TemplateConfUtils.getCustomTemplateConfName(jobRequest.getExecutionCode, codeType)
}

// code template name > start params template uuid
if (StringUtils.isBlank(templateName)) {
logger.debug("jobRequest startMap param template name is empty")

logger.info("jobRequest startMap params :{} ", startMap)
val templateUuid = startMap.getOrDefault(LabelKeyConstant.TEMPLATE_CONF_KEY, "").toString

if (StringUtils.isBlank(templateUuid)) {
logger.debug("jobRequest startMap param template id is empty")
} else {
logger.info("try to get template conf list with template uid:{} ", templateUuid)
logAppender.append(
LogUtils
.generateInfo(s"try to get template conf data with template uid:$templateUuid\nn")
)
templateConflist = templateCache.get(templateUuid)
if (templateConflist == null || templateConflist.size() == 0) {
logAppender.append(
LogUtils.generateWarn(
s"can not get any template conf data with template uid:$templateUuid\n"
)
)
} else {
val onceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
classOf[ExecuteOnceLabel]
)
logger.info("add once label for task id:{}", requestPersistTask.getId.toString)
requestPersistTask.getLabels.add(onceLabel)
}
}

} else {
logger.info("try to get template conf list with template name:{} ", templateName)
logAppender.append(
LogUtils
.generateInfo(s"try to get template conf data with template name:$templateName\n")
)
templateConflist = templateCacheName.get(templateName)
if (templateConflist == null || templateConflist.size() == 0) {
logAppender.append(
LogUtils.generateWarn(
s"can not get any template conf data with template name:$templateName\n"
)
)
}
}

if (templateConflist != null && templateConflist.size() > 0) {
val keyList = new util.HashMap[String, AnyRef]()
templateConflist.asScala.foreach(ele => {
val key = ele.getKey
val oldValue = startMap.get(key)
if (oldValue != null && StringUtils.isNotBlank(oldValue.toString)) {
logger.info(s"key:$key value:$oldValue not empty, skip to deal")
} else {
val newValue = ele.getConfigValue
logger.info(s"key:$key value:$newValue will add to startMap params")
if (TaskUtils.isWithDebugInfo(params)) {
logAppender.append(LogUtils.generateInfo(s"add $key=$newValue\n"))
}
keyList.put(key, newValue)
}

})
if (keyList.size() > 0) {
TaskUtils.addStartupMap(params, keyList)
}
}

case _ =>
}
jobRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ private Message executeECMOperation(
ECResourceInfoRecord ecResourceInfoRecord =
ecResourceInfoService.getECResourceInfoRecordByInstance(engineInstance);
if (Objects.isNull(ecResourceInfoRecord)) {
return Message.error("ECM instance: " + ecmNode.getServiceInstance() + " not exist ");
return Message.error("EC instance: " + engineInstance + " not exist ");
}
// eg logDirSuffix -> root/20230705/io_file/6d48068a-0e1e-44b5-8eb2-835034db5b30/logs
String logDirSuffix = ecResourceInfoRecord.getLogDirSuffix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ public class LabelKeyConstant {

public static final String TEMPLATE_CONF_KEY = "ec.conf.templateId";

public static final String TEMPLATE_CONF_NAME_KEY = "ec.resource.name";

public static final String MANAGER_KEY = "manager";
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class HiveEngineConnExecutor(
LOG.info(s"set mapreduce.job.tags=LINKIS_$jobId")
hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
}

printTaskParamsLog(engineExecutorContext)

if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
Expand Down
Loading

0 comments on commit e565f30

Please sign in to comment.