Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
yangwenzea committed Dec 6, 2023
2 parents 37c0970 + b689f88 commit 2d6ef74
Showing 469 changed files with 23,273 additions and 2,468 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ and session management.

### Related issues/PRs

Related issues: #590
Related issues: close #590 close #591
Related pr:#591


2 changes: 1 addition & 1 deletion .github/actions/chart-testing-action
Original file line number Diff line number Diff line change
@@ -20,11 +20,17 @@ package org.apache.linkis.common
class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

def getRegistryTimestamp: Long = registryTimestamp

override def equals(other: Any): Boolean = other match {
case that: ServiceInstance =>
applicationName == that.applicationName &&
@@ -42,7 +48,9 @@ class ServiceInstance {
.foldLeft(0)((a, b) => 31 * a + b)
}

override def toString: String = s"ServiceInstance($applicationName, $instance)"
override def toString: String =
s"ServiceInstance($applicationName, $instance, $registryTimestamp)"

}

object ServiceInstance {
@@ -54,6 +62,14 @@ object ServiceInstance {
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
serviceInstance.setInstance(instance)
serviceInstance.setRegistryTimestamp(registryTimestamp)
serviceInstance
}

def unapply(serviceInstance: ServiceInstance): Option[(String, String)] =
if (serviceInstance != null) {
Some(serviceInstance.applicationName, serviceInstance.instance)
Original file line number Diff line number Diff line change
@@ -232,19 +232,20 @@ private[conf] object BDPConfiguration extends Logging {

private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
if (value.isEmpty || value.exists(StringUtils.isEmpty)) return Option(defaultValue)
val trimValue = value.map(_.trim)
val formattedValue = defaultValue match {
case _: String => value
case _: Byte => value.map(_.toByte)
case _: Short => value.map(_.toShort)
case _: Char => value.map(_.toCharArray.apply(0))
case _: Int => value.map(_.toInt)
case _: Long => value.map(_.toLong)
case _: Float => value.map(_.toFloat)
case _: Double => value.map(_.toDouble)
case _: Boolean => value.map(_.toBoolean)
case _: TimeType => value.map(new TimeType(_))
case _: ByteType => value.map(new ByteType(_))
case null => value
case _: String => trimValue
case _: Byte => trimValue.map(_.toByte)
case _: Short => trimValue.map(_.toShort)
case _: Char => trimValue.map(_.toCharArray.apply(0))
case _: Int => trimValue.map(_.toInt)
case _: Long => trimValue.map(_.toLong)
case _: Float => trimValue.map(_.toFloat)
case _: Double => trimValue.map(_.toDouble)
case _: Boolean => trimValue.map(_.toBoolean)
case _: TimeType => trimValue.map(new TimeType(_))
case _: ByteType => trimValue.map(new ByteType(_))
case null => trimValue
}
formattedValue.asInstanceOf[Option[T]]
}
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ object LogUtils {
}

def generateERROR(rawLog: String): String = {
getTimeFormat + " " + "ERROR" + " " + rawLog
getTimeFormat + " " + ERROR_STR + " " + rawLog
}

def generateWarn(rawLog: String): String = {
@@ -52,4 +52,6 @@ object LogUtils {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}

val ERROR_STR = "ERROR"

}
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import scala.collection.mutable

object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()

@@ -101,7 +103,14 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
else {
// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
val map = mutable.Map[String, String]()
codeTypeAndRunTypeRelationMap.foreach(kv => {
kv._2.foreach(v => map.put(v, kv._1))
})
map.toMap
}
}

def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.junit.jupiter.api.Test;

@@ -40,7 +41,7 @@ void testGenerateException() {
+ "null");
assertEquals(errorException.getClass(), ExceptionManager.generateException(null).getClass());
assertEquals(errorException.toString(), ExceptionManager.generateException(null).toString());
Map<String, Object> map = new HashMap<>();
Map<String, Object> map = new TreeMap<>();
map.put("level", null);
map.put("errCode", 1);
map.put("desc", "test");
80 changes: 76 additions & 4 deletions linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
@@ -64,10 +64,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
@@ -277,4 +273,80 @@
</plugins>
</build>

<profiles>
<profile>
<id>eureka</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>discovery</name>
<value>eureka</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>nacos</id>
<activation>
<property>
<name>discovery</name>
<value>nacos</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
<exclusion>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
@@ -207,4 +207,7 @@ object ServerConfiguration extends Logging {
val LINKIS_SERVER_SESSION_PROXY_TICKETID_KEY =
CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1")

val LINKIS_SERVER_ENTRANCE_HEADER_KEY =
CommonVars("linkis.server.entrance.header.key", "jobInstanceKey")

}
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ object RedisClient {
SessionHAConfiguration.RedisHost,
SessionHAConfiguration.RedisPort,
redisTimeout,
SessionHAConfiguration.RedisSentinalServer
SessionHAConfiguration.RedisPassword
)
}

Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ public interface TaskConstant {
String TICKET_ID = "ticketId";
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
String FAILOVER_FLAG = "failoverFlag";
String DEBUG_ENBALE = "debug.enable";

String PARAMS_DATA_SOURCE = "dataSources";
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.protocol.engine

case class JobInstance(
status: String,
instances: String,
jobReqId: String,
createTimestamp: Long,
instanceRegistryTimestamp: Long
)
Original file line number Diff line number Diff line change
@@ -45,6 +45,14 @@ object TaskUtils {
}
} else params.put(key, waitToAdd)

private def clearMap(params: util.Map[String, AnyRef], key: String): Unit =
if (params != null && params.containsKey(key)) {
params.get(key) match {
case map: util.Map[String, AnyRef] => map.clear()
case _ => params.put(key, new util.HashMap[String, AnyRef]())
}
}

