Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support code set conf template name #304

Merged
merged 3 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ class ConfigurationTest {
Assertions.assertFalse(Configuration.isAdmin("HaDooop"))
}


@Test private[conf] def testFormatValue(): Unit = {
val confvalue = CommonVars[Int]("linkis.test.error.conf", 456).getValue
Assertions.assertTrue(123 == confvalue)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ public class TemplateConfRequest implements RequestProtocol {

private String templateUuid;

private String templateName;

public TemplateConfRequest(String templateUuid, String templateName) {
this.templateUuid = templateUuid;
this.templateName = templateName;
}

public TemplateConfRequest(String templateUuid) {
this.templateUuid = templateUuid;
}
Expand All @@ -34,4 +41,12 @@ public String getTemplateUuid() {
public void setTemplateUuid(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getTemplateName() {
return templateName;
}

public void setTemplateName(String templateName) {
this.templateName = templateName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import com.google.common.cache.{Cache, CacheBuilder}

abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
Expand Down Expand Up @@ -354,6 +356,27 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
}
}

/**
* job task log print task params info
*
* @param engineExecutorContext
* @return
* Unit
*/

def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = {
val sb = new StringBuilder

EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
sb.append(s"${key}=${value.toString}\n")
})

sb.append("\n")
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s" Your job exec with configs:\n${sb.toString()}\n")
)
}

