Skip to content

Commit

Permalink
Optimize cluster configuration and start session cluster for manual r…
Browse files Browse the repository at this point in the history
…egistration (DataLinkDC#3788)

Signed-off-by: Zzm0809 <[email protected]>
Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
Zzm0809 and Zzm0809 authored Sep 7, 2024
1 parent ae45318 commit 885061a
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class ClusterInstanceDTO {
@ApiModelProperty(value = "taskId", required = true, dataType = "Integer", example = "test", notes = "task id")
private Integer taskId;

@ApiModelProperty(value = "note", dataType = "String", example = "test")
private String note;

@NotNull(
message = "Enabled cannot be null",
groups = {Save.class})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.dinky.utils.URLUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -58,7 +59,6 @@
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
Expand Down Expand Up @@ -87,7 +87,6 @@ public FlinkClusterInfo checkHeartBeat(String hosts, String host) {

@Override
public String getJobManagerAddress(ClusterInstance clusterInstance) {
// TODO 这里判空逻辑有问题,clusterInstance有可能为null
DinkyAssert.check(clusterInstance);
FlinkClusterInfo info =
FlinkCluster.testFlinkJobManagerIP(clusterInstance.getHosts(), clusterInstance.getJobManagerHost());
Expand Down Expand Up @@ -134,7 +133,7 @@ private String buildLocalEnvironmentAddress(int port) {

@Override
public List<ClusterInstance> listEnabledAllClusterInstance() {
return this.list(new QueryWrapper<ClusterInstance>().eq("enabled", 1));
return this.list(new LambdaQueryWrapper<ClusterInstance>().eq(ClusterInstance::getEnabled, 1));
}

@Override
Expand All @@ -144,11 +143,12 @@ public List<ClusterInstance> listSessionEnable() {

@Override
public List<ClusterInstance> listAutoEnable() {
return list(new QueryWrapper<ClusterInstance>().eq("enabled", 1).eq("auto_registers", 1));
return list(new LambdaQueryWrapper<ClusterInstance>()
.eq(ClusterInstance::getEnabled, 1)
.eq(ClusterInstance::isAutoRegisters, 1));
}

@Override
@Transactional(rollbackFor = Exception.class)
public ClusterInstance registersCluster(ClusterInstanceDTO clusterInstanceDTO) {
ClusterInstance clusterInstance = clusterInstanceDTO.toBean();

Expand All @@ -167,8 +167,7 @@ public ClusterInstance registersCluster(ClusterInstance clusterInstance) {
}

/**
* @param id
* @return
* @param id cluster instance id
*/
@Override
public Boolean deleteClusterInstanceById(Integer id) {
Expand Down Expand Up @@ -237,15 +236,20 @@ public ClusterInstance deploySessionCluster(Integer id) {
GatewayResult gatewayResult = JobManager.deploySessionCluster(gatewayConfig);
if (gatewayResult.isSuccess()) {
Asserts.checkNullString(gatewayResult.getWebURL(), "Unable to obtain Web URL.");
return registersCluster(ClusterInstanceDTO.builder()
ClusterInstance registersedCluster = registersCluster(ClusterInstanceDTO.builder()
.hosts(gatewayResult.getWebURL().replace("http://", ""))
.name(gatewayResult.getId())
.alias(clusterCfg.getName() + "_" + LocalDateTime.now())
.alias(clusterCfg.getName() + "_"
+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")))
.type(gatewayConfig.getType().getLongValue())
.clusterConfigurationId(id)
.autoRegisters(true)
.autoRegisters(false)
.enabled(true)
.note(String.format("Deployment from cluster configuration [%s]", clusterCfg.getName()))
.build());
// check health after deploy session cluster
checkHealth(registersedCluster);
return registersedCluster;
}
throw new DinkyException("Deploy session cluster error: " + gatewayResult.getError());
}
Expand Down Expand Up @@ -284,8 +288,8 @@ public Long heartbeat() {
List<ClusterInstance> clusterInstances = this.list();
ExecutorService executor = ThreadUtil.newExecutor(Math.min(clusterInstances.size(), 10));
List<CompletableFuture<Integer>> futures = clusterInstances.stream()
.map(c -> CompletableFuture.supplyAsync(
() -> this.registersCluster(c).getStatus(), executor))
.map(c ->
CompletableFuture.supplyAsync(() -> registersCluster(c).getStatus(), executor))
.collect(Collectors.toList());
return futures.stream().map(CompletableFuture::join).filter(x -> x == 1).count();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@
from
dinky_cluster a
where enabled = 1
and (type = 'standalone' or type = 'yarn-session'or type = 'kubernetes-session')
and `type` in( 'standalone' , 'yarn-session' , 'kubernetes-session')
</select>
</mapper>
2 changes: 1 addition & 1 deletion dinky-web/src/locales/en-US/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ export default {
'rc.ci.killConfirm':
'Are you sure to stop this Flink Cluster instance? Please note that after stopping, it will not be recovered! The associated task will affect, please be careful!',
'rc.ci.kill': 'Stop Flink Cluster Instance',
'rc.ci.jma': 'JobManager Address',
'rc.ci.jma': 'JM Address',
'rc.ci.jmha': 'JobManager HA Address',
'rc.ci.jmha.tips':
'Add the RestApi address of the JobManager of the Flink cluster. In HA mode, the addresses are separated by commas, for example: 192.168.123.101:8081',
Expand Down
6 changes: 3 additions & 3 deletions dinky-web/src/locales/zh-CN/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ export default {
'rc.cc.value': '配置值',
'rc.cc.loadFromLocal': '从本地文件加载',

'rc.ci.alias': '实例别名',
'rc.ci.alias': '别名',
'rc.ci.aliasPlaceholder': '请输入别名!',

'rc.ci.ar': '自动注册',
Expand All @@ -825,7 +825,7 @@ export default {
'确定停止该 Flink 实例吗? 请注意,停止后将无法恢复!关联的任务将会收到影响,请谨慎操作!',
'rc.ci.heartbeat': '心跳检测',
'rc.ci.kill': '停止 Flink 实例',
'rc.ci.jma': 'JobManager 地址',
'rc.ci.jma': 'JM 地址',
'rc.ci.jmha': 'JobManager 高可用地址',
'rc.ci.jmha.tips':
'添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081',
Expand All @@ -844,7 +844,7 @@ export default {
'rc.ci.type': '类型',
'rc.ci.typePlaceholder': '请选择集群类型!',
'rc.ci.version': '版本',
'rc.ci.desc': '描述',
'rc.ci.desc': '备注',
'rc.ci.search': '搜索 名称/别名/备注',
'rc.doc.category': '注册类型',
'rc.doc.categoryPlaceholder': '请选择该文档所属类型!',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import {
Typography
} from 'antd';
import { useState } from 'react';
import EllipsisMiddle from '@/components/Typography/EllipsisMiddle';
import { isContainsChinese } from '@/utils/function';

const { Text, Paragraph, Link } = Typography;

Expand Down Expand Up @@ -219,15 +221,24 @@ export default () => {
{l('rc.ci.jma')}: {renderWebUiRedirect(item)}
</blockquote>
<blockquote>
{l('rc.ci.version')}: <Link>{item.version}</Link>
{l('rc.ci.version')}: <Link>{item.version || 'None'}</Link>
</blockquote>
<blockquote style={{ display: 'flex' }}>
<span style={{ minWidth: '2vw' }}> {l('rc.ci.alias')}: </span>
<EllipsisMiddle
copyable={false}
maxCount={isContainsChinese(item.alias ?? '') ? 10 : 20}
children={item.alias}
/>
</blockquote>
<blockquote style={{ display: 'flex' }}>
<span style={{ minWidth: '2vw' }}> {l('rc.ci.desc')}: </span>
<EllipsisMiddle
copyable={false}
maxCount={isContainsChinese(item.note ?? '') ? 10 : 20}
children={item.note}
/>
</blockquote>
<Text title={item.alias} ellipsis>
{(item.alias || item.alias === '') && (
<blockquote>
{l('rc.ci.alias')}: {item.alias}
</blockquote>
)}
</Text>
</Paragraph>

<Space size={8} align={'baseline'} className={'hidden-overflow'}>
Expand Down
46 changes: 46 additions & 0 deletions dinky-web/src/utils/function.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ export const parseSecondStr = (s_time: number) => {
}
return time;
};

export function Bytes2Mb(bs: number) {
return bs / 1024 / 1024;
}
Expand Down Expand Up @@ -672,3 +673,48 @@ export function getUrlParam(allParams = window.location.search, key: string) {
const result = params.get(key);
return result ?? '';
}

/**
* Determine whether the string contains Chinese, English, or a mixture of both
* Current: If it is a mixture of Chinese/English, return true; if it is English, return false
* @returns {boolean} true: contains Chinese characters; false: does not contain Chinese characters
* @param str string
*/
export function isContainsChinese(str: string = '') {
// Regular expression matches Chinese characters
const chineseRegex = /[\u4e00-\u9fa5]/;
// Regular expression matches English characters (including uppercase and lowercase)
const englishRegex = /[a-zA-Z]/;

const numberRegex = /[0-9]/;

if (str && str.length === 0) {
return;
}

let hasChinese = false;
let hasEnglish = false;
let hasNumber = false;

for (let i = 0; i < str.length; i++) {
if (chineseRegex.test(str[i])) {
hasChinese = true;
}
if (englishRegex.test(str[i])) {
hasEnglish = true;
}
if (numberRegex.test(str[i])) {
hasNumber = true;
}
}

if (hasChinese) {
return true;
} else if (hasEnglish && !hasChinese) {
return false;
} else if (hasNumber && !hasChinese) {
return false;
} else {
return false;
}
}

0 comments on commit 885061a

Please sign in to comment.