private def getConfigurationMap(
params: util.Map[String, AnyRef],
key: String
@@ -84,13 +92,20 @@ object TaskUtils {
def addStartupMap(params: util.Map[String, AnyRef], startupMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, startupMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)

def clearStartupMap(params: util.Map[String, AnyRef]): Unit = {
val configurationMap = getMap(params, TaskConstant.PARAMS_CONFIGURATION)
if (!configurationMap.isEmpty) {
clearMap(configurationMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)
}
}

def addRuntimeMap(params: util.Map[String, AnyRef], runtimeMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, runtimeMap, TaskConstant.PARAMS_CONFIGURATION_RUNTIME)

def addSpecialMap(params: util.Map[String, AnyRef], specialMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, specialMap, TaskConstant.PARAMS_CONFIGURATION_SPECIAL)

// tdoo
// todo
def getLabelsMap(params: util.Map[String, AnyRef]): util.Map[String, AnyRef] =
getMap(params, TaskConstant.LABELS)

Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ object ZuulEntranceUtils {

private val INSTANCE_SPLIT_TOKEN = "_"

private val EXEC_ID = "exec_id"
val EXEC_ID = "exec_id"

private val SPLIT_LEN = 3

Original file line number Diff line number Diff line change
@@ -23,11 +23,18 @@ abstract class AbstractGroup extends Group {

private var _status: GroupStatus = _
private var maxRunningJobs: Int = _
private var maxAllowRunningJobs: Int = 0
private var maxAskExecutorTimes: Long = 0L

def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
def getMaxRunningJobs: Int = maxRunningJobs

def setMaxAllowRunningJobs(maxAllowRunningJobs: Int): Unit = this.maxAllowRunningJobs =
maxAllowRunningJobs

def getMaxAllowRunningJobs: Int =
if (maxAllowRunningJobs <= 0) maxRunningJobs else Math.min(maxAllowRunningJobs, maxRunningJobs)

def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
maxAskExecutorTimes

Original file line number Diff line number Diff line change
@@ -38,4 +38,8 @@ object SchedulerEventState extends Enumeration {
SchedulerEventState.withName(jobState)
)

def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited

def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState))

}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import org.apache.linkis.scheduler.executer.Executor
import org.apache.linkis.scheduler.future.{BDPFuture, BDPFutureTask}
import org.apache.linkis.scheduler.queue._

import java.util
import java.util.concurrent.{ExecutorService, Future}

import scala.beans.BeanProperty
@@ -122,9 +123,10 @@ class FIFOUserConsumer(
}
var event: Option[SchedulerEvent] = getWaitForRetryEvent
if (event.isEmpty) {
val completedNums = runningJobs.filter(job => job == null || job.isCompleted)
if (completedNums.length < 1) {
Utils.tryQuietly(Thread.sleep(1000))
val maxAllowRunningJobs = fifoGroup.getMaxAllowRunningJobs
val currentRunningJobs = runningJobs.count(e => e != null && !e.isCompleted)
if (maxAllowRunningJobs <= currentRunningJobs) {
Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
return
}
while (event.isEmpty) {
@@ -207,6 +209,19 @@ class FIFOUserConsumer(
runningJobs(index) = job
}

protected def scanAllRetryJobsAndRemove(): util.List[Job] = {
val jobs = new util.ArrayList[Job]()
for (index <- runningJobs.indices) {
val job = runningJobs(index)
if (job != null && job.isJobCanRetry) {
jobs.add(job)
runningJobs(index) = null
logger.info(s"Job $job can retry, remove from runningJobs")
}
}
jobs
}

override def shutdown(): Unit = {
future.cancel(true)
val waitEvents = queue.getWaitingEvents
Original file line number Diff line number Diff line change
@@ -23,7 +23,9 @@ public enum StorageErrorCode {
FS_NOT_INIT(53001, "please init first"),

INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
FS_OOM(53002, "OOM occurred while reading the file");
FS_OOM(53002, "OOM occurred while reading the file"),

FS_ERROR(53003, "Failed to operation fs");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public static <K extends MetaData, V extends Record> ResultSetReader getResultSe
return new StorageResultSetReader<>(resultSet, value);
}

public static ResultSetReader getResultSetReader(String res) {
public static ResultSetReader getResultSetReader(String res) throws IOException {
ResultSetFactory rsFactory = ResultSetFactory.getInstance();
if (rsFactory.isResultSet(res)) {
ResultSet<? extends MetaData, ? extends Record> resultSet = rsFactory.getResultSet(res);
@@ -58,21 +58,12 @@ public static ResultSetReader getResultSetReader(String res) {
FsPath resPath = new FsPath(res);
ResultSet<? extends MetaData, ? extends Record> resultSet =
rsFactory.getResultSetByPath(resPath);
try {
FSFactory.getFs(resPath).init(null);
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs init failed", e);
}
ResultSetReader reader = null;
try {
reader =
ResultSetReaderFactory.getResultSetReader(
resultSet, FSFactory.getFs(resPath).read(resPath));
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs read failed", e);
}
Fs fs = FSFactory.getFs(resPath);
fs.init(null);
ResultSetReader reader =
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
if (reader instanceof StorageResultSetReader) {
((StorageResultSetReader<?, ?>) reader).setFs(FSFactory.getFs(resPath));
((StorageResultSetReader<?, ?>) reader).setFs(fs);
}
return (StorageResultSetReader<?, ?>) reader;
}
Original file line number Diff line number Diff line change
@@ -51,28 +51,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
public static Record[] getRecordByWriter(
org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
writer,
long limit) {
long limit)
throws IOException {
String res = writer.toString();
return getRecordByRes(res, limit);
}

public static Record[] getRecordByRes(String res, long limit) {
public static Record[] getRecordByRes(String res, long limit) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
int count = 0;
List<Record> records = new ArrayList<>();
try {
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
} catch (IOException e) {
logger.warn("ResultSetWriter getRecordByRes failed", e);
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
return records.toArray(new Record[0]);
}

public static Record getLastRecordByRes(String res) {
public static Record getLastRecordByRes(String res) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
Record record = null;
try {
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import org.apache.linkis.storage.*;
import org.apache.linkis.storage.conf.*;
import org.apache.linkis.storage.domain.*;
import org.apache.linkis.storage.exception.StorageErrorException;
import org.apache.linkis.storage.utils.*;

import org.apache.commons.io.IOUtils;
@@ -37,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

public class StorageResultSetWriter<K extends MetaData, V extends Record>
extends ResultSetWriter<K, V> {
private static final Logger logger = LoggerFactory.getLogger(StorageResultSetWriter.class);
@@ -98,8 +101,9 @@ public void createNewFile() {
fs.init(null);
FileSystemUtils.createNewFile(storePath, proxyUser, true);
outputStream = fs.write(storePath, true);
} catch (IOException e) {
logger.warn("StorageResultSetWriter createNewFile failed", e);
} catch (Exception e) {
throw new StorageErrorException(
FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e);
}
logger.info("Succeed to create a new file:{}", storePath);
fileCreated = true;
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ record -> {
if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
return "";
} else {
return nullValue;
return emptyValue;
}
} else if (r instanceof Double) {
return StorageUtils.doubleToString((Double) r);
Original file line number Diff line number Diff line change
@@ -61,16 +61,12 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExi
createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists);
}

public static void createNewFile(
FsPath filePath, String user, boolean createParentWhenNotExists) {
public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user);
try {
fileSystem.init(null);
createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists);
} catch (IOException e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} catch (Exception e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} finally {
IOUtils.closeQuietly(fileSystem);
}
@@ -96,6 +92,30 @@ public static void createNewFileWithFileSystem(
}
}

/**
* create new file and set file owner by FileSystem
*
* @param fileSystem
* @param filePath
* @param user
* @param createParentWhenNotExists
*/
public static void createNewFileAndSetOwnerWithFileSystem(
FileSystem fileSystem, FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
if (!fileSystem.exists(filePath)) {
if (!fileSystem.exists(filePath.getParent())) {
if (!createParentWhenNotExists) {
throw new IOException(
"parent dir " + filePath.getParent().getPath() + " dose not exists.");
}
mkdirs(fileSystem, filePath.getParent(), user);
}
fileSystem.createNewFile(filePath);
fileSystem.setOwner(filePath, user);
}
}

/**
* Recursively create a directory
*
@@ -133,4 +153,39 @@ public static boolean mkdirs(FileSystem fileSystem, FsPath dest, String user) th
}
return true;
}

/**
* Recursively create a directory(递归创建目录) add owner info
*
* @param fileSystem
* @param dest
* @param user
* @throws IOException
* @return
*/
public static boolean mkdirsAndSetOwner(FileSystem fileSystem, FsPath dest, String user)
throws IOException {
FsPath parentPath = dest.getParent();
Stack<FsPath> dirsToMake = new Stack<>();
dirsToMake.push(dest);
while (!fileSystem.exists(parentPath)) {
dirsToMake.push(parentPath);

if (Objects.isNull(parentPath.getParent())) {
// parent path of root is null
break;
}

parentPath = parentPath.getParent();
}
if (!fileSystem.canExecute(parentPath)) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
while (!dirsToMake.empty()) {
FsPath path = dirsToMake.pop();
fileSystem.mkdir(path);
fileSystem.setOwner(path, user);
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -272,4 +272,8 @@ public static byte[] mergeByteArrays(byte[] arr1, byte[] arr2) {
System.arraycopy(arr2, 0, mergedArray, arr1.length, arr2.length);
return mergedArray;
}

public static boolean isHDFSPath(FsPath fsPath) {
return HDFS.equalsIgnoreCase(fsPath.getFsType());
}
}
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.apache.linkis.cli.application.operator.ujes.LinkisJobOper;
import org.apache.linkis.cli.application.operator.ujes.UJESClientFactory;
import org.apache.linkis.cli.application.utils.CliUtils;
import org.apache.linkis.cli.application.utils.LoggerManager;

import org.apache.commons.lang3.StringUtils;

@@ -135,14 +136,22 @@ public static InteractiveJobDesc build(CliCtx ctx) {
}

if (StringUtils.isBlank(code) && StringUtils.isNotBlank(codePath)) {
code = CliUtils.readFile(codePath);
try {
code = CliUtils.readFile(codePath);
} catch (Exception e) {
LoggerManager.getInformationLogger().error("Failed to read file", e);
throw e;
}
}

executionMap.put(LinkisKeys.KEY_CODE, code);
labelMap.put(LinkisKeys.KEY_ENGINETYPE, engineType);
labelMap.put(LinkisKeys.KEY_CODETYPE, runType);
labelMap.put(LinkisKeys.KEY_USER_CREATOR, proxyUsr + "-" + creator);
sourceMap.put(LinkisKeys.KEY_SCRIPT_PATH, scriptPath);
if (ctx.getExtraMap().containsKey(CliKeys.VERSION)) {
sourceMap.put(LinkisKeys.CLI_VERSION, ctx.getExtraMap().get(CliKeys.VERSION));
}
runtimeMap.put(LinkisKeys.KEY_HIVE_RESULT_DISPLAY_TBALE, true);

desc.setCreator(creator);
Original file line number Diff line number Diff line change
@@ -32,7 +32,6 @@ public static void writeToFile(
String pathName, String fileName, String content, Boolean overWrite) {

File dir = new File(pathName);
File file = new File(fileName);

if (!dir.exists()) {
try {
@@ -47,6 +46,8 @@ public static void writeToFile(
}
}

File file = new File(dir.getAbsolutePath() + File.separator + fileName);

if (overWrite || !file.exists()) {
try {
file.createNewFile();
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.governance.common.constant;

public class CodeConstants {
// will auto append at end of scala code; make sure the last line is not a comment
public static String SCALA_CODE_AUTO_APPEND_CODE = "val linkisVar=123";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.governance.common.entity;

public class TemplateConfKey {

private String templateUuid;

private String key;

private String templateName;

private String configValue;

public String getTemplateUuid() {
return templateUuid;
}

public void setTemplateUuid(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public String getTemplateName() {
return templateName;
}

public void setTemplateName(String templateName) {
this.templateName = templateName;
}

public String getConfigValue() {
return configValue;
}

public void setConfigValue(String configValue) {
this.configValue = configValue;
}

@Override
public String toString() {
return "TemplateKey{"
+ "templateUuid='"
+ templateUuid
+ '\''
+ ", key='"
+ key
+ '\''
+ ", templateName='"
+ templateName
+ '\''
+ ", configValue='"
+ configValue
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -49,6 +49,9 @@ public class JobRequest {
/** result location */
private String resultLocation;

/** Task status updates is ordered, if false, not checked */
private Boolean updateOrderFlag = true;

private String observeInfo;

private Map<String, Object> metrics = new HashMap<>();
@@ -205,6 +208,14 @@ public void setObserveInfo(String observeInfo) {
this.observeInfo = observeInfo;
}

public Boolean getUpdateOrderFlag() {
return updateOrderFlag;
}

public void setUpdateOrderFlag(Boolean updateOrderFlag) {
this.updateOrderFlag = updateOrderFlag;
}

@Override
public String toString() {
return "JobRequest{"
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.governance.common.protocol.conf;

import org.apache.linkis.protocol.message.RequestProtocol;

public class TemplateConfRequest implements RequestProtocol {

private String templateUuid;

private String templateName;

public TemplateConfRequest(String templateUuid, String templateName) {
this.templateUuid = templateUuid;
this.templateName = templateName;
}

public TemplateConfRequest(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getTemplateUuid() {
return templateUuid;
}

public void setTemplateUuid(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getTemplateName() {
return templateName;
}

public void setTemplateName(String templateName) {
this.templateName = templateName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.governance.common.protocol.conf;

import org.apache.linkis.governance.common.entity.TemplateConfKey;

import java.util.ArrayList;
import java.util.List;

public class TemplateConfResponse {

private List<TemplateConfKey> list = new ArrayList<>();

public List<TemplateConfKey> getList() {
return list;
}

public void setList(List<TemplateConfKey> list) {
this.list = list;
}
}
Original file line number Diff line number Diff line change
@@ -90,10 +90,4 @@ object GovernanceCommonConf {
val EC_APP_MANAGE_MODE =
CommonVars("linkis.ec.app.manage.mode", "attach")

val SCALA_PARSE_APPEND_CODE_ENABLED =
CommonVars("linkis.scala.parse.append.code.enable", true).getValue

val SCALA_PARSE_APPEND_CODE =
CommonVars("linkis.scala.parse.append.code", "val linkisVar=1").getValue

}
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@

package org.apache.linkis.governance.common.exception

import org.apache.linkis.common.exception.{ErrorException, ExceptionLevel, LinkisRuntimeException}
import org.apache.linkis.common.exception.{ExceptionLevel, LinkisRuntimeException}

class GovernanceErrorException(errorCode: Int, errorMsg: String)
extends LinkisRuntimeException(errorCode, errorMsg) {
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ package org.apache.linkis.governance.common.paser

import org.apache.linkis.common.utils.{CodeAndRunTypeUtils, Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.CodeConstants
import org.apache.linkis.governance.common.paser.CodeType.CodeType

import org.apache.commons.lang3.StringUtils
@@ -86,6 +87,11 @@ abstract class CombinedEngineCodeParser extends CodeParser {

}

/**
* Scala is no longer using Parser but instead using EmptyParser. If there is a comment at the end,
* it will cause the task to become stuck
*/
@deprecated
class ScalaCodeParser extends SingleCodeParser with Logging {

override val codeType: CodeType = CodeType.Scala
@@ -109,11 +115,9 @@ class ScalaCodeParser extends SingleCodeParser with Logging {
case _ =>
}
if (statementBuffer.nonEmpty) codeBuffer.append(statementBuffer.mkString("\n"))

// Append code `val linkisVar=1` in ends to prevent bugs that do not exit tasks for a long time
if (GovernanceCommonConf.SCALA_PARSE_APPEND_CODE_ENABLED) {
codeBuffer.append(GovernanceCommonConf.SCALA_PARSE_APPEND_CODE)
}
// Make sure the last line is not a comment
codeBuffer.append("\n")
codeBuffer.append(CodeConstants.SCALA_CODE_AUTO_APPEND_CODE)
codeBuffer.toArray
}

Original file line number Diff line number Diff line change
@@ -51,3 +51,10 @@ class RequestOneJob extends JobReq {
}

case class RequestAllJob(instance: String) extends JobReq

case class RequestFailoverJob(
reqMap: util.Map[String, java.lang.Long],
statusList: util.List[String],
startTimestamp: Long,
limit: Int = 10
) extends JobReq
Original file line number Diff line number Diff line change
@@ -29,7 +29,8 @@ class ScalaCodeParserTest {
"val codeBuffer = new ArrayBuffer[String]()\n val statementBuffer = new ArrayBuffer[String]()"
val scalaCodeParser = new ScalaCodeParser
val array = scalaCodeParser.parse(scalaCode)
Assertions.assertTrue(array.length == 2)
Assertions.assertTrue(array.size == 3)

}

@Test
@@ -40,7 +41,7 @@ class ScalaCodeParserTest {
" def addInt( a:Int, b:Int )\n var sum:Int = 0\n sum = a + b\n return sum\n }"
val scalaCodeParser = new ScalaCodeParser
val array = scalaCodeParser.parse(abnormalCode)
Assertions.assertTrue(array.length == 2)
Assertions.assertTrue(array.length == 3)

}

@@ -53,7 +54,7 @@ class ScalaCodeParserTest {

val scalaCodeParser = new ScalaCodeParser
val array = scalaCodeParser.parse(importCode)
Assertions.assertTrue(array.length == 3)
Assertions.assertTrue(array.length == 4)

}

@@ -67,7 +68,7 @@ class ScalaCodeParserTest {
val scalaCodeParser = new ScalaCodeParser
val arrayResult1 = scalaCodeParser.parse(specialCodeExp1)

Assertions.assertTrue(arrayResult1.length == 3)
Assertions.assertTrue(arrayResult1.length == 4)

val specialCodeExp2 =
" @BeanProperty\n var id: Long = _\n @BeanProperty\n var status: Int = 0\n " +
@@ -78,7 +79,7 @@ class ScalaCodeParserTest {
".append(data, that.data)\n .isEquals\n }"

val arrayResult2 = scalaCodeParser.parse(specialCodeExp2)
Assertions.assertTrue(arrayResult2.length == 2)
Assertions.assertTrue(arrayResult2.length == 3)

}

Original file line number Diff line number Diff line change
@@ -176,8 +176,15 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
.toString

var springConf = Map("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")

var springConf =
Map[String, String]("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")
val properties =
PortUtils.readFromProperties(Configuration.getLinkisHome + "/conf/version.properties")
if (StringUtils.isNotBlank(properties.getProperty("version"))) {
springConf += ("eureka.instance.metadata-map.linkis.app.version" -> properties.getProperty(
"version"
))
}
request.creationDesc.properties.asScala.filter(_._1.startsWith("spring.")).foreach {
case (k, v) =>
springConf = springConf + (k -> v)
Original file line number Diff line number Diff line change
@@ -17,15 +17,16 @@

package org.apache.linkis.ecm.core.utils

import org.apache.linkis.common.utils.Utils
import org.apache.linkis.common.utils.{Logging, Utils}

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils

import java.io.IOException
import java.io.{BufferedReader, FileReader, IOException}
import java.net.ServerSocket
import java.util.Properties

object PortUtils {
object PortUtils extends Logging {

/**
* portRange: '-' is the separator
@@ -62,4 +63,23 @@ object PortUtils {
Utils.tryFinally(socket.getLocalPort)(IOUtils.closeQuietly(socket))
}

def readFromProperties(propertiesFile: String): Properties = {
val properties: Properties = new Properties
var reader: BufferedReader = null;
try {
reader = new BufferedReader(new FileReader(propertiesFile))
properties.load(reader)
} catch {
case e: Exception =>
logger.warn(s"loading vsersion faild with path $propertiesFile error:$e")
} finally {
try if (reader != null) reader.close
catch {
case e: Exception =>
logger.warn(s"try to close buffered reader with error:${e.getMessage}")
}
}
properties
}

}
Original file line number Diff line number Diff line change
@@ -51,6 +51,12 @@
<artifactId>linkis-rpc</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Original file line number Diff line number Diff line change
@@ -33,7 +33,15 @@ public enum EngineconnServerErrorCodeSummary implements LinkisErrorCode {
11110,
"the parameters of engineConnInstance and ticketId are both not exists.(engineConnInstance 和ticketId 的参数都不存在.)"),
LOG_IS_NOT_EXISTS(11110, "Log directory {0} does not exists.(日志目录 {0} 不存在.)"),
FAILED_TO_DOWNLOAD(911115, "failed to downLoad(下载失败)");
FAILED_TO_DOWNLOAD(911115, "failed to downLoad(下载失败)"),
FILE_IS_OVERSIZE(911116, "Download file has exceeded 100MB(下载文件已超过100M)"),
PARAMETER_NOT_NULL(911117, "Parameter {0} cannot be empty (参数 {0} 不能为空)"),
LOGTYPE_ERROR(
911118,
"logType only supports stdout, stderr, gc, yarnApp(logType仅支持stdout,stderr,gc,yarnApp)"),
NOT_PERMISSION(
911119, "You {0} have no permission to download Log in ECM {1}(用户 {0} 无权限下载 ECM {1} 日志)"),
;

/** (errorCode)错误码 */
private final int errorCode;
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.restful;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Consts;

import org.springframework.web.bind.annotation.*;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.UserPrincipal;
import java.text.MessageFormat;

import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;

@Api(tags = "ECM")
@RequestMapping(path = "/engineconnManager")
@RestController
public class ECMRestfulApi {

private final Logger logger = LoggerFactory.getLogger(ECMRestfulApi.class);

/**
* * Reason for using the get method: Added gateway forwarding rules, which only support get
* requests
*
* @param req
* @param response
* @param emInstance
* @param instance
* @param logDirSuffix
* @param logType
* @throws IOException
*/
@ApiOperation(
value = "downloadEngineLog",
notes = "download engine log",
response = Message.class)
@ApiImplicitParams({
@ApiImplicitParam(
name = "emInstance",
required = true,
dataType = "String",
example = "xxx0002:9102"),
@ApiImplicitParam(
name = "instance",
required = true,
dataType = "String",
example = "xxx0002:35873"),
@ApiImplicitParam(name = "logDirSuffix", required = true, dataType = "String"),
@ApiImplicitParam(name = "logType", required = true, dataType = "String")
})
@ApiOperationSupport(ignoreParameters = {"json"})
@RequestMapping(path = "/downloadEngineLog", method = RequestMethod.GET)
public Message downloadEngineLog(
HttpServletRequest req,
HttpServletResponse response,
@RequestParam(value = "emInstance") String emInstance,
@RequestParam(value = "instance") String instance,
@RequestParam(value = "logDirSuffix") String logDirSuffix,
@RequestParam(value = "logType") String logType)
throws IOException {
String userName = ModuleUserUtils.getOperationUser(req, "downloadEngineLog");
if (StringUtils.isBlank(instance)) {
return Message.error(MessageFormat.format(PARAMETER_NOT_NULL.getErrorDesc(), "instance"));
}
if (StringUtils.isBlank(logDirSuffix)) {
return Message.error(MessageFormat.format(PARAMETER_NOT_NULL.getErrorDesc(), "logDirSuffix"));
}
if (StringUtils.isBlank(logType)) {
return Message.error(MessageFormat.format(PARAMETER_NOT_NULL.getErrorDesc(), "logType"));
} else if (!logType.equals("stdout")
&& !logType.equals("stderr")
&& !logType.equals("gc")
&& !logType.equals("yarnApp")) {
return Message.error(LOGTYPE_ERROR.getErrorDesc());
}
// 获取文件的权限归属者
FileOwnerAttributeView ownerView =
Files.getFileAttributeView(
Paths.get(logDirSuffix + "/" + logType), FileOwnerAttributeView.class);
UserPrincipal owner = ownerView.getOwner();
if (!owner.getName().equals(userName)
&& Configuration.isNotAdmin(userName)
&& Configuration.isNotJobHistoryAdmin(userName)) {
return Message.error(
MessageFormat.format(NOT_PERMISSION.getErrorDesc(), userName, emInstance));
}
File inputFile = new File(logDirSuffix, logType);
if (!inputFile.exists()) {
return Message.error(MessageFormat.format(LOG_IS_NOT_EXISTS.getErrorDesc(), logDirSuffix));
} else {
long fileSizeInBytes = inputFile.length();
long fileSizeInMegabytes = fileSizeInBytes / (1024 * 1024);
if (fileSizeInMegabytes > 100) {
return Message.error(MessageFormat.format(FILE_IS_OVERSIZE.getErrorDesc(), logDirSuffix));
}
ServletOutputStream outputStream = null;
FileInputStream inputStream = null;
BufferedInputStream fis = null;
PrintWriter writer = null;
try {
inputStream = new FileInputStream(inputFile);
fis = new BufferedInputStream(inputStream);
byte[] buffer = new byte[1024];
int bytesRead = 0;
response.setCharacterEncoding(Consts.UTF_8.toString());
java.nio.file.Path source = Paths.get(inputFile.getPath());
response.addHeader("Content-Type", Files.probeContentType(source));
// filename eg:xxx002_11529_stdout.txt
response.addHeader(
"Content-Disposition",
"attachment;filename=" + instance.replace(":", "_") + "_" + logType + ".txt");
response.addHeader("Content-Length", fileSizeInBytes + "");
outputStream = response.getOutputStream();
while ((bytesRead = fis.read(buffer, 0, 1024)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
} catch (IOException e) {
logger.warn("Download EngineLog Failed Msg :", e);
response.reset();
response.setCharacterEncoding(Consts.UTF_8.toString());
response.setContentType("text/plain; charset=utf-8");
writer = response.getWriter();
writer.append("error(错误):" + e.getMessage());
writer.flush();
} finally {
if (outputStream != null) {
outputStream.flush();
}
IOUtils.closeQuietly(outputStream);
IOUtils.closeQuietly(fis);
IOUtils.closeQuietly(inputStream);
}
return Message.ok();
}
}
}
Original file line number Diff line number Diff line change
@@ -189,6 +189,7 @@ private String getYarnAppRegexByEngineType(String engineType) {
case "sqoop":
regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
break;
case "flink":
case "hive":
regex = EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
break;
Original file line number Diff line number Diff line change
@@ -62,7 +62,6 @@ class BmlEnginePreExecuteHook extends ComputationExecutorHook with Logging {
): String = {
val props = engineExecutionContext.getProperties
if (null != props && props.containsKey(GovernanceConstant.TASK_RESOURCES_STR)) {
val workDir = ComputationEngineUtils.getCurrentWorkDir
val jobId = engineExecutionContext.getJobId
props.get(GovernanceConstant.TASK_RESOURCES_STR) match {
case resources: util.List[Object] =>
@@ -71,9 +70,7 @@ class BmlEnginePreExecuteHook extends ComputationExecutorHook with Logging {
val fileName = resource.get(GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR).toString
val resourceId = resource.get(GovernanceConstant.TASK_RESOURCE_ID_STR).toString
val version = resource.get(GovernanceConstant.TASK_RESOURCE_VERSION_STR).toString
val fullPath =
if (workDir.endsWith(seperator)) pathType + workDir + fileName
else pathType + workDir + seperator + fileName
val fullPath = fileName
val response = Utils.tryCatch {
bmlClient.downloadShareResource(processUser, resourceId, version, fullPath, true)
} {
Original file line number Diff line number Diff line change
@@ -33,6 +33,12 @@ object ComputationExecutorConf {
"Maximum number of tasks executed by the synchronization EC"
)

val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars(
"linkis.engineconn.print.task.params.skip.keys",
"jobId",
"skip to print params key at job logs"
)

val ENGINE_PROGRESS_FETCH_INTERVAL =
CommonVars(
"wds.linkis.engineconn.progresss.fetch.interval-in-seconds",
Original file line number Diff line number Diff line change
@@ -17,7 +17,10 @@

package org.apache.linkis.engineconn.computation.executor.cs

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.cs.client.service.CSResourceService
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.governance.common.utils.GovernanceConstant

import org.apache.commons.lang3.StringUtils

@@ -27,7 +30,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class CSResourceParser {
class CSResourceParser extends Logging {

private val pb = Pattern.compile("cs://[^\\s\"]+[$\\s]{0,1}", Pattern.CASE_INSENSITIVE)

@@ -47,7 +50,6 @@ class CSResourceParser {
nodeNameStr: String
): String = {

// TODO getBMLResource
val bmlResourceList =
CSResourceService.getInstance().getUpstreamBMLResource(contextIDValueStr, nodeNameStr)

@@ -56,23 +58,25 @@ class CSResourceParser {

val preFixNames = new ArrayBuffer[String]()
val parsedNames = new ArrayBuffer[String]()
val prefixName = System.currentTimeMillis().toString + "_"
preFixResourceNames.foreach { preFixResourceName =>
val resourceName = preFixResourceName.replace(PREFIX, "").trim
val bmlResourceOption =
bmlResourceList.asScala.find(_.getDownloadedFileName.equals(resourceName))
if (bmlResourceOption.isDefined) {
val replacementName = EngineConnConf.getEngineTmpDir + prefixName + resourceName
val bmlResource = bmlResourceOption.get
val map = new util.HashMap[String, Object]()
map.put("resourceId", bmlResource.getResourceId)
map.put("version", bmlResource.getVersion)
map.put("fileName", resourceName)
map.put(GovernanceConstant.TASK_RESOURCE_ID_STR, bmlResource.getResourceId)
map.put(GovernanceConstant.TASK_RESOURCE_VERSION_STR, bmlResource.getVersion)
map.put(GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR, replacementName)
parsedResources.add(map)
preFixNames.append(preFixResourceName)
parsedNames.append(resourceName)
parsedNames.append(replacementName)
logger.warn(s"Replace cs file from {$preFixResourceName} to {$replacementName}")
}

}
props.put("resources", parsedResources)
props.put(GovernanceConstant.TASK_RESOURCES_STR, parsedResources)
StringUtils.replaceEach(code, preFixNames.toArray, parsedNames.toArray)
}

Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.acessible.executor.listener.event.{
TaskLogUpdateEvent,
TaskResponseErrorEvent,
TaskStatusChangedEvent
}
@@ -40,7 +41,14 @@ import org.apache.linkis.governance.common.paser.CodeParser
import org.apache.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineType,
EngineTypeLabel,
RunType,
UserCreatorLabel
}
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer._

@@ -50,6 +58,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import com.google.common.cache.{Cache, CacheBuilder}

abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
@@ -132,6 +142,12 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)

override def close(): Unit = {
if (null != lastTask) CLOSE_LOCKER.synchronized {
listenerBusContext.getEngineConnSyncListenerBus.postToAll(
TaskLogUpdateEvent(
lastTask.getTaskId,
LogUtils.generateERROR("EC exits unexpectedly and actively kills the task")
)
)
killTask(lastTask.getTaskId)
}
else {
@@ -169,9 +185,11 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
Utils.tryFinally {
transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
val engineExecutionContext = createEngineExecutionContext(engineConnTask)

val engineCreationContext = EngineConnObject.getEngineCreationContext

var hookedCode = engineConnTask.getCode
Utils.tryCatch {
val engineCreationContext = EngineConnObject.getEngineCreationContext
ComputationExecutorHook.getComputationExecutorHooks.foreach(hook => {
hookedCode =
hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)
@@ -182,12 +200,24 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
} else {
logger.info(s"hooked after code: $hookedCode ")
}

// task params log
// spark engine: at org.apache.linkis.engineplugin.spark.executor.SparkEngineConnExecutor.executeLine log special conf
Utils.tryAndWarn {
val engineType = LabelUtil.getEngineType(engineCreationContext.getLabels())
EngineType.mapStringToEngineType(engineType) match {
case EngineType.HIVE | EngineType.TRINO => printTaskParamsLog(engineExecutionContext)
case _ =>
}
}

val localPath = EngineConnConf.getLogDir
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"
)
)

var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes =
@@ -244,6 +274,11 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
case s: SuccessExecuteResponse =>
succeedTasks.increase()
s
case incompleteExecuteResponse: IncompleteExecuteResponse =>
ErrorExecuteResponse(
s"The task cannot be an incomplete response ${incompleteExecuteResponse.message}",
null
)
case _ => response
}
response
@@ -271,6 +306,7 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
TaskResponseErrorEvent(engineConnTask.getTaskId, errorExecuteResponse.message)
)
transformTaskStatus(engineConnTask, ExecutionNodeStatus.Failed)
case _ => logger.warn(s"task get response is $executeResponse")
}
executeResponse
}
@@ -335,6 +371,30 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
}
}

/**
* job task log print task params info
*
* @param engineExecutorContext
* @return
* Unit
*/

def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = {
val sb = new StringBuilder

EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
// skip log jobId because it corresponding jobid when the ec created
if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) {
sb.append(s"${key}=${value.toString}\n")
}
})

sb.append("\n")
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"Your job exec with configs:\n${sb.toString()}\n")
)
}

def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
val oriStatus = task.getStatus
logger.info(s"task ${task.getTaskId} from status $oriStatus to new status $newStatus")
Original file line number Diff line number Diff line change
@@ -91,4 +91,12 @@ object ComputationEngineConnMetrics {
getTotalBusyTimeMills(nodeStatus) + getTotalIdleTimeMills(nodeStatus)

def getUnlockToShutdownDurationMills(): Long = unlockToShutdownDurationMills.get()

def getLastUnlockTimestamp(nodeStatus: NodeStatus): Long = {
nodeStatus match {
case NodeStatus.Unlock => lastUnlockTimeMills
case _ => 0
}
}

}
Original file line number Diff line number Diff line change
@@ -66,6 +66,10 @@ class DefaultNodeHeartbeatMsgManager extends NodeHeartbeatMsgManager with Loggin
ECConstants.EC_TOTAL_LOCK_TIME_MILLS_KEY,
ComputationEngineConnMetrics.getTotalLockTimeMills(status).asInstanceOf[Object]
)
msgMap.put(
ECConstants.EC_LAST_UNLOCK_TIMESTAMP,
ComputationEngineConnMetrics.getLastUnlockTimestamp(status).asInstanceOf[Object]
)
case _ =>
}
val engineParams = EngineConnObject.getEngineCreationContext.getOptions
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ class TaskExecutionServiceImpl
if (!lockService.isLockExist(requestTask.getLock)) {
logger.error(s"Lock ${requestTask.getLock} not exist, cannot execute.")
return ErrorExecuteResponse(
"Lock not exixt",
"Lock not exist",
new EngineConnExecutorErrorException(
EngineConnExecutorErrorCode.INVALID_LOCK,
"Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)."
@@ -422,31 +422,51 @@ class TaskExecutionServiceImpl
): Future[_] = {
val sleepInterval = ComputationExecutorConf.ENGINE_PROGRESS_FETCH_INTERVAL.getValue
scheduler.submit(new Runnable {
override def run(): Unit = Utils.tryAndWarn {
override def run(): Unit = {
Utils.tryQuietly(Thread.sleep(TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS)))
while (null != taskFuture && !taskFuture.isDone) {
if (
ExecutionNodeStatus.isCompleted(task.getStatus) || ExecutionNodeStatus
.isRunning(task.getStatus)
) {
val progressResponse = taskProgress(task.getTaskId)
val resourceResponse = buildResourceMap(task)
val extraInfoMap = buildExtraInfoMap(task)
// todo add other info
val resourceMap =
if (null != resourceResponse) resourceResponse.getResourceMap else null

val respRunningInfo: ResponseTaskRunningInfo = new ResponseTaskRunningInfo(
progressResponse.execId,
progressResponse.progress,
progressResponse.progressInfo,
resourceMap,
extraInfoMap
)

sendToEntrance(task, respRunningInfo)
Thread.sleep(TimeUnit.MILLISECONDS.convert(sleepInterval, TimeUnit.SECONDS))
if (!ExecutionNodeStatus.isCompleted(task.getStatus)) {
Utils.tryAndWarn {
val progressResponse = Utils.tryCatch(taskProgress(task.getTaskId)) {
case e: Exception =>
logger.info("Failed to get progress", e)
null
}
val resourceResponse = Utils.tryCatch(buildResourceMap(task)) { case e: Exception =>
logger.info("Failed to get resource", e)
null
}
val extraInfoMap = Utils.tryCatch(buildExtraInfoMap(task)) { case e: Exception =>
logger.info("Failed to get extra info ", e)
null
}
val resourceMap =
if (null != resourceResponse) resourceResponse.getResourceMap else null

/**
* It is guaranteed that there must be progress the progress must be greater than or
* equal to 0.1
*/
val newProgressResponse = if (null == progressResponse) {
ResponseTaskProgress(task.getTaskId, 0.1f, null)
} else if (progressResponse.progress < 0.1f) {
ResponseTaskProgress(task.getTaskId, 0.1f, progressResponse.progressInfo)
} else {
progressResponse
}
val respRunningInfo: ResponseTaskRunningInfo = new ResponseTaskRunningInfo(
newProgressResponse.execId,
newProgressResponse.progress,
newProgressResponse.progressInfo,
resourceMap,
extraInfoMap
)
sendToEntrance(task, respRunningInfo)
}
}
Utils.tryQuietly(
Thread.sleep(TimeUnit.MILLISECONDS.convert(sleepInterval, TimeUnit.SECONDS))
)
}
}
})
@@ -499,7 +519,7 @@ class TaskExecutionServiceImpl
}

override def taskProgress(taskID: String): ResponseTaskProgress = {
var response = ResponseTaskProgress(taskID, 0, null)
var response = ResponseTaskProgress(taskID, 0.01f, null)
if (StringUtils.isBlank(taskID)) return response
val executor = taskIdCache.getIfPresent(taskID)
if (null != executor) {
@@ -514,11 +534,9 @@ class TaskExecutionServiceImpl
ResponseTaskProgress(taskID, progress, executor.getProgressInfo(taskID))
)
}
} else {
response = ResponseTaskProgress(taskID, -1, null)
}
} else {
logger.error(s"Executor of taskId : $taskID is not cached.")
logger.info(s"Executor of taskId : $taskID is not cached.")
}
response
}
@@ -606,7 +624,7 @@ class TaskExecutionServiceImpl
logger.warn("Unknown event : " + BDPJettyServerHelper.gson.toJson(event))
}

override def onLogUpdate(logUpdateEvent: TaskLogUpdateEvent): Unit = {
override def onLogUpdate(logUpdateEvent: TaskLogUpdateEvent): Unit = Utils.tryAndWarn {
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
if (null != logUpdateEvent && StringUtils.isNotBlank(logUpdateEvent.taskId)) {
val task = getTaskByTaskId(logUpdateEvent.taskId)
@@ -663,32 +681,33 @@ class TaskExecutionServiceImpl
}
}

override def onProgressUpdate(taskProgressUpdateEvent: TaskProgressUpdateEvent): Unit = {
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
val task = getTaskByTaskId(taskProgressUpdateEvent.taskId)
if (null != task) {
val resourceResponse = buildResourceMap(task)
val extraInfoMap = buildExtraInfoMap(task)
override def onProgressUpdate(taskProgressUpdateEvent: TaskProgressUpdateEvent): Unit =
Utils.tryAndWarn {
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
val task = getTaskByTaskId(taskProgressUpdateEvent.taskId)
if (null != task) {
val resourceResponse = buildResourceMap(task)
val extraInfoMap = buildExtraInfoMap(task)

val resourceMap = if (null != resourceResponse) resourceResponse.getResourceMap else null
val resourceMap = if (null != resourceResponse) resourceResponse.getResourceMap else null

val respRunningInfo: ResponseTaskRunningInfo = new ResponseTaskRunningInfo(
taskProgressUpdateEvent.taskId,
taskProgressUpdateEvent.progress,
taskProgressUpdateEvent.progressInfo,
resourceMap,
extraInfoMap
)
val respRunningInfo: ResponseTaskRunningInfo = new ResponseTaskRunningInfo(
taskProgressUpdateEvent.taskId,
taskProgressUpdateEvent.progress,
taskProgressUpdateEvent.progressInfo,
resourceMap,
extraInfoMap
)

sendToEntrance(task, respRunningInfo)
} else {
logger.error(
"Task cannot null! taskProgressUpdateEvent : " + ComputationEngineUtils.GSON
.toJson(taskProgressUpdateEvent)
)
sendToEntrance(task, respRunningInfo)
} else {
logger.error(
"Task cannot null! taskProgressUpdateEvent : " + ComputationEngineUtils.GSON
.toJson(taskProgressUpdateEvent)
)
}
}
}
}

override def onResultSetCreated(taskResultCreateEvent: TaskResultCreateEvent): Unit = {
logger.info(s"start to deal result event ${taskResultCreateEvent.taskId}")
Original file line number Diff line number Diff line change
@@ -39,10 +39,10 @@ class ECTaskKillHandler extends MonitorHandler with Logging {
while (elements.hasNext) {
val element = elements.next
Utils.tryCatch {
doKill(element)
logger.error(
s"ERROR: entrance : ${element.getUpstreamConnection().getUpstreamServiceInstanceName()} lose connect, will kill job : ${element.getKey()}"
)
doKill(element)
} { t =>
logger.error("Failed to kill job: " + element.getKey, t)
}
Original file line number Diff line number Diff line change
@@ -61,6 +61,8 @@ object EngineConnConf {

val ENGINE_CONN_LOCAL_LOG_DIRS_KEY = CommonVars("wds.linkis.engine.logs.dir.key", "LOG_DIRS")

val ENGINE_CONN_LOCAL_TMP_DIR = CommonVars("wds.linkis.engine.tmp.dir", "TEMP_DIRS")

val ENGINE_CONN_CREATION_WAIT_TIME =
CommonVars("wds.linkis.engine.connector.init.time", new TimeType("8m"))

@@ -84,6 +86,8 @@ object EngineConnConf {

def getWorkHome: String = System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue)

def getEngineTmpDir: String = System.getenv(ENGINE_CONN_LOCAL_TMP_DIR.getValue)

def getLogDir: String = {
val logDir = System.getenv(ENGINE_CONN_LOCAL_LOG_DIRS_KEY.getValue)
if (StringUtils.isNotEmpty(logDir)) logDir else new File(getWorkHome, "logs").getPath
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ object AccessibleExecutorConfiguration {
val ENGINECONN_LOG_SEND_SIZE = CommonVars[Int]("wds.linkis.engineconn.log.send.cache.size", 300)

val ENGINECONN_MAX_FREE_TIME =
CommonVars("wds.linkis.engineconn.max.free.time", new TimeType("30m"))
CommonVars("wds.linkis.engineconn.max.free.time", new TimeType("10m"))

val ENGINECONN_LOCK_CHECK_INTERVAL =
CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
6 changes: 6 additions & 0 deletions linkis-computation-governance/linkis-entrance/pom.xml
Original file line number Diff line number Diff line change
@@ -90,6 +90,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-ps-common-lock</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

<build>
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@
import org.apache.linkis.entrance.persistence.QueryPersistenceManager;
import org.apache.linkis.entrance.persistence.ResultSetEngine;
import org.apache.linkis.entrance.scheduler.EntranceGroupFactory;
import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$;
@@ -51,7 +52,6 @@
import org.apache.linkis.scheduler.executer.ExecutorManager;
import org.apache.linkis.scheduler.queue.ConsumerManager;
import org.apache.linkis.scheduler.queue.GroupFactory;
import org.apache.linkis.scheduler.queue.parallelqueue.ParallelConsumerManager;
import org.apache.linkis.scheduler.queue.parallelqueue.ParallelScheduler;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -150,7 +150,7 @@ public EntranceInterceptor[] entranceInterceptors() {
new ScalaCodeInterceptor(),
new SQLLimitEntranceInterceptor(),
new CommentInterceptor(),
new SetTenantLabelInterceptor(),
// new SetTenantLabelInterceptor(),
new UserCreatorIPCheckInterceptor()
};
}
@@ -190,7 +190,7 @@ public GroupFactory groupFactory() {
@Bean
@ConditionalOnMissingBean
public ConsumerManager consumerManager() {
return new ParallelConsumerManager(
return new EntranceParallelConsumerManager(
ENTRANCE_SCHEDULER_MAX_PARALLELISM_USERS().getValue(), "EntranceJobScheduler");
}

Original file line number Diff line number Diff line change
@@ -26,4 +26,6 @@ private ServiceNameConsts() {}
public static final String ENTRANCE_SERVER = "entranceServer";

public static final String ENTRANCE_INTERCEPTOR = "entranceInterceptors";

public static final String ENTRANCE_FAILOVER_SERVER = "entranceFailoverServer";
}
Original file line number Diff line number Diff line change
@@ -62,6 +62,11 @@ public enum EntranceErrorCodeSummary implements LinkisErrorCode {

INVALID_RESULTSETS(20053, "Invalid resultsets, cannot use cache(结果集无效,无法使用 cache)"),
SUBMITTING_QUERY_FAILED(30009, "Submitting the query failed(提交查询失败)!"),

SUBMIT_CODE_ISEMPTY(
30010,
"Submitting the execution code, after code preprocessing, the real execution code is empty, please check the executed code(提交的执行代码,经过预处理后为空,请检查执行的代码是否为空或则只有注解)!"),

QUERY_STATUS_FAILED(50081, "Query from jobHistory status failed(从 jobHistory 状态查询失败)"),
GET_QUERY_RESPONSE(50081, "Get query response incorrectly(获取查询响应结果不正确)"),
QUERY_TASKID_ERROR(50081, "Query task of taskId:{0} error(查询任务id:{}的任务出错)"),
@@ -71,7 +76,11 @@ public enum EntranceErrorCodeSummary implements LinkisErrorCode {
SHELL_BLACKLISTED_CODE(50081, "Shell code contains blacklisted code(shell中包含黑名单代码)"),
JOB_HISTORY_FAILED_ID(50081, ""),

LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)");
LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)"),

FAILOVER_RUNNING_TO_CANCELLED(
30001,
"Job {0} failover, status changed from Running to Cancelled (任务故障转移,状态从Running变更为Cancelled)");

/** (errorCode)错误码 */
private final int errorCode;
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.restful;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.scheduler.queue.ConsumerManager;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags = "entrance lable manager")
@RestController
@RequestMapping(path = "/entrance/operation/consumer")
public class EntranceConsumerRestfulApi {

private EntranceServer entranceServer;

private static final Logger logger = LoggerFactory.getLogger(EntranceConsumerRestfulApi.class);

@Autowired
public void setEntranceServer(EntranceServer entranceServer) {
this.entranceServer = entranceServer;
}

@ApiOperation(value = "kill-consumer", notes = "kill consumer", response = Message.class)
@RequestMapping(path = "/kill", method = RequestMethod.GET)
public Message killConsumer(
HttpServletRequest req, @RequestParam(value = "groupName") String groupName) {
String operationUser = ModuleUserUtils.getOperationUser(req, "kill consumer");
if (Configuration.isNotAdmin(operationUser)) {
return Message.error("only admin can do this");
}
logger.info("user {} to kill consumer {}", operationUser, groupName);
ConsumerManager consumerManager =
entranceServer
.getEntranceContext()
.getOrCreateScheduler()
.getSchedulerContext()
.getOrCreateConsumerManager();
consumerManager.destroyConsumer(groupName);
logger.info("user {} finished to kill consumer {}", operationUser, groupName);
return Message.ok();
}

@ApiOperation(value = "consumer-info", notes = "list consumers info", response = Message.class)
@RequestMapping(path = "/info", method = RequestMethod.GET)
public Message countConsumer(HttpServletRequest req) {
String operationUser = ModuleUserUtils.getOperationUser(req, "kill consumer");
if (Configuration.isNotAdmin(operationUser)) {
return Message.error("only admin can do this");
}
ConsumerManager consumerManager =
entranceServer
.getEntranceContext()
.getOrCreateScheduler()
.getSchedulerContext()
.getOrCreateConsumerManager();
return Message.ok().data("consumerNum", consumerManager.listConsumers().length);
}
}
Original file line number Diff line number Diff line change
@@ -18,14 +18,19 @@
package org.apache.linkis.entrance.restful;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.constant.LabelValueConstant;
import org.apache.linkis.protocol.label.InsLabelRefreshRequest;
import org.apache.linkis.protocol.label.InsLabelRemoveRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.SchedulerContext;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
@@ -46,6 +51,14 @@
public class EntranceLabelRestfulApi {

private static final Logger logger = LoggerFactory.getLogger(EntranceLabelRestfulApi.class);
private EntranceServer entranceServer;

@Autowired
public void setEntranceServer(EntranceServer entranceServer) {
this.entranceServer = entranceServer;
}

private static Boolean offlineFlag = false;

@ApiOperation(value = "update", notes = "update route label", response = Message.class)
@ApiOperationSupport(ignoreParameters = {"jsonNode"})
@@ -72,13 +85,51 @@ public Message updateRouteLabel(HttpServletRequest req, @RequestBody JsonNode js
public Message updateRouteLabel(HttpServletRequest req) {
ModuleUserUtils.getOperationUser(req, "markoffline");
Map<String, Object> labels = new HashMap<String, Object>();
logger.info("Prepare to modify the routelabel of entry to offline");
logger.info("Prepare to modify the routelabel of entrance to offline");
labels.put(LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE);
InsLabelRefreshRequest insLabelRefreshRequest = new InsLabelRefreshRequest();
insLabelRefreshRequest.setLabels(labels);
insLabelRefreshRequest.setServiceInstance(Sender.getThisServiceInstance());
InstanceLabelClient.getInstance().refreshLabelsToInstance(insLabelRefreshRequest);
synchronized (offlineFlag) {
offlineFlag = true;
}
logger.info("Finished to modify the routelabel of entry to offline");

logger.info("Prepare to update all not execution task instances to empty string");
SchedulerContext schedulerContext =
entranceServer.getEntranceContext().getOrCreateScheduler().getSchedulerContext();
if (schedulerContext instanceof EntranceSchedulerContext) {
((EntranceSchedulerContext) schedulerContext).setOfflineFlag(true);
}
entranceServer.updateAllNotExecutionTaskInstances(true);
logger.info("Finished to update all not execution task instances to empty string");

return Message.ok();
}

@ApiOperation(
value = "backonline",
notes = "from offline status to recover",
response = Message.class)
@RequestMapping(path = "/backonline", method = RequestMethod.GET)
public Message backOnline(HttpServletRequest req) {
ModuleUserUtils.getOperationUser(req, "backonline");
logger.info("Prepare to modify the routelabel of entrance to remove offline");
InsLabelRemoveRequest insLabelRemoveRequest = new InsLabelRemoveRequest();
insLabelRemoveRequest.setServiceInstance(Sender.getThisServiceInstance());
InstanceLabelClient.getInstance().removeLabelsFromInstance(insLabelRemoveRequest);
synchronized (offlineFlag) {
offlineFlag = false;
}
logger.info("Finished to backonline");
return Message.ok();
}

@ApiOperation(value = "isOnline", notes = "entrance isOnline", response = Message.class)
@RequestMapping(path = "/isOnline", method = RequestMethod.GET)
public Message isOnline(HttpServletRequest req) {
logger.info("Whether Entrance is online: {}", !offlineFlag);
return Message.ok().data("isOnline", !offlineFlag);
}
}
Original file line number Diff line number Diff line change
@@ -20,8 +20,7 @@
import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.utils.LabelUtil;
import org.apache.linkis.entrance.scheduler.CreatorECTypeDefaultConf;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

@@ -67,7 +66,7 @@ public Message taskinfo(
HttpServletRequest req,
@RequestParam(value = "user", required = false) String user,
@RequestParam(value = "creator", required = false) String creator,
@RequestParam(value = "engineTypeLabel", required = false) String engineTypeLabelValue) {
@RequestParam(value = "ecType", required = false) String ecType) {
String userName = ModuleUserUtils.getOperationUser(req, "taskinfo");
String queryUser = user;
if (Configuration.isNotAdmin(userName)) {
@@ -83,41 +82,38 @@ public Message taskinfo(
} else if (StringUtils.isBlank(creator)) {
filterWords = queryUser;
}
EntranceJob[] undoneTasks = entranceServer.getAllUndoneTask(filterWords);
int taskNumber = 0;
EntranceJob[] undoneTasks = entranceServer.getAllUndoneTask(filterWords, ecType);
int runningNumber = 0;
int queuedNumber = 0;

if (null != undoneTasks) {
for (EntranceJob task : undoneTasks) {
if (StringUtils.isNotBlank(engineTypeLabelValue)) {
EngineTypeLabel engineTypeLabel =
LabelUtil.getEngineTypeLabel(task.getJobRequest().getLabels());
// Task types do not match, do not count
if (null == engineTypeLabel
|| !engineTypeLabelValue.equalsIgnoreCase(engineTypeLabel.getStringValue())) {
continue;
}
}
taskNumber++;
if (task.isRunning()) {
runningNumber++;
} else {
queuedNumber++;
}
}
}
return Message.ok("success")
.data("taskNumber", taskNumber)
.data("runningNumber", runningNumber)
.data("queuedNumber", queuedNumber);
Message resp =
Message.ok("success")
.data("taskNumber", undoneTasks.length)
.data("runningNumber", runningNumber)
.data("queuedNumber", queuedNumber);
if (StringUtils.isNoneBlank(creator, ecType)) {
int creatorECTypeMaxRunningJobs =
CreatorECTypeDefaultConf.getCreatorECTypeMaxRunningJobs(creator, ecType);
resp.data("creatorECTypeMaxRunningJobs", creatorECTypeMaxRunningJobs);
resp.data("limitExceeded", runningNumber > creatorECTypeMaxRunningJobs);
}
return resp;
}

@ApiOperation(value = "Status", notes = "get running task number ", response = Message.class)
@ApiOperation(value = "runningtask", notes = "get running task number ", response = Message.class)
@RequestMapping(path = "/runningtask", method = RequestMethod.GET)
public Message status(HttpServletRequest req) {
public Message runningtask(HttpServletRequest req) {
ModuleUserUtils.getOperationUser(req, "runningtask");
EntranceJob[] undoneTasks = entranceServer.getAllUndoneTask("");
EntranceJob[] undoneTasks = entranceServer.getAllUndoneTask("", null);
Boolean isCompleted = false;
if (null == undoneTasks || undoneTasks.length < 1) {
isCompleted = true;

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.scheduler;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.utils.EntranceUtils;
import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfig;
import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.rpc.Sender;

import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.TimeUnit;

import scala.Tuple2;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreatorECTypeDefaultConf {

private static final Logger logger = LoggerFactory.getLogger(CreatorECTypeDefaultConf.class);

public static Sender confSender =
Sender.getSender(
Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());

private static LoadingCache<String, Integer> confCache =
CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(
(long) EntranceConfiguration.ENTRANCE_CREATOR_JOB_LIMIT_CONF_CACHE().getValue(),
TimeUnit.MINUTES)
.build(
new CacheLoader<String, Integer>() {
@Override
public Integer load(String key) throws Exception {
Tuple2<UserCreatorLabel, EngineTypeLabel> tuple2 =
EntranceUtils.fromKeyGetLabels(key);
RequestQueryEngineConfig requestQueryEngineConfig =
new RequestQueryEngineConfig(tuple2._1, tuple2._2(), null);
int jobLimit =
(int) EntranceConfiguration.ENTRANCE_CREATOR_JOB_LIMIT().getValue();
try {
Object response = confSender.ask(requestQueryEngineConfig);
if (response instanceof ResponseQueryConfig) {
jobLimit =
(int)
EntranceConfiguration.ENTRANCE_CREATOR_JOB_LIMIT()
.getValue(((ResponseQueryConfig) response).getKeyAndValue());
}
} catch (Exception e) {
logger.warn("Failed to get key {} from conf", key, e);
}
return jobLimit;
}
});

public static int getCreatorECTypeMaxRunningJobs(String creator, String ecType) {
int jobLimit = (int) EntranceConfiguration.ENTRANCE_CREATOR_JOB_LIMIT().getValue();
if (StringUtils.isNoneBlank(creator, ecType)) {
try {
String key = EntranceUtils.getDefaultCreatorECTypeKey(creator, ecType);
jobLimit = confCache.get(key);
} catch (Exception e) {
logger.warn("Failed to get key creator {} ecType {} from cache", creator, ecType, e);
}
}
int entranceNumber = EntranceUtils.getRunningEntranceNumber();
return jobLimit / entranceNumber;
}
}
Original file line number Diff line number Diff line change
@@ -20,13 +20,17 @@
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.constant.ServiceNameConsts;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.job.EntranceExecutionJob;
import org.apache.linkis.entrance.log.LogReader;
import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest;
import org.apache.linkis.rpc.Sender;

import org.apache.commons.io.IOUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
@@ -94,13 +98,19 @@ private void shutdownEntrance(ContextClosedEvent event) {
if (shutdownFlag) {
logger.warn("event has been handled");
} else {
if (EntranceConfiguration.ENTRANCE_SHUTDOWN_FAILOVER_CONSUME_QUEUE_ENABLED()) {
logger.warn("Entrance exit to update and clean all ConsumeQueue task instances");
updateAllNotExecutionTaskInstances(false);
}

logger.warn("Entrance exit to stop all job");
EntranceJob[] allUndoneJobs = getAllUndoneTask(null);
if (null != allUndoneJobs) {
for (EntranceJob job : allUndoneJobs) {
EntranceJob[] allUndoneTask = getAllUndoneTask(null, null);
if (null != allUndoneTask) {
for (EntranceJob job : allUndoneTask) {
job.onFailure(
"Your job will be marked as canceled because the Entrance service restarted(因为Entrance服务重启,您的任务将被标记为取消)",
null);
IOUtils.closeQuietly(((EntranceExecutionJob) job).getLogWriter().get());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.server;

import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.constant.ServiceNameConsts;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.entrance.utils.JobHistoryHelper;
import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.publicservice.common.lock.entity.CommonLock;
import org.apache.linkis.publicservice.common.lock.service.CommonLockService;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.queue.SchedulerEventState;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import scala.Enumeration;
import scala.collection.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(ServiceNameConsts.ENTRANCE_FAILOVER_SERVER)
public class EntranceFailoverJobServer {

private static final Logger logger = LoggerFactory.getLogger(EntranceFailoverJobServer.class);

@Autowired private EntranceServer entranceServer;

@Autowired private CommonLockService commonLockService;

private static String ENTRANCE_FAILOVER_LOCK = "ENTRANCE_FAILOVER_LOCK";

private ScheduledExecutorService scheduledExecutor;

private Future future;

@PostConstruct
public void init() {
if (EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED()) {
this.scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(
Utils.threadFactory("Linkis-Failover-Scheduler-Thread-", true));
failoverTask();
}
}

@EventListener
private void shutdownFailover(ContextClosedEvent event) {
if (future != null && !future.isDone()) {
future.cancel(true);
}
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
logger.info("Entrance Failover Server exit!");
}
}

public void failoverTask() {
future =
scheduledExecutor.scheduleWithFixedDelay(
() -> {
EntranceSchedulerContext schedulerContext =
(EntranceSchedulerContext)
entranceServer
.getEntranceContext()
.getOrCreateScheduler()
.getSchedulerContext();

// entrance do not failover job when it is offline
if (schedulerContext.getOfflineFlag()) return;

CommonLock commonLock = new CommonLock();
commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK);
Boolean locked = false;
try {
locked = commonLockService.lock(commonLock, 30 * 1000L);
if (!locked) return;
logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK);

// get all entrance server from eureka
ServiceInstance[] serviceInstances =
Sender.getInstances(Sender.getThisServiceInstance().getApplicationName());
if (serviceInstances == null || serviceInstances.length <= 0) return;

// serverInstance to map
Map<String, Long> serverInstanceMap =
Arrays.stream(serviceInstances)
.collect(
Collectors.toMap(
ServiceInstance::getInstance,
ServiceInstance::getRegistryTimestamp,
(k1, k2) -> k2));

// It is very important to avoid repeated execute job
// when failover self job, if self instance is empty, the job can be repeated
// execute
if (!serverInstanceMap.containsKey(Sender.getThisInstance())) {
logger.warn(
"server has just started and has not get self info, it does not failover");
return;
}

// get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理)
long expiredTimestamp = 0L;
if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) {
expiredTimestamp =
System.currentTimeMillis()
- EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
}

List<JobRequest> jobRequests =
JobHistoryHelper.queryWaitForFailoverTask(
serverInstanceMap,
getUnCompleteStatus(),
expiredTimestamp,
EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
if (jobRequests.isEmpty()) return;
List<Long> ids =
jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);

// failover to local server
for (JobRequest jobRequest : jobRequests) {
entranceServer.failoverExecute(jobRequest);
}
logger.info("finished execute failover jobs, job ids: {}", ids);

} catch (Exception e) {
logger.error("failover failed", e);
} finally {
if (locked) commonLockService.unlock(commonLock);
}
},
EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(),
EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
TimeUnit.MILLISECONDS);
}

private List<String> getUnCompleteStatus() {
List<String> status = new ArrayList<>();
Enumeration.ValueSet values = SchedulerEventState.values();
Iterator<Enumeration.Value> iterator = values.iterator();
while (iterator.hasNext()) {
Enumeration.Value next = iterator.next();
if (!SchedulerEventState.isCompleted(next)) status.add(next.toString());
}
return status;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -215,18 +215,6 @@ class EntranceWebSocketService
s"Your job's execution code is (after variable substitution and code check) "
)
)
entranceServer.getEntranceContext
.getOrCreateLogManager()
.onLogUpdate(
job,
"************************************SCRIPT CODE************************************"
)
entranceServer.getEntranceContext
.getOrCreateLogManager()
.onLogUpdate(
job,
"************************************SCRIPT CODE************************************"
)
entranceServer.getEntranceContext
.getOrCreateLogManager()
.onLogUpdate(
Original file line number Diff line number Diff line change
@@ -220,7 +220,69 @@ object EntranceConfiguration {
val CREATOR_IP_SWITCH =
CommonVars("wds.linkis.entrance.user.creator.ip.interceptor.switch", false)

val TEMPLATE_CONF_SWITCH =
CommonVars("linkis.entrance.template.conf.interceptor.switch", false)

val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR =
CommonVars("linkis.entrance.auto.clean.dirty.data.enable", false)

val ENTRANCE_CREATOR_JOB_LIMIT: CommonVars[Int] =
CommonVars[Int](
"linkis.entrance.creator.job.concurrency.limit",
10000,
"Creator task concurrency limit parameters"
)

val ENTRANCE_CREATOR_JOB_LIMIT_CONF_CACHE =
CommonVars("linkis.entrance.creator.job.concurrency.limit.conf.cache.time", 30L)

val ENTRANCE_TASK_TIMEOUT =
CommonVars("linkis.entrance.task.timeout", new TimeType("48h"))

val ENTRANCE_TASK_TIMEOUT_SCAN =
CommonVars("linkis.entrance.task.timeout.scan", new TimeType("12h"))

val ENABLE_HDFS_JVM_USER =
CommonVars[Boolean]("linkis.entrance.enable.hdfs.jvm.user", true).getValue

val ENTRANCE_FAILOVER_ENABLED = CommonVars("linkis.entrance.failover.enable", true).getValue

val ENTRANCE_FAILOVER_SCAN_INIT_TIME =
CommonVars("linkis.entrance.failover.scan.init.time", 3 * 1000).getValue

val ENTRANCE_FAILOVER_SCAN_INTERVAL =
CommonVars("linkis.entrance.failover.scan.interval", 30 * 1000).getValue

val ENTRANCE_FAILOVER_DATA_NUM_LIMIT =
CommonVars("linkis.entrance.failover.data.num.limit", 10).getValue

val ENTRANCE_FAILOVER_DATA_INTERVAL_TIME =
CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("1d").toLong).getValue

// if true, the waitForRetry job in runningJobs can be failover
val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED =
CommonVars("linkis.entrance.failover.retry.job.enable", false)

val ENTRANCE_UPDATE_BATCH_SIZE = CommonVars("linkis.entrance.update.batch.size", 100)

// if true, the job in ConsumeQueue can be failover
val ENTRANCE_SHUTDOWN_FAILOVER_CONSUME_QUEUE_ENABLED =
CommonVars("linkis.entrance.shutdown.failover.consume.queue.enable", true).getValue

val ENTRANCE_GROUP_SCAN_ENABLED = CommonVars("linkis.entrance.group.scan.enable", true)

val ENTRANCE_GROUP_SCAN_INIT_TIME = CommonVars("linkis.entrance.group.scan.init.time", 3 * 1000)

val ENTRANCE_GROUP_SCAN_INTERVAL = CommonVars("linkis.entrance.group.scan.interval", 60 * 1000)

val ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED =
CommonVars("linkis.entrance.failover.retain.metric.engine.conn.enable", false)

val ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED =
CommonVars("linkis.entrance.failover.retain.metric.yarn.resource.enable", false)

// if true, job whose status is running will be set to Cancelled
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
CommonVars("linkis.entrance.failover.running.kill.enable", false)

}
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
package org.apache.linkis.entrance.interceptor.impl

import org.apache.linkis.common.utils.CodeAndRunTypeUtils
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.utils.LabelUtil
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ object CustomVariableUtils extends Logging {
}
val variableMap = TaskUtils
.getVariableMap(jobRequest.getParams)
.asInstanceOf[util.HashMap[String, String]]
.asInstanceOf[util.Map[String, String]]
variables.putAll(variableMap)
if (!variables.containsKey("user")) {
variables.put("user", jobRequest.getExecuteUser)
Original file line number Diff line number Diff line change
@@ -57,6 +57,8 @@ object SparkExplain extends Explain {
private val sy = Pattern.compile("sys\\.")
private val scCancelAllJobs = Pattern.compile("sc\\.cancelAllJobs(\\s*)")
private val runtime = Pattern.compile("Runtime\\.getRuntime")
private val LINE_BREAK = "\n"
private val LOG: Logger = LoggerFactory.getLogger(getClass)

override def authPass(code: String, error: StringBuilder): Boolean = {
if (EntranceConfiguration.SKIP_AUTH.getHotValue()) {
@@ -99,6 +101,7 @@ object SQLExplain extends Explain {
private val LIMIT: String = "limit"
private val LIMIT_UPPERCASE: String = "LIMIT"
private val IDE_ALLOW_NO_LIMIT = "--set wds.linkis.engine.no.limit.allow=true"
private val LOG: Logger = LoggerFactory.getLogger(getClass)

override def authPass(code: String, error: StringBuilder): Boolean = {
true
@@ -131,6 +134,8 @@ object SQLExplain extends Explain {
.generateWarn("please pay attention ,SQL full export mode opened(请注意,SQL全量导出模式打开)\n")
)
}
var isFirstTimePrintingLimit = true
var isFirstTimePrintingOverLimit = true
if (tempCode.contains("""\;""")) {
val semicolonIndexes = findRealSemicolonIndex(tempCode)
var oldIndex = 0
@@ -140,20 +145,27 @@ object SQLExplain extends Explain {
if (isSelectCmd(singleCode)) {
val trimCode = singleCode.trim
if (isSelectCmdNoLimit(trimCode) && !isNoLimitAllowed) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql without limit, DSS will add limit 5000 to your sql"
) + "\n"
)
if (isFirstTimePrintingLimit) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql without limit, DSS will add limit 5000 to your sql"
) + "\n"
)
isFirstTimePrintingLimit = false
}
// 将注释先干掉,然后再进行添加limit
val realCode = cleanComment(trimCode)
fixedCode += (realCode + SQL_APPEND_LIMIT)
} else if (isSelectOverLimit(singleCode) && !isNoLimitAllowed) {
val trimCode = singleCode.trim
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql with limit exceeding 5000, it is not allowed. DSS will change your limit to 5000"
) + "\n"
)
if (isFirstTimePrintingOverLimit) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql with limit exceeding 5000, it is not allowed. DSS will change your limit to 5000"
) + "\n"
)
isFirstTimePrintingOverLimit = false
}
fixedCode += repairSelectOverLimit(trimCode)
} else {
fixedCode += singleCode.trim
@@ -167,20 +179,27 @@ object SQLExplain extends Explain {
if (isSelectCmd(singleCode)) {
val trimCode = singleCode.trim
if (isSelectCmdNoLimit(trimCode) && !isNoLimitAllowed) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql without limit, DSS will add limit 5000 to your sql"
) + "\n"
)
if (isFirstTimePrintingLimit) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql without limit, DSS will add limit 5000 to your sql"
) + "\n"
)
isFirstTimePrintingLimit = false
}
// 将注释先干掉,然后再进行添加limit
val realCode = cleanComment(trimCode)
fixedCode += (realCode + SQL_APPEND_LIMIT)
} else if (isSelectOverLimit(singleCode) && !isNoLimitAllowed) {
val trimCode = singleCode.trim
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql with limit exceeding 5000, it is not allowed. DSS will change your limit to 5000"
) + "\n"
)
if (isFirstTimePrintingOverLimit) {
logAppender.append(
LogUtils.generateWarn(
s"You submitted a sql with limit exceeding 5000, it is not allowed. DSS will change your limit to 5000"
) + "\n"
)
isFirstTimePrintingOverLimit = false
}
fixedCode += repairSelectOverLimit(trimCode)
} else {
fixedCode += singleCode.trim
@@ -210,23 +229,26 @@ object SQLExplain extends Explain {
array.toArray
}

private def addNoLimit(code: String) = code + NO_LIMIT_STRING

protected def needNoLimit(code: String): Boolean = code.endsWith(NO_LIMIT_STRING)

def isSelectCmd(code: String): Boolean = {
if (StringUtils.isEmpty(code)) {
return false
}
val realCode = cleanComment(code)
realCode.trim.split("\\s+")(0).toLowerCase().contains("select")
realCode.trim.split("\\s+")(0).toLowerCase(Locale.getDefault).contains("select")
}

def continueWhenError: Boolean = false
// def continueWhenError = false

def isSelectCmdNoLimit(cmd: String): Boolean = {
if (StringUtils.isEmpty(cmd)) {
return false
}
val realCode = cmd.trim
// limit is often the last in a sql statement, so you need to make a final judgment
val arr = realCode.split("\\s+")
val words = new ArrayBuffer[String]()
arr foreach { w =>
@@ -235,8 +257,10 @@ object SQLExplain extends Explain {
val a = words.toArray
val length = a.length
if (a.length > 1) {
val second_last = a(length - 2)
!"limit".equals(second_last.toLowerCase())
val second_last = a(length - 2).toLowerCase(Locale.getDefault)
// for some case eg:"SELECT * from dual WHERE (1=1)LIMIT 1;"
val result = !("limit".equals(second_last) || second_last.contains(")limit"))
result
} else {
false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.interceptor.impl

import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.governance.common.entity.job.JobRequest

import java.lang

class TemplateConfInterceptor extends EntranceInterceptor {

override def apply(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
if (EntranceConfiguration.TEMPLATE_CONF_SWITCH.getValue) {
TemplateConfUtils.dealWithTemplateConf(jobRequest, logAppender)
} else {
jobRequest
}
}

}
Loading

0 comments on commit 2d6ef74

Please sign in to comment.