Merge branch 'dev-1.1.15-webank' into dev-1.1.15-engine-conf-update
v-kkhuang committed Sep 5, 2023
2 parents 66e94f1 + f9a7338 commit 08df665
Showing 24 changed files with 100 additions and 51 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/linkis-storage.md
@@ -29,4 +29,4 @@
|linkis-storage|wds.linkis.fs.hdfs.impl.disable.cache| false |disable.cache |true|
|linkis-storage|wds.linkis.hdfs.rest.errs| |rest.errs|true|
|linkis-storage|wds.linkis.resultset.row.max.str | 2m | max.str |true|
-|linkis-storage|wds.linkis.storage.file.type | dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql | file.type |true|
+|linkis-storage|wds.linkis.storage.file.type | dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt | file.type |true|
@@ -39,7 +39,7 @@ object LinkisStorageConf {

val FILE_TYPE = CommonVars(
"wds.linkis.storage.file.type",
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql"
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt"
).getValue

private var fileTypeArr: Array[String] = null
@@ -27,7 +27,7 @@ object DataType extends Logging {

val LOWCASE_NULL_VALUE = "null"

-val DECIMAL_REGEX = "^decimal\\(\\d*\\,\\d*\\)".r.unanchored
+val DECIMAL_REGEX = "^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)".r.unanchored

val SHORT_REGEX = "^short.*".r.unanchored
val INT_REGEX = "^int.*".r.unanchored
@@ -68,4 +68,11 @@ class DataTypeTest
Assertions.assertNotNull(str)
}

+@Test
+@DisplayName("decimalTest")
+def decimalTest(): Unit = {
+val dateType = DataType.toDataType("decimal(10, 8)")
+Assertions.assertTrue(dateType.typeName.equals("decimal"))
+}

}
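Reviewer note: the relaxed DECIMAL_REGEX above tolerates whitespace around the precision and scale, so type strings such as "decimal(10, 8)" (with a space after the comma) now resolve to the decimal type, which is what the new decimalTest exercises. A minimal standalone Scala sketch of the difference (not part of the patch):

```scala
object DecimalRegexSketch extends App {
  // Old pattern: no whitespace allowed around precision and scale.
  val oldRegex = "^decimal\\(\\d*\\,\\d*\\)".r.unanchored
  // New pattern: optional whitespace around "(", ",", and digits.
  val newRegex = "^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)".r.unanchored

  val typeName = "decimal(10, 8)" // note the space after the comma

  println(oldRegex.findFirstIn(typeName)) // None - the old pattern misses it
  println(newRegex.findFirstIn(typeName)) // Some(decimal(10, 8))
}
```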
@@ -427,7 +427,7 @@ class TaskExecutionServiceImpl
override def run(): Unit = {
Utils.tryQuietly(Thread.sleep(TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS)))
while (null != taskFuture && !taskFuture.isDone) {
-if (ExecutionNodeStatus.isCompleted(task.getStatus)) {
+if (!ExecutionNodeStatus.isCompleted(task.getStatus)) {
Utils.tryAndWarn {
val progressResponse = Utils.tryAndWarn(taskProgress(task.getTaskId))
val resourceResponse = Utils.tryAndWarn(buildResourceMap(task))
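Reviewer note: the one-character fix above inverts the completion check, so progress and resource updates are pushed while the task is still running instead of only after it has completed. A simplified sketch of the intended loop shape, using hypothetical stand-in types rather than the real TaskExecutionServiceImpl members:

```scala
object ProgressReportSketch {
  import java.util.concurrent.TimeUnit

  // Hypothetical stand-in for the running task; not a Linkis type.
  trait RunningTask {
    def isCompleted: Boolean
    def progress: Double
  }

  def reportProgressLoop(task: RunningTask, taskFutureDone: () => Boolean): Unit = {
    while (!taskFutureDone()) {
      // Push progress only while the task has NOT completed; the previous
      // condition (isCompleted) skipped exactly the tasks that still needed updates.
      if (!task.isCompleted) {
        println(s"progress=${task.progress}")
      }
      Thread.sleep(TimeUnit.SECONDS.toMillis(1))
    }
  }
}
```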
@@ -64,7 +64,7 @@ class DefaultEngineConnResourceService extends EngineConnResourceService with Lo

private val bmlClient = BmlClientFactory.createBmlClient()
private var isRefreshing: Boolean = false
private val _LOCK = "_LOAD_ENGINE_CONN_LOCK"
private val _LOCK = "_MASTER_LOAD_ENGINE_CONN_LOCK"
val commonLock = new CommonLock
private var lock = false

@@ -74,13 +74,13 @@ class DefaultEngineConnResourceService extends EngineConnResourceService with Lo
commonLock.setCreateTime(new Date)
commonLock.setUpdateTime(new Date)
commonLock.setCreator(Utils.getJvmUser)
-commonLock.setHost(Utils.getLocalHostname)
+commonLock.setLocker(Utils.getLocalHostname)
commonLock.setUpdator(Utils.getJvmUser)
lock = commonLockService.reentrantLock(commonLock, -1)
if (lock) {
logger.info(
"The master LinkisManager node get lock by {}. And start to refresh all engineconn plugins when inited.",
_LOCK + "-" + commonLock.getHost
_LOCK + "-" + commonLock.getLocker
)
refreshAll(false)
}
@@ -92,7 +92,7 @@ class DefaultEngineConnResourceService extends EngineConnResourceService with Lo
logger.info(
"The master LinkisManager node hase released lock {}.",
commonLock.getLockObject + "-" +
-commonLock.getHost
+commonLock.getLocker
)
commonLockService.unlock(commonLock)
}
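Reviewer note: besides renaming host to locker, the lock object gains a _MASTER_ prefix. The first LinkisManager instance to acquire the (lock_object, locker) row in linkis_ps_common_lock acts as master and refreshes all engineconn plugins at init, while the others skip the refresh; the accompanying unlock path releases the row again. A hedged sketch of that election pattern with simplified stand-in interfaces (not the real Linkis classes):

```scala
import java.util.Date

object MasterLockSketch {

  // Simplified stand-ins for CommonLock / CommonLockService; the real ones are
  // backed by the linkis_ps_common_lock table shown later in this diff.
  case class CommonLock(lockObject: String, locker: String, createTime: Date = new Date)

  trait CommonLockService {
    def reentrantLock(lock: CommonLock, timeoutMs: Long): Boolean
    def unlock(lock: CommonLock): Unit
  }

  class MasterOnlyRefresher(lockService: CommonLockService, hostname: String) {
    private val lock = CommonLock("_MASTER_LOAD_ENGINE_CONN_LOCK", hostname)
    private var isMaster = false

    def init(): Unit = {
      // Whichever instance gets the row first becomes the master and refreshes
      // all engineconn plugins; the other instances simply skip the refresh.
      isMaster = lockService.reentrantLock(lock, -1L)
      if (isMaster) refreshAll()
    }

    def shutdown(): Unit = if (isMaster) lockService.unlock(lock)

    private def refreshAll(): Unit =
      println(s"refreshing engineconn plugins on master ${lock.locker}")
  }
}
```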
@@ -64,13 +64,13 @@ class AMHeartbeatService extends HeartbeatService with Logging {
commonLock.setCreateTime(new Date)
commonLock.setUpdateTime(new Date)
commonLock.setCreator(Utils.getJvmUser)
-commonLock.setHost(Utils.getLocalHostname)
+commonLock.setLocker(Utils.getLocalHostname)
commonLock.setUpdator(Utils.getJvmUser)
lock = commonLockService.reentrantLock(commonLock, -1)
if (null != managerMonitor && lock) {
logger.info(
"The master am get lock by {}. And start to init AMHeartbeatService monitor.",
_LOCK + "-" + commonLock.getHost
_LOCK + "-" + commonLock.getLocker
)
Utils.defaultScheduler.scheduleAtFixedRate(
managerMonitor,
@@ -87,7 +87,7 @@ class AMHeartbeatService extends HeartbeatService with Logging {
commonLockService.unlock(commonLock)
logger.info(
"The master am has released lock {}.",
commonLock.getLockObject + "-" + commonLock.getHost
commonLock.getLockObject + "-" + commonLock.getLocker
);
}
}
2 changes: 1 addition & 1 deletion linkis-dist/package/db/linkis_ddl.sql
@@ -136,7 +136,7 @@ DROP TABLE IF EXISTS `linkis_ps_common_lock`;
CREATE TABLE `linkis_ps_common_lock` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`lock_object` varchar(255) COLLATE utf8_bin DEFAULT NULL,
-`host` varchar(255) COLLATE utf8_bin NOT NULL,
+`locker` varchar(255) COLLATE utf8_bin NOT NULL,
`time_out` longtext COLLATE utf8_bin,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
@@ -25,7 +25,7 @@ ALTER TABLE `linkis_ps_configuration_config_key` ADD UNIQUE `uniq_key_ectype` (`

ALTER TABLE `linkis_ps_configuration_config_key` modify column `engine_conn_type` varchar(50) DEFAULT '' COMMENT 'engine type,such as spark,hive etc';

-ALTER TABLE linkis_ps_common_lock ADD COLUMN `host` VARCHAR(255) NOT NULL COMMENT 'host';
+ALTER TABLE linkis_ps_common_lock ADD COLUMN locker VARCHAR(255) NOT NULL COMMENT 'locker';

ALTER TABLE `linkis_ps_configuration_config_key` ADD column `template_required` tinyint(1) DEFAULT 0 COMMENT 'template required 0 none / 1 must'

17 changes: 17 additions & 0 deletions linkis-engineconn-plugins/flink/pom.xml
@@ -48,6 +48,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
+<exclusions>
+<exclusion>
+<groupId>org.xerial.snappy</groupId>
+<artifactId>snappy-java</artifactId>
+</exclusion>
+</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -188,6 +194,17 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
+<exclusions>
+<exclusion>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+</exclusion>
+</exclusions>
</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+<version>3.4.0</version>
+</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
4 changes: 4 additions & 0 deletions linkis-engineconn-plugins/sqoop/pom.xml
@@ -87,6 +87,10 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
+<exclusion>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -46,7 +46,6 @@
import java.text.MessageFormat;
import java.util.*;
import java.util.stream.Collectors;
-import java.util.stream.Stream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -358,22 +357,31 @@ public Message saveFullTree(HttpServletRequest req, @RequestBody JsonNode json)

private void sparkConfCheck(List<ConfigKeyValue> settings) throws ConfigurationException {
for (ConfigKeyValue setting : settings) {
if (setting.getKey().equals("spark.conf")) {
if (setting.getKey().equals("spark.conf")
&& StringUtils.isNotBlank(setting.getConfigValue())) {
// Check if there are any duplicates in spark. conf
String[] split = setting.getConfigValue().split(";");
-Stream<String> stringStream = Arrays.stream(split).map(s -> s.split("=")[0].trim());
-int setSize = stringStream.collect(Collectors.toSet()).size();
-int listSize = (int) stringStream.count();
+int setSize =
+Arrays.stream(split)
+.map(s -> s.split("=")[0].trim())
+.collect(Collectors.toSet())
+.size();
+int listSize =
+Arrays.stream(split)
+.map(s -> s.split("=")[0].trim())
+.collect(Collectors.toList())
+.size();
if (listSize != setSize) {
throw new ConfigurationException("Key has duplicate entries");
throw new ConfigurationException("Spark.conf contains duplicate keys");
}
// Check if there are any duplicates in the spark.conf configuration and other individual
// configurations
-for (String key : split) {
+for (String keyValue : split) {
+String key = keyValue.split("=")[0].trim();
boolean matchResult =
settings.stream().anyMatch(settingKey -> key.equals(settingKey.getKey()));
if (matchResult) {
throw new ConfigurationException("Key has duplicate entries,key :" + key);
throw new ConfigurationException(
"Saved key is duplicated with the spark conf key , key :" + key);
}
}
}
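Reviewer note: the rewrite above stops reusing a single Stream<String> for both the set size and the list size (a JDK stream allows only one terminal operation; the second call would throw IllegalStateException), skips blank spark.conf values, and reports clearer messages. The validation itself — no duplicate keys inside the semicolon-separated spark.conf value, and no collision with other saved keys — can be sketched in Scala as follows (illustrative helper, not the actual REST method):

```scala
object SparkConfCheckSketch {

  // Reject duplicate keys inside "k1=v1;k2=v2;..." and keys that also appear
  // as standalone settings. Names here are illustrative only.
  def checkSparkConf(sparkConf: String, otherSettingKeys: Set[String]): Unit = {
    if (sparkConf.trim.nonEmpty) {
      val keys = sparkConf.split(";").map(_.split("=")(0).trim).toList
      if (keys.distinct.size != keys.size) {
        throw new IllegalArgumentException("Spark.conf contains duplicate keys")
      }
      keys.find(otherSettingKeys.contains).foreach { key =>
        throw new IllegalArgumentException(
          s"Saved key is duplicated with the spark conf key , key :$key")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Throws: spark.executor.memory appears twice inside spark.conf.
    checkSparkConf("spark.executor.memory=4g;spark.executor.memory=8g", Set.empty)
  }
}
```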
@@ -259,9 +259,8 @@ public List<Object> queryKeyInfoList(List<String> uuidList) throws Configuration
temp.put("validateRange", info.getValidateRange());
temp.put("boundaryType", info.getBoundaryType());
temp.put("defaultValue", info.getDefaultValue());
-// for front-end to judge whether input is required
-if (StringUtils.isNotEmpty(info.getDefaultValue())) {
-temp.put("require", "true");
+if (StringUtils.isNotBlank(info.getTemplateRequired())) {
+temp.put("require", info.getTemplateRequired().equals("1"));
} else {
temp.put("require", "false");
}
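Reviewer note: the "require" flag returned to the front end is now driven by the key's template_required column ("1" meaning must-fill) rather than inferred from the presence of a default value. A tiny illustrative sketch of that derivation (hypothetical helper, not the actual service code):

```scala
object RequireFlagSketch {
  // "1" in template_required marks a configuration key as required.
  def requireFlag(templateRequired: Option[String]): Boolean =
    templateRequired.exists(_.trim == "1")
}
```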
@@ -65,15 +65,15 @@ public void init(AvailabilityChangeEvent<AvailabilityState> availabilityChangeEv
commonLock.setCreateTime(new Date());
commonLock.setUpdateTime(new Date());
commonLock.setCreator(Utils.getJvmUser());
-commonLock.setHost(Utils.getLocalHostname());
+commonLock.setLocker(Utils.getLocalHostname());
commonLock.setUpdator(Utils.getJvmUser());
lock = commonLockService.reentrantLock(commonLock, -1L);
String suffix = ContextServerConf.CS_LABEL_SUFFIX;
String confLabel;

if (lock) {
// master node set cs_1_xxx label
logger.info("The master ps-cs node get lock by {}", _LOCK + "-" + commonLock.getHost());
logger.info("The master ps-cs node get lock by {}", _LOCK + "-" + commonLock.getLocker());
confLabel = "cs_1_" + suffix;
} else {
confLabel = "cs_2_" + suffix;
@@ -98,7 +98,7 @@ public void shutdown(ContextClosedEvent contextClosedEvent) {
commonLockService.unlock(commonLock);
logger.info(
"The master ps-cs node has released lock {}.",
commonLock.getLockObject() + "-" + commonLock.getHost());
commonLock.getLockObject() + "-" + commonLock.getLocker());
}
}
}
@@ -78,6 +78,12 @@
</exclusion>
</exclusions>
</dependency>
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+<version>1.11.0</version>
+<scope>provided</scope>
+</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -91,6 +97,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+<exclusion>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -101,6 +101,10 @@ public Message getTaskByID(HttpServletRequest req, @PathVariable("id") Long jobI
return Message.error(
"The corresponding job was not found, or there may be no permission to view the job"
+ "(没有找到对应的job,也可能是没有查看该job的权限)");
+} else if (taskVO.getStatus().equals("Running")) {
+// 任务运行时不显示异常信息(Do not display exception information during task runtime)
+taskVO.setErrCode(null);
+taskVO.setErrDesc(null);
}

return Message.ok().data(TaskConstant.TASK, taskVO);
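Reviewer note: per the inline comment, error code and description are suppressed while a task is still Running, so the job-history API only surfaces error details for finished tasks. A minimal sketch with a hypothetical TaskVO stand-in (not the real Linkis class):

```scala
object RunningTaskMaskSketch {
  // Hypothetical stand-in for the task view object returned by the API.
  case class TaskVO(status: String, var errCode: Integer = null, var errDesc: String = null)

  def maskErrorsWhileRunning(task: TaskVO): TaskVO = {
    // While the task is still Running, clear any error details before
    // returning it to the front end, matching the comment in the patch.
    if (task.status == "Running") {
      task.errCode = null
      task.errDesc = null
    }
    task
  }
}
```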
@@ -32,5 +32,5 @@ public interface CommonLockMapper {

List<CommonLock> getAll();

-CommonLock getLockByHost(@Param("lockObject") String lockObject, @Param("host") String host);
+CommonLock getLockByLocker(@Param("lockObject") String lockObject, @Param("locker") String host);
}
@@ -28,7 +28,7 @@ public class CommonLock {
private Date createTime;
private String updator;
private String creator;
-private String host;
+private String locker;

public Integer getId() {
return id;
@@ -86,11 +86,11 @@ public void setCreator(String creator) {
this.creator = creator;
}

-public String getHost() {
-return host;
+public String getLocker() {
+return locker;
}

-public void setHost(String host) {
-this.host = host;
+public void setLocker(String locker) {
+this.locker = locker;
}
}
@@ -60,7 +60,7 @@ public Boolean lock(CommonLock commonLock, Long timeOut) {
@Override
public Boolean reentrantLock(CommonLock commonLock, Long timeOut) {
CommonLock oldLock =
-commonLockMapper.getLockByHost(commonLock.getLockObject(), commonLock.getHost());
+commonLockMapper.getLockByLocker(commonLock.getLockObject(), commonLock.getLocker());
if (oldLock != null) {
return true;
}
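Reviewer note: reentrantLock now looks up the existing row by (lock_object, locker) instead of (lock_object, host), so the same locker re-acquiring its own lock short-circuits to true; whether a different locker can take the row over depends on the insert path and the table constraints, which are not shown in this hunk. An in-memory sketch of the intended behaviour (a stand-in for the DB-backed CommonLockMapper, not the real implementation):

```scala
object ReentrantLockSketch {
  import scala.collection.mutable

  // One holder per lock_object, identified by its locker value.
  class InMemoryLockStore {
    private val rows = mutable.Map.empty[String, String] // lockObject -> locker

    def reentrantLock(lockObject: String, locker: String): Boolean = synchronized {
      rows.get(lockObject) match {
        case Some(holder) => holder == locker // same locker re-acquires, others fail
        case None =>
          rows.update(lockObject, locker) // first caller becomes the holder
          true
      }
    }

    def unlock(lockObject: String, locker: String): Unit = synchronized {
      if (rows.get(lockObject).contains(locker)) rows.remove(lockObject)
    }
  }
}
```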
@@ -21,18 +21,18 @@
<mapper namespace="org.apache.linkis.publicservice.common.lock.dao.CommonLockMapper">

<insert id="lock">
-insert into linkis_ps_common_lock (lock_object, `host`, time_out, update_time, create_time)
-values(#{commonLock.lockObject}, #{commonLock.host}, #{timeOut}, now(), now())
+insert into linkis_ps_common_lock (lock_object, locker, time_out, update_time, create_time)
+values(#{commonLock.lockObject}, #{commonLock.locker}, #{timeOut}, now(), now())
</insert>

<delete id="unlock">
-delete from linkis_ps_common_lock where lock_object = #{commonLock.lockObject} and `host` = #{commonLock.host}
+delete from linkis_ps_common_lock where lock_object = #{commonLock.lockObject} and locker = #{commonLock.locker}
</delete>

<select id="getAll" resultType="org.apache.linkis.publicservice.common.lock.entity.CommonLock">
select * from linkis_ps_common_lock
</select>
<select id="getLockByHost" resultType="org.apache.linkis.publicservice.common.lock.entity.CommonLock">
select * from linkis_ps_common_lock where lock_object = #{lockObject} and `host` = #{host}
<select id="getLockByLocker" resultType="org.apache.linkis.publicservice.common.lock.entity.CommonLock">
select * from linkis_ps_common_lock where lock_object = #{lockObject} and locker = #{locker}
</select>
</mapper>