def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
val oriStatus = task.getStatus
logger.info(s"task ${task.getTaskId} from status $oriStatus to new status $newStatus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TemplateConfInterceptor extends EntranceInterceptor {

override def apply(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
if (EntranceConfiguration.TEMPLATE_CONF_SWITCH.getValue) {
TemplateConfUtils.dealWithStartParams(jobRequest, logAppender)
TemplateConfUtils.dealWithTemplateConf(jobRequest, logAppender)
} else {
jobRequest
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
package org.apache.linkis.entrance.interceptor.impl

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.common.exception.LinkisCommonErrorException
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{CodeAndRunTypeUtils, Logging, Utils}
import org.apache.linkis.governance.common.entity.TemplateConfKey
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.conf.{TemplateConfRequest, TemplateConfResponse}
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.rpc.Sender

Expand All @@ -37,6 +42,8 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object TemplateConfUtils extends Logging {

val confTemplateNameKey = "ec.resource.name"

private val templateCache: LoadingCache[String, util.List[TemplateConfKey]] = CacheBuilder
.newBuilder()
.maximumSize(1000)
Expand Down Expand Up @@ -69,42 +76,198 @@ object TemplateConfUtils extends Logging {

})

def dealWithStartParams(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
private val templateCacheName: LoadingCache[String, util.List[TemplateConfKey]] = CacheBuilder
.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(new CacheLoader[String, util.List[TemplateConfKey]]() {

override def load(templateName: String): util.List[TemplateConfKey] = {
var templateList = Utils.tryAndWarn {
val sender: Sender = Sender
.getSender(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue)

logger.info(s"load template configuration data templateName:$templateName")
val res = sender.ask(new TemplateConfRequest(null, templateName)) match {
case response: TemplateConfResponse =>
logger
.debug(s"${response.getList()}")
response.getList
case _ =>
logger
.warn(s"load template configuration data templateName:$templateName loading failed")
new util.ArrayList[TemplateConfKey](0)
}
res
}

if (templateList.size() == 0) {
logger.warn(s"template configuration data loading failed, plaese check warn log")
}
templateList
}

})

/**
* Get user-defined template conf name value
*
* @param code
* :code
* @param languageType
* :SQL,PYTHON
* @return
*/
def getCustomTemplateConfName(code: String, languageType: String): String = {
var templateConfName = "";

var varString: String = null
var errString: String = null
var rightVarString: String = null

languageType match {
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL =>
varString = s"""\\s*---@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*---@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*---@.*"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_PYTHON | CodeAndRunTypeUtils.LANGUAGE_TYPE_SHELL =>
varString = s"""\\s*##@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*##@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*##@"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SCALA =>
varString = s"""\\s*///@set ${confTemplateNameKey}=\\s*.+\\s*"""
rightVarString = s"""^\\s*///@set ${confTemplateNameKey}=\\s*.+\\s*"""
errString = """\s*///@.+"""
case _ =>
return templateConfName
}

val customRegex = varString.r.unanchored
val customRightRegex = rightVarString.r.unanchored
val errRegex = errString.r.unanchored
code.split("\n").foreach { str =>
{

if (customRightRegex.unapplySeq(str).size < customRegex.unapplySeq(str).size) {
logger.warn(s"code:$str is wrong custom template conf name variable format!!!")
}
str match {
case customRegex() =>
val clearStr = if (str.endsWith(";")) str.substring(0, str.length - 1) else str
val res: Array[String] = clearStr.split("=")
if (res != null && res.length == 2) {
templateConfName = res(1).trim
logger.info(s"get template conf name $templateConfName")
} else {
if (res.length > 2) {
throw new LinkisCommonErrorException(
20044,
s"$str template conf name var defined uncorrectly"
)
} else {
throw new LinkisCommonErrorException(
20045,
s"template conf name var was defined uncorrectly:$str"
)
}
}
case errRegex() =>
logger.warn(
s"The template conf name var definition is incorrect:$str,if it is not used, it will not run the error, but it is recommended to use the correct specification to define"
)
case _ =>
}
}
}
templateConfName
}

def dealWithTemplateConf(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
jobRequest match {
case requestPersistTask: JobRequest =>
val params = requestPersistTask.getParams
val startMap = TaskUtils.getStartupMap(params)
logger.info("jobRequest startMap params :{} ", startMap)
val templateUuid = startMap.getOrDefault(LabelKeyConstant.TEMPLATE_CONF_KEY, "").toString
if (StringUtils.isBlank(templateUuid)) {
logger.debug("jobRequest startMap param template id is empty")
} else {
logger.info("try to get template conf list with templateUid:{} ", templateUuid)
logAppender.append(s"try to get template conf list with templateUid:$templateUuid")
val templateConflist = templateCache.get(templateUuid)
if (templateConflist != null && templateConflist.size() > 0) {
val keyList = new util.HashMap[String, AnyRef]()
templateConflist.asScala.foreach(ele => {
val key = ele.getKey
val oldValue = startMap.get(key)
if (oldValue != null && StringUtils.isNotBlank(oldValue.toString)) {
logger.info(s"key:$key value:$oldValue not empty, skip to deal")
} else {
val newValue = ele.getConfigValue
logger.info(s"key:$key value:$newValue will add to startMap params")
if (TaskUtils.isWithDebugInfo(params)) {
logAppender.append(s"add $key=$newValue\n")
}
keyList.put(key, newValue)
}

})
if (keyList.size() > 0) {
TaskUtils.addStartupMap(params, keyList)
var templateConflist: util.List[TemplateConfKey] = new util.ArrayList[TemplateConfKey]()
var templateName: String = ""
// only for Creator:IDE, try to get template conf name from code string. eg:---@set ec.resource.name=xxxx
val (user, creator) = LabelUtil.getUserCreator(jobRequest.getLabels)
if ("IDE".equals(creator)) {
val codeType = LabelUtil.getCodeType(jobRequest.getLabels)
templateName =
TemplateConfUtils.getCustomTemplateConfName(jobRequest.getExecutionCode, codeType)
}

// code template name > start params template uuid
if (StringUtils.isBlank(templateName)) {
logger.debug("jobRequest startMap param template name is empty")

logger.info("jobRequest startMap params :{} ", startMap)
val templateUuid = startMap.getOrDefault(LabelKeyConstant.TEMPLATE_CONF_KEY, "").toString

if (StringUtils.isBlank(templateUuid)) {
logger.debug("jobRequest startMap param template id is empty")
} else {
logger.info("try to get template conf list with template uid:{} ", templateUuid)
logAppender.append(
LogUtils
.generateInfo(s"try to get template conf data with template uid:$templateUuid\nn")
)
templateConflist = templateCache.get(templateUuid)
if (templateConflist == null || templateConflist.size() == 0) {
logAppender.append(
LogUtils.generateWarn(
s"can not get any template conf data with template uid:$templateUuid\n"
)
)
} else {
val onceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
classOf[ExecuteOnceLabel]
)
logger.info("add once label for task id:{}", requestPersistTask.getId.toString)
requestPersistTask.getLabels.add(onceLabel)
}
}

} else {
logger.info("try to get template conf list with template name:{} ", templateName)
logAppender.append(
LogUtils
.generateInfo(s"try to get template conf data with template name:$templateName\n")
)
templateConflist = templateCacheName.get(templateName)
if (templateConflist == null || templateConflist.size() == 0) {
logAppender.append(
LogUtils.generateWarn(
s"can not get any template conf data with template name:$templateName\n"
)
)
}
}

if (templateConflist != null && templateConflist.size() > 0) {
val keyList = new util.HashMap[String, AnyRef]()
templateConflist.asScala.foreach(ele => {
val key = ele.getKey
val oldValue = startMap.get(key)
if (oldValue != null && StringUtils.isNotBlank(oldValue.toString)) {
logger.info(s"key:$key value:$oldValue not empty, skip to deal")
} else {
val newValue = ele.getConfigValue
logger.info(s"key:$key value:$newValue will add to startMap params")
if (TaskUtils.isWithDebugInfo(params)) {
logAppender.append(LogUtils.generateInfo(s"add $key=$newValue\n"))
}
keyList.put(key, newValue)
}

})
if (keyList.size() > 0) {
TaskUtils.addStartupMap(params, keyList)
}
}

case _ =>
}
jobRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ private Message executeECMOperation(
ECResourceInfoRecord ecResourceInfoRecord =
ecResourceInfoService.getECResourceInfoRecordByInstance(engineInstance);
if (Objects.isNull(ecResourceInfoRecord)) {
return Message.error("ECM instance: " + ecmNode.getServiceInstance() + " not exist ");
return Message.error("EC instance: " + engineInstance + " not exist ");
}
// eg logDirSuffix -> root/20230705/io_file/6d48068a-0e1e-44b5-8eb2-835034db5b30/logs
String logDirSuffix = ecResourceInfoRecord.getLogDirSuffix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ public class LabelKeyConstant {

public static final String TEMPLATE_CONF_KEY = "ec.conf.templateId";

public static final String TEMPLATE_CONF_NAME_KEY = "ec.resource.name";

public static final String MANAGER_KEY = "manager";
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class HiveEngineConnExecutor(
LOG.info(s"set mapreduce.job.tags=LINKIS_$jobId")
hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
}

printTaskParamsLog(engineExecutorContext)

if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
Expand Down
Loading
Loading