Commit: code format

aiceflower committed Dec 20, 2024
1 parent 9f4133a commit 8b6fd5e

Showing 17 changed files with 73 additions and 60 deletions.
@@ -38,7 +38,7 @@ object SchedulerConfiguration {
 
   // support fifo pfifo
   val FIFO_QUEUE_STRATEGY =
-    CommonVars("linkis.fifo.queue.strategy", FIFO_SCHEDULER_STRATEGY).getValue
+    CommonVars("linkis.fifo.queue.strategy", "fifo").getValue
 
   val SUPPORT_PRIORITY_TASK_USERS =
     CommonVars("linkis.fifo.queue.support.priority.users", "").getValue
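Context for the hunk above: the default now falls back to the literal "fifo", so the priority queue is strictly opt-in. A minimal sketch of reading the resolved strategy, assuming CommonVars picks up overrides from linkis.properties and that PFIFO_SCHEDULER_STRATEGY is the literal "pfifo":

    import org.apache.linkis.common.conf.CommonVars

    object QueueStrategyProbe {
      // Resolves to "fifo" unless linkis.fifo.queue.strategy is overridden.
      val strategy: String = CommonVars("linkis.fifo.queue.strategy", "fifo").getValue

      // True only when an operator has opted in to the priority queue.
      def usesPriorityQueue: Boolean = "pfifo".equalsIgnoreCase(strategy)
    }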
@@ -116,13 +116,13 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
     val newConsumer = createConsumer(groupName)
     val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName)
     newConsumer.setGroup(group)
-    val fifoQueueStrategy: String = FIFO_QUEUE_STRATEGY.toLowerCase()
+    // Need to check whether the user belongs to a designated department
     val consumerQueue: ConsumeQueue =
       if (
         PFIFO_SCHEDULER_STRATEGY
-          .equals(fifoQueueStrategy) && isSupportPriority(groupName)
+          .equalsIgnoreCase(FIFO_QUEUE_STRATEGY) && isSupportPriority(groupName)
       ) {
         logger.info(s"use priority queue: ${groupName}")
         new PriorityLoopArrayQueue(group)
       } else new LoopArrayQueue(group)
     newConsumer.setConsumeQueue(consumerQueue)
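In effect, a consumer gets a priority queue only when both conditions hold: the global strategy is pfifo and the group's user is whitelisted. A condensed sketch of the selection rule, assuming the identifiers from the hunk above are in scope:

    // Condensed restatement of the queue-selection rule.
    def chooseQueue(group: Group, groupName: String): ConsumeQueue =
      if (PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(FIFO_QUEUE_STRATEGY) && isSupportPriority(groupName)) {
        new PriorityLoopArrayQueue(group) // consumes by task priority
      } else {
        new LoopArrayQueue(group) // plain FIFO ring buffer
      }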
@@ -17,8 +17,6 @@
 
 package org.apache.linkis.entrance
 
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException}
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
@@ -35,10 +33,17 @@ import org.apache.linkis.governance.common.utils.LoggerUtils
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.utils.TaskUtils
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY}
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
+  ENGINE_PRIORITY_RUNTIME_KEY,
+  FIFO_QUEUE_STRATEGY,
+  PFIFO_SCHEDULER_STRATEGY
+}
 import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
 import org.apache.linkis.server.conf.ServerConfiguration
 
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+
 import java.text.MessageFormat
 import java.util
 import java.util.concurrent.TimeUnit
@@ -171,11 +176,15 @@ abstract class EntranceServer extends Logging {
       )
     }
 
-    Utils.tryAndWarn{
+    Utils.tryAndWarn {
       // If the priority queue is in use, set the job's priority
       val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params)
       val fifoStrategy: String = FIFO_QUEUE_STRATEGY
-      if (PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(fifoStrategy) && properties != null && !properties.isEmpty) {
+      if (
+        PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(
+          fifoStrategy
+        ) && properties != null && !properties.isEmpty
+      ) {
         val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY)
         if (priorityValue != null) {
           val value: Int = priorityValue.toString.toInt
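The entrance side reads the job's priority out of its runtime parameters. A sketch of that read path, reusing the identifiers from the hunk above; the concrete key string behind ENGINE_PRIORITY_RUNTIME_KEY is not shown in this diff, so nothing here should be taken as the wire format:

    // Sketch: pull an optional integer priority out of the runtime map.
    val properties: java.util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params)
    val priority: Option[Int] =
      Option(properties.get(ENGINE_PRIORITY_RUNTIME_KEY))
        .map(_.toString.toInt) // a non-numeric value throws; the real code runs inside tryAndWarn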
@@ -17,14 +17,15 @@
 
 package org.apache.linkis.entrance.interceptor.impl
 
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.exception.ErrorException
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.interceptor.EntranceInterceptor
 import org.apache.linkis.entrance.interceptor.exception.LogPathCreateException
 import org.apache.linkis.entrance.parser.ParserUtils
 import org.apache.linkis.governance.common.entity.job.JobRequest
 
+import org.apache.commons.lang3.exception.ExceptionUtils
+
 /**
  * Description:Log path generation interceptor, used to set the path log of the task(日志路径生成拦截器,
  * 用于设置task的路径日志)
@@ -177,7 +177,8 @@ class UJESSQLResultSet(
     }
     val metaTmp = resultSetResult.getMetadata
     if (NULL_VALUE.equals(String.valueOf(metaTmp))) {
-      val fileContentList = resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]]
+      val fileContentList =
+        resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]]
       if (null != fileContentList) {
         resultSetMetaData.setColumnNameProperties(1, "linkis_string")
         resultSetMetaData.setDataTypeProperties(1, "String")
@@ -104,7 +104,8 @@ public class AMConfiguration {
       CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file,jdbc");
   public static final CommonVars<String> MULTI_USER_ENGINE_USER =
       CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser());
-  public static final String UDF_KILL_ENGINE_TYPE = CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue();
+  public static final String UDF_KILL_ENGINE_TYPE =
+      CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue();
 
   public static final CommonVars<Integer> ENGINE_LOCKER_MAX_TIME =
       CommonVars.apply("wds.linkis.manager.am.engine.locker.max.time", 1000 * 60 * 5);
@@ -772,16 +772,16 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso
     }
     if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) {
       Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE.split(","))
-         .forEach(
-             engine ->
-                 engineStopService.stopUnlockECByUserCreatorAndECType(
-                     userName, creatorStr, engine));
+          .forEach(
+              engine ->
+                  engineStopService.stopUnlockECByUserCreatorAndECType(
+                      userName, creatorStr, engine));
     } else {
-      engineStopService.stopUnlockECByUserCreatorAndECType(
-          userName, creatorStr, engineType);
+      engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType);
     }
     return Message.ok("Kill engineConn succeed");
   }
 
   static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException {
     String applicationName = jsonNode.get("applicationName").asText();
     String instance = jsonNode.get("instance").asText();
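The branch above means a "global" kill (engineType equal to GLOBAL_CONF_SYMBOL) fans out across every engine type listed in linkis.udf.kill.engine.type, while a specific type is stopped directly. A small sketch of that dispatch rule, with the default list from AMConfiguration assumed:

    // Fan-out rule, sketched; "spark,hive" is the default of
    // linkis.udf.kill.engine.type shown earlier in this commit.
    def enginesToStop(engineType: String, globalSymbol: String): Seq[String] =
      if (engineType == globalSymbol) "spark,hive".split(",").toSeq
      else Seq(engineType)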
@@ -77,7 +77,8 @@ public NodeResource requestResourceInfo(
 
     String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
     if (queueName.startsWith(queuePrefix)) {
-      logger.info("Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix);
+      logger.info(
+          "Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix);
       queueName = queueName.substring(queuePrefix.length());
     }
     String realQueueName = queuePrefix + queueName;
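Strip-then-reapply makes the prefix handling idempotent: prefixed and unprefixed queue names normalize to the same value (assuming the conventional YARN prefix "root."). As a standalone sketch:

    // Idempotent queue-name normalization mirroring the Java above.
    def normalizeQueue(queueName: String, queuePrefix: String): String = {
      val base =
        if (queueName.startsWith(queuePrefix)) queueName.substring(queuePrefix.length)
        else queueName
      queuePrefix + base
    }

    // normalizeQueue("root.default", "root.") == normalizeQueue("default", "root.") == "root.default"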
@@ -274,7 +274,9 @@ class HiveEngineConnExecutor(
         if (numberOfMRJobs > 0) {
           engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
           val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME)
-          engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue")
+          engineExecutorContext.appendStdout(
+            s"Your task will be submitted to the $queueName queue"
+          )
         }
         if (thread.isInterrupted) {
           logger.error(
@@ -173,7 +173,7 @@ public void jobHistoryFinishedScan() {
     // Newly added: failed-task analysis scan
     try {
       JobHistoryAnalyzeRule jobHistoryAnalyzeRule =
-         new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender());
+          new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender());
       scanner.addScanRule(jobHistoryAnalyzeRule);
     } catch (Exception e) {
       logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage());
@@ -252,21 +252,21 @@ public void jdbcUnfinishedAlertScan() {
   @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}")
   public void jdbcUnfinishedKillScan() {
     long id =
-       Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan"))
-           .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+        Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan"))
+            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
     long intervalMs = 7200 * 1000;
     long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
     long endTime = System.currentTimeMillis();
     long startTime = endTime - intervalMs;
     AnomalyScanner scanner = new DefaultScanner();
     List<DataFetcher> fetchers =
-       JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "");
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "");
     if (fetchers.isEmpty()) {
       logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input");
       return;
     }
     StarrocksTimeKillRule starrocksTimeKillRule =
-       new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender());
+        new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender());
     scanner.addScanRule(starrocksTimeKillRule);
     JobMonitorUtils.run(scanner, fetchers, true);
   }
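jdbcUnfinishedKillScan fires on its cron default (0 0/10 0 * * ?, i.e. every ten minutes within hour 0) and re-scans a fixed two-hour window ending at the current time, chunked into fetchers no larger than the configured maximum interval. The window arithmetic, restated:

    // Scan-window arithmetic from jdbcUnfinishedKillScan, as a sketch.
    val intervalMs: Long = 7200L * 1000 // two-hour lookback
    val endTime: Long = System.currentTimeMillis()
    val startTime: Long = endTime - intervalMs
    // Fetchers then cover [startTime, endTime] in chunks of at most
    // Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000 milliseconds each.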
@@ -28,7 +28,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-
 class JobHistoryAnalyzeAlertSender() extends Observer with Logging {
   override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
 
@@ -25,7 +25,6 @@ class StarrocksTimeKillAlertSender extends Observer with Logging {
 
   /**
    * Observer Pattern
    */
-  override def update(e: Event, jobHistroyList: scala.Any): Unit = {
-  }
+  override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
 
 }
@@ -17,8 +17,6 @@
 
 package org.apache.linkis.monitor.jobhistory.jobtime
 
-import org.apache.commons.collections.MapUtils
-import org.apache.commons.lang3.StringUtils
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.monitor.constants.Constants
 import org.apache.linkis.monitor.core.ob.Observer
@@ -27,8 +25,12 @@ import org.apache.linkis.monitor.jobhistory.entity.JobHistory
 import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
 import org.apache.linkis.server.BDPJettyServerHelper
 
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
 import java.util
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 class StarrocksTimeKillRule(hitObserver: Observer)
@@ -103,11 +103,8 @@ public List<Map<String, Object>> getTablesByDbNameAndOptionalUserName(
       queryParam.withRoles(roles);
       List<Map<String, Object>> hiveTables =
           hiveMetaDao.getTablesByDbNameAndUserAndRolesFromDbPrvs(queryParam);
-      hiveTables.addAll(
-          hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam));
-      return hiveTables.stream()
-          .distinct()
-          .collect(Collectors.toList());
+      hiveTables.addAll(hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam));
+      return hiveTables.stream().distinct().collect(Collectors.toList());
     } else {
       log.info("user {} to getTablesByDbName no permission control", queryParam.getUserName());
       return hiveMetaDao.getTablesByDbName(queryParam);
@@ -774,13 +774,14 @@ public Message getUserKeyValue(
     parms.put("nonce", SHAUtils.DOCTOR_NONCE);
     // Token provided by doctor
     String token = SHAUtils.DOCTOR_TOKEN.getValue();
-    if (StringUtils.isNotBlank(token)){
+    if (StringUtils.isNotBlank(token)) {
       String signature =
-          SHAUtils.Encrypt(
-              SHAUtils.Encrypt(
-                      parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(), null)
-                  + token,
-              null);
+          SHAUtils.Encrypt(
+              SHAUtils.Encrypt(
+                      parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(),
+                      null)
+                  + token,
+              null);
       parms.put("signature", signature);
       return Message.ok().data("doctor", parms);
     } else {
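The signature here is a nested digest: hash app_id + nonce + current timestamp, append the shared token, and hash again. A self-contained sketch, assuming SHAUtils.Encrypt(value, null) amounts to a hex-encoded SHA-256 digest (the actual algorithm is not shown in this diff):

    import java.security.MessageDigest

    // Hex-encoded SHA-256; stand-in for SHAUtils.Encrypt(s, null).
    def sha256Hex(s: String): String =
      MessageDigest
        .getInstance("SHA-256")
        .digest(s.getBytes("UTF-8"))
        .map("%02x".format(_))
        .mkString

    // Nested signing scheme from the diff: hash(hash(appId + nonce + ts) + token).
    def sign(appId: String, nonce: String, token: String): String =
      sha256Hex(sha256Hex(appId + nonce + System.currentTimeMillis()) + token)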
@@ -1440,14 +1440,14 @@ public Message pythonUpload(
   }
 
   @ApiImplicitParam(
-     name = "path",
-     dataType = "String",
-     value = "path",
-     example = "file:///test-dir/test-sub-dir/test1012_01.py")
+      name = "path",
+      dataType = "String",
+      value = "path",
+      example = "file:///test-dir/test-sub-dir/test1012_01.py")
   @RequestMapping(path = "/get-register-functions", method = RequestMethod.GET)
   public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path") String path) {
     if (StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_PY)
-       || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) {
+        || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) {
       if (StringUtils.startsWithIgnoreCase(path, StorageUtils$.MODULE$.FILE_SCHEMA())) {
         try {
           FsPath fsPath = new FsPath(path);
@@ -1456,7 +1456,7 @@ public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path"
           fileSystem.init(null);
           if (fileSystem.canRead(fsPath)) {
             return Message.ok()
-               .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path));
+                .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path));
           } else {
             return Message.error("您没有权限访问该文件");
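The endpoint accepts only .py or .scala paths under the file:// scheme, and only when the target is readable; everything else is rejected before any parsing happens. A condensed sketch of those guards (the literal extension and scheme strings stand in for Constants.FILE_EXTENSION_* and StorageUtils$.MODULE$.FILE_SCHEMA()):

    // Guard conditions for /get-register-functions, restated.
    def acceptable(path: String): Boolean = {
      val p = path.toLowerCase
      (p.endsWith(".py") || p.endsWith(".scala")) && p.startsWith("file://")
    }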
@@ -19,7 +19,10 @@
 
 import org.apache.linkis.common.conf.Configuration;
 import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.common.utils.JsonUtils;
 import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.utils.StorageUtils$;
 import org.apache.linkis.udf.conf.Constants;
 import org.apache.linkis.udf.exception.UdfException;
@@ -29,10 +32,6 @@
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.linkis.common.utils.JsonUtils;
-import org.apache.linkis.storage.fs.FileSystem;
-import org.apache.linkis.storage.utils.StorageUtils$;
-import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.springframework.web.multipart.MultipartFile;
 
@@ -47,6 +46,7 @@
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -286,7 +286,7 @@ public static List<String> extractDependencies(String content, String name) {
   }
 
   public static List<String> getRegisterFunctions(FileSystem fileSystem, FsPath fsPath, String path)
-     throws Exception {
+      throws Exception {
     try (InputStream is = fileSystem.read(fsPath)) {
       // Convert the InputStream contents to a string
       String content = IOUtils.toString(is, StandardCharsets.UTF_8);
@@ -322,17 +322,17 @@ public static List<String> extractScalaMethodNames(String scalaCode) {
   public static List<String> extractPythonMethodNames(String udfPath) throws Exception {
     String localPath = udfPath.replace(StorageUtils$.MODULE$.FILE_SCHEMA(), "");
     String exec =
-       Utils.exec(
-           (new String[] {
-             Constants.PYTHON_COMMAND.getValue(),
-             Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
-             localPath
-           }));
+        Utils.exec(
+            (new String[] {
+              Constants.PYTHON_COMMAND.getValue(),
+              Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
+              localPath
+            }));
     logger.info(
-       "execute python script to get python method name...{} {} {}",
-       Constants.PYTHON_COMMAND.getValue(),
-       Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
-       localPath);
+        "execute python script to get python method name...{} {} {}",
+        Constants.PYTHON_COMMAND.getValue(),
+        Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
+        localPath);
     // Convert exec to a List<String>; exec is a JSON array
     return JsonUtils.jackson().readValue(exec, new TypeReference<List<String>>() {});
   }
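extractPythonMethodNames shells out to a helper script that prints a JSON array of method names, then parses that output. Roughly, under the assumption that the script prints something like ["udf_a", "udf_b"] to stdout (plain Jackson shown here instead of Linkis's JsonUtils wrapper):

    import com.fasterxml.jackson.core.`type`.TypeReference
    import com.fasterxml.jackson.databind.ObjectMapper

    import java.util.{List => JList}

    import scala.sys.process._

    // Run the helper script and parse its stdout as a JSON array of names.
    def pythonMethodNames(pythonBin: String, scriptPath: String, localPath: String): JList[String] = {
      val out = Seq(pythonBin, scriptPath, localPath).!! // throws on non-zero exit
      new ObjectMapper().readValue(out, new TypeReference[JList[String]] {})
    }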
