Skip to content

Commit

Permalink
bak
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Aug 1, 2024
1 parent a9365ff commit 24244ab
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import java.util.Locale

import scala.collection.mutable

object CodeAndRunTypeUtils {
Expand Down Expand Up @@ -117,7 +119,8 @@ object CodeAndRunTypeUtils {
if (StringUtils.isBlank(codeType)) {
return ""
}
getLanguageTypeAndCodeTypeRelationMap.getOrElse(codeType, defaultLanguageType)
val lowerCaseCodeType = codeType.toLowerCase(Locale.getDefault)
getLanguageTypeAndCodeTypeRelationMap.getOrElse(lowerCaseCodeType, defaultLanguageType)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,13 @@ object GovernanceCommonConf {
val EC_APP_MANAGE_MODE =
CommonVars("linkis.ec.app.manage.mode", "attach")

/**
* DEFAULT_LOGPATH_PREFIX is the prefix that represents the default log storage path
* DEFAULT_LOGPATH_PREFIX 是表示默认的日志存储路径的前缀 和 结果集的前缀
*/
val DEFAULT_LOGPATH_PREFIX = CommonVars[String](
"wds.linkis.entrance.config.log.path",
CommonVars[String]("wds.linkis.filesystem.hdfs.root.path").getValue
).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.commons.lang3.StringUtils

import java.io.File
import java.text.SimpleDateFormat
import java.util
import java.util.{ArrayList, List}
import java.util.{ArrayList, Date, List}

object GovernanceUtils extends Logging {

Expand Down Expand Up @@ -121,4 +122,27 @@ object GovernanceUtils extends Logging {
}
}

private val resPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX

/**
* get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath +
* executeUser + taskid + filename
*
* @param creator
* @return
*/
def getResultParentPath(creator: String): String = {
val resStb = new StringBuilder()
if (resStb.endsWith("/")) {
resStb.append(resPrefix)
} else {
resStb.append(resPrefix).append("/")
}
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
val date = new Date(System.currentTimeMillis)
val dateString = dateFormat.format(date)
resStb.append("result").append("/").append(dateString).append("/").append(creator)
resStb.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class GovernanceCommonConfTest {
val entranceservicename = GovernanceCommonConf.ENTRANCE_SERVICE_NAME.getValue
val enginedefaultlimit = GovernanceCommonConf.ENGINE_DEFAULT_LIMIT.getValue
val skippythonparser = GovernanceCommonConf.SKIP_PYTHON_PARSER.getValue
val resultsetstorepath = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
val errorcodedesclen = GovernanceCommonConf.ERROR_CODE_DESC_LEN

Assertions.assertEquals("wds.linkis.rm", conffilterrm)
Expand All @@ -54,7 +53,6 @@ class GovernanceCommonConfTest {
Assertions.assertEquals("linkis-cg-entrance", entranceservicename)
Assertions.assertTrue(5000 == enginedefaultlimit.intValue())
Assertions.assertTrue(skippythonparser)
Assertions.assertEquals("hdfs:///tmp/linkis/", resultsetstorepath)
Assertions.assertTrue(512 == errorcodedesclen)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.{FsPath, MetaData, Record}
import org.apache.linkis.common.io.resultset.{ResultSet, ResultSetWriter}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.utils.GovernanceUtils
import org.apache.linkis.manager.label.entity.Label

import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -60,9 +61,8 @@ trait ExecutorExecutionContext {
def setLabels(labels: Array[Label[_]]): Unit = this.labels = labels

protected def getDefaultStorePath: String = {
val path = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
val pathPrefix = (if (path.endsWith("/")) path else path + "/") + Utils.getJvmUser + "/" +
DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd") + "/"
val path = GovernanceUtils.getResultParentPath("default")
val pathPrefix = (if (path.endsWith("/")) path else path + "/") + Utils.getJvmUser + "/"
getJobId.map(pathPrefix + _ + "/" + System.nanoTime).getOrElse(pathPrefix + System.nanoTime)
}

Expand All @@ -81,7 +81,7 @@ trait ExecutorExecutionContext {
protected def getDefaultResultSetByType: String

def createDefaultResultSetWriter(): ResultSetWriter[_ <: MetaData, _ <: Record] = {
createResultSetWriter(getResultSetByType(getDefaultResultSetByType)) // todo check
createResultSetWriter(getResultSetByType(getDefaultResultSetByType))
}

def createDefaultResultSetWriter(alias: String): ResultSetWriter[_ <: MetaData, _ <: Record] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public EntranceInterceptor[] entranceInterceptors() {
new ParserVarLabelInterceptor(),
new VarSubstitutionInterceptor(),
new LogPathCreateInterceptor(),
new StorePathEntranceInterceptor(),
new ScalaCodeInterceptor(),
new SQLLimitEntranceInterceptor(),
new CommentInterceptor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.linkis.entrance.parser;

import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.utils.CommonLogPathUtils;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.manager.label.utils.LabelUtil;
import org.apache.linkis.storage.utils.StorageUtils;
Expand Down Expand Up @@ -53,7 +53,7 @@ public static void generateLogPath(JobRequest jobRequest, Map<String, String> pa
String logPathPrefix = null;
String logMid = "log";
if (StringUtils.isEmpty(logPathPrefix)) {
logPathPrefix = EntranceConfiguration$.MODULE$.DEFAULT_LOGPATH_PREFIX().getValue();
logPathPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX();
}
/*Determine whether logPathPrefix is terminated with /, if it is, delete */
/*判断是否logPathPrefix是否是以 / 结尾, 如果是,就删除*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,9 @@ object EntranceConfiguration {
val JOB_MAX_PERSIST_WAIT_TIME =
CommonVars("wds.linkis.entrance.job.persist.wait.max", new TimeType("5m"))

val MULTI_ENTRANCE_CONDITION = CommonVars("wds.linkis.entrance.multi.entrance.flag", true)

val JOBHISTORY_SPRING_APPLICATION_NAME =
CommonVars("wds.linkis.jobhistory.application.name", "linkis-ps-jobhistory")

/**
* DEFAULT_LOGPATH_PREFIX is the prefix that represents the default log storage path
* DEFAULT_LOGPATH_PREFIX 是表示默认的日志存储路径的前缀
*/
val DEFAULT_LOGPATH_PREFIX = CommonVars[String](
"wds.linkis.entrance.config.log.path",
CommonVars[String]("wds.linkis.filesystem.hdfs.root.path").getValue
)

/**
* Default_Cache_Max is used to specify the size of the LoopArray of the CacheLogWriter
* Default_Cache_Max 是用来指定CacheLogWriter的LoopArray的大小
Expand Down Expand Up @@ -291,7 +280,4 @@ object EntranceConfiguration {
val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR =
CommonVars("linkis.entrance.skip.orchestrator", false).getValue

val ENABLE_HDFS_RES_DIR_PRIVATE =
CommonVars[Boolean]("linkis.entrance.enable.hdfs.res.dir.private", false).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ abstract class EntranceExecutorManager(groupFactory: GroupFactory)

private val idGenerator = new AtomicLong(0)

def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor]

override def delete(executor: Executor): Unit = {
if (null != executor) {
executor.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,42 +65,3 @@ object JobExecuteRequestInterceptor extends ExecuteRequestInterceptor {
}

}

object ReconnectExecuteRequestInterceptor extends ExecuteRequestInterceptor {
val PROPERTY_EXEC_ID = "execId"

override def apply(requestTask: RequestTask, executeRequest: ExecuteRequest): RequestTask =
executeRequest match {
case reconnect: ReconnectExecuteRequest =>
requestTask.data(PROPERTY_EXEC_ID, reconnect.execId)
requestTask
case _ => requestTask
}

}

object StorePathExecuteRequestInterceptor extends ExecuteRequestInterceptor {

override def apply(requestTask: RequestTask, executeRequest: ExecuteRequest): RequestTask =
executeRequest match {
case storePath: StorePathExecuteRequest =>
requestTask.data(RequestTask.RESULT_SET_STORE_PATH, storePath.storePath)
requestTask
case _ => requestTask
}

}

object RuntimePropertiesExecuteRequestInterceptor extends ExecuteRequestInterceptor {

override def apply(requestTask: RequestTask, executeRequest: ExecuteRequest): RequestTask =
executeRequest match {
case runtime: RuntimePropertiesExecuteRequest =>
runtime.properties.asScala.foreach { case (k, v) =>
requestTask.data(k, v)
}
requestTask
case _ => requestTask
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,5 @@ import org.apache.linkis.scheduler.queue.GroupFactory
class EntranceExecutorManagerImpl(groupFactory: GroupFactory)
extends EntranceExecutorManager(groupFactory) {

override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] = Array(
JobExecuteRequestInterceptor,
LabelExecuteRequestInterceptor,
ReconnectExecuteRequestInterceptor,
StorePathExecuteRequestInterceptor,
RuntimePropertiesExecuteRequestInterceptor
)

override def setExecutorListener(engineListener: ExecutorListener): Unit = {}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package org.apache.linkis.entrance.utils
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.utils.GovernanceUtils
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.fs.FileSystem
Expand Down Expand Up @@ -58,34 +60,20 @@ object CommonLogPathUtils {
}
}

private val resPrefix = EntranceConfiguration.DEFAULT_LOGPATH_PREFIX.getValue

/**
* get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath +
* executeUser + taskid + filename
* @param jobRequest
* @return
*/
def getResultParentPath(jobRequest: JobRequest): String = {
val resStb = new StringBuilder()
if (resStb.endsWith("/")) {
resStb.append(resPrefix)
} else {
resStb.append(resPrefix).append("/")
}
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
val date = new Date(System.currentTimeMillis)
val dateString = dateFormat.format(date)
val userCreator = LabelUtil.getUserCreatorLabel(jobRequest.getLabels)
val creator =
if (null == userCreator) EntranceConfiguration.DEFAULT_CREATE_SERVICE
else userCreator.getCreator
resStb.append("result").append("/").append(dateString).append("/").append(creator)
resStb.toString()
GovernanceUtils.getResultParentPath(creator)
}

def getResultPath(jobRequest: JobRequest): String = {
val parentPath = getResultParentPath(jobRequest)
val userCreator = LabelUtil.getUserCreatorLabel(jobRequest.getLabels)
val creator =
if (null == userCreator) EntranceConfiguration.DEFAULT_CREATE_SERVICE
else userCreator.getCreator
val parentPath = GovernanceUtils.getResultParentPath(creator)
parentPath + "/" + jobRequest.getExecuteUser + "/" + jobRequest.getId
}

Expand Down

0 comments on commit 24244ab

Please sign in to comment.