Skip to content

Commit

Permalink
Merge branch 'dev' into fix-monitor-bug
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 12, 2023
2 parents d96f7dc + 2343bd8 commit 6e6daf0
Show file tree
Hide file tree
Showing 47 changed files with 500 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public boolean success() {
history.setStatus(job.getStatus().getCode());
history.setJobId(job.getJobId());
history.setEndTime(job.getEndTime());
history.setJobManagerAddress(job.isUseGateway() ? job.getJobManagerAddress() : null);
history.setJobManagerAddress(job.getJobManagerAddress());

Integer clusterId = job.getJobConfig().getClusterId();
ClusterInstance clusterInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.model.ClusterInstance;
import org.dinky.job.JobConfig;
import org.dinky.mybatis.service.ISuperService;

import java.util.List;
Expand Down Expand Up @@ -49,26 +50,10 @@ public interface ClusterInstanceService extends ISuperService<ClusterInstance> {
/**
* build environment address
*
* @param useRemote {@link Boolean} use remote or local
* @param id {@link Integer} cluster id
* @return {@link String} eg: host1:8081
*/
String buildEnvironmentAddress(boolean useRemote, Integer id);

/**
* build remote environment address by cluster id
*
* @param id {@link Integer} cluster id
* @return {@link String} eg: host1:8081
*/
String buildRemoteEnvironmentAddress(Integer id);

/**
* build local environment address
*
* @param config {@link JobConfig} the config of job
* @return {@link String} eg: host1:8081
*/
String buildLocalEnvironmentAddress();
String buildEnvironmentAddress(JobConfig config);

/**
* list enabled cluster instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@
import org.dinky.assertion.Asserts;
import org.dinky.cluster.FlinkCluster;
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.constant.FlinkConstant;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.model.ClusterConfiguration;
import org.dinky.data.model.ClusterInstance;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.model.FlinkClusterConfig;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.mapper.ClusterInstanceMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.ClusterConfigurationService;
import org.dinky.service.ClusterInstanceService;
import org.dinky.utils.IpUtils;
import org.dinky.utils.URLUtils;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -87,30 +88,33 @@ public String getJobManagerAddress(ClusterInstance clusterInstance) {
}

@Override
public String buildEnvironmentAddress(boolean useRemote, Integer id) {
if (useRemote && id != 0) {
public String buildEnvironmentAddress(JobConfig config) {
Integer id = config.getClusterId();
Boolean useRemote = config.isUseRemote();
if (useRemote && id != null && id != 0) {
return buildRemoteEnvironmentAddress(id);
} else {
return buildLocalEnvironmentAddress();
Map<String, String> flinkConfig = config.getConfigJson();
int port;
if (Asserts.isNotNull(flinkConfig) && flinkConfig.containsKey("rest.port")) {
port = Integer.valueOf(flinkConfig.get("rest.port"));
} else {
port = URLUtils.getRandomPort();
while (!IpUtils.isPortAvailable(port)) {
port = URLUtils.getRandomPort();
}
}
return buildLocalEnvironmentAddress(port);
}
}

@Override
public String buildRemoteEnvironmentAddress(Integer id) {
private String buildRemoteEnvironmentAddress(Integer id) {
return getJobManagerAddress(getById(id));
}

@Override
public String buildLocalEnvironmentAddress() {
try {
InetAddress inetAddress = InetAddress.getLocalHost();
if (inetAddress != null) {
return inetAddress.getHostAddress();
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return FlinkConstant.LOCAL_HOST;
private String buildLocalEnvironmentAddress(int port) {
String host = IpUtils.getHostIp();
return host + ":" + port;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,21 @@ public JobConfig buildJobConfig(TaskDTO task) {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
config.setClusterId(jobInstance.getClusterId());
}
} else if (GatewayType.LOCAL.equalsValue(task.getType())) {
if (task.getJobInstanceId() == null) {
config.setClusterId(null);
} else {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
config.setClusterId(jobInstance.getClusterId());
config.setUseRemote(true);
}
config.setClusterConfigurationId(null);
} else {
Optional.ofNullable(task.getClusterId()).ifPresent(config::setClusterId);
}
log.info("Init remote cluster");
try {
Optional.ofNullable(config.getClusterId()).ifPresent(i -> {
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), i));
});
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config));
} catch (Exception e) {
log.error("Init remote cluster error", e);
}
Expand Down
11 changes: 11 additions & 0 deletions dinky-admin/src/main/java/org/dinky/utils/IpUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;

import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;

import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -171,6 +172,16 @@ public static String getHostIp() {
return "127.0.0.1";
}

public static boolean isPortAvailable(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
// If the code reaches this point, the port is available
return true;
} catch (Exception e) {
// Port is not available
return false;
}
}

public static String getHostName() {
try {
return InetAddress.getLocalHost().getHostName();
Expand Down
6 changes: 6 additions & 0 deletions dinky-common/src/main/java/org/dinky/utils/URLUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -103,4 +104,9 @@ public static String formatAddress(String webURL) {
return "";
}
}

public static int getRandomPort() {
Random random = new Random();
return 30000 + random.nextInt(35536);
}
}
11 changes: 1 addition & 10 deletions dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,8 @@
* @since 2021/5/25 14:39
*/
public interface FlinkConstant {

/** flink端口 */
Integer FLINK_REST_DEFAULT_PORT = 8081;
/** flink会话默认个数 */
Integer DEFAULT_SESSION_COUNT = 256;
/** flink加载因子 */
Double DEFAULT_FACTOR = 0.75;
/** 本地模式host */
String LOCAL_HOST = "localhost:8081";
String LOCAL_HOST = "localhost";
/** changlog op */
String OP = "op";

String DEFAULT_FLINK_HOME = "/opt/flink";
}
15 changes: 15 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public class ExecutorConfig {
notes = "Flag indicating whether to use batch model")
private boolean useBatchModel;

@ApiModelProperty(
value = "Whether to only build plans",
dataType = "boolean",
example = "true",
notes = "Build plan only")
private boolean isPlan;

@ApiModelProperty(
value = "Checkpoint interval",
dataType = "Integer",
Expand Down Expand Up @@ -263,6 +270,14 @@ public boolean isValidJarFiles() {
return Asserts.isNotNull(this.getJarFiles());
}

public boolean isPlan() {
return isPlan;
}

public void setPlan(boolean plan) {
isPlan = plan;
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ public LocalBatchExecutor(ExecutorConfig executorConfig) {
.map(FileUtil::getAbsolutePath)
.collect(Collectors.joining(",")));
}
if (executorConfig.isValidConfig()) {
if (!executorConfig.isPlan()) {
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
if (configuration.contains(RestOptions.PORT)) {
this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
if (!configuration.contains(RestOptions.PORT)) {
configuration.set(RestOptions.PORT, executorConfig.getPort());
}
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ public LocalStreamExecutor(ExecutorConfig executorConfig) {
.map(FileUtil::getAbsolutePath)
.collect(Collectors.joining(",")));
}
if (executorConfig.isValidConfig()) {
if (!executorConfig.isPlan()) {
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
if (configuration.contains(RestOptions.PORT)) {
this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
if (!configuration.contains(RestOptions.PORT)) {
configuration.set(RestOptions.PORT, executorConfig.getPort());
}
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
Expand Down
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void addGatewayConfig(Map<String, Object> config) {
}

public boolean isUseRemote() {
return !GatewayType.LOCAL.equalsValue(type);
return useRemote || !GatewayType.LOCAL.equalsValue(type);
}

public void buildLocal() {
Expand Down
1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public void init() {
useRestAPI = SystemConfiguration.getInstances().isUseRestAPI();
sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator();
executorConfig = config.getExecutorSetting();
executorConfig.setPlan(isPlanMode);
executor = ExecutorFactory.buildExecutor(executorConfig);
ExecutorContext.setExecutor(executor);
}
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/components/CustomEditor/CodeEdit/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ const CodeEdit = (props: CodeEditFormProps & connect) => {
readOnly, // 是否只读
glyphMargin: true, // 字形边缘
formatOnType: true, // 代码格式化
// columnSelection: true, // 列选择
wrappingIndent:
language === 'yaml' || language === 'yml' || language === 'json' ? 'indent' : 'none',
inlineSuggest: {
Expand Down
33 changes: 14 additions & 19 deletions dinky-web/src/components/LineageGraph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import { Badge, Tooltip, Typography } from 'antd';

import LineageDagExt from '@/components/LineageGraph/lineage-dag-ext';
import {
LineageDetailInfo,
LineageRelations,
Expand All @@ -28,19 +29,15 @@ import {
import { l } from '@/utils/intl';
import { SuccessNotification, WarningNotification } from '@/utils/messages';
import {
ArrowsAltOutlined,
ColumnHeightOutlined,
CompassOutlined,
ExpandAltOutlined,
FullscreenExitOutlined,
InsertRowAboveOutlined,
PlusCircleOutlined,
ReloadOutlined
ReloadOutlined,
ShrinkOutlined
} from '@ant-design/icons';
import _ from 'lodash';
import React, { useEffect } from 'react';
import 'react-lineage-dag/dist/index.css';
// import LineageDag from "react-lineage-dag";
import LineageDagExt from '@/components/LineageGraph/lineage-dag-ext';
// import LineageDagExt from "@/components/LineageGraph/lineage-dag-ext";

interface LineageState {
lineageData: LineageDetailInfo;
Expand All @@ -50,7 +47,7 @@ interface LineageState {
relations: LineageRelations[];
columns: any[];
operator: any[];
centerId: string;
centerId?: string;
showMinimap: boolean;
refresh: boolean;
expandField: boolean;
Expand All @@ -69,7 +66,7 @@ type JobLineageProps = {
type ITable = {
id: string;
name: string;
isCollapse: boolean;
isCollapse?: boolean;
fields: LineageTableColumn[];
};

Expand Down Expand Up @@ -188,10 +185,10 @@ const LineageGraph: React.FC<JobLineageProps> = (props) => {
<Tooltip
title={lineageState.expandField ? l('lineage.expandField') : l('lineage.expandField')}
>
<FullscreenExitOutlined width={300} />
<ColumnHeightOutlined />
</Tooltip>
),
onClick: (nodeData: any) => handleExpandField(nodeData, _.clone(data))
onClick: (nodeData: any) => handleExpandField(nodeData, data)
},
{
id: 'expandDownstream',
Expand All @@ -206,7 +203,7 @@ const LineageGraph: React.FC<JobLineageProps> = (props) => {
: l('lineage.collapseDownstream')
}
>
<PlusCircleOutlined />
<ShrinkOutlined />
</Tooltip>
),
onClick: (nodeData: { id: string }) => {
Expand Down Expand Up @@ -235,7 +232,7 @@ const LineageGraph: React.FC<JobLineageProps> = (props) => {
: l('lineage.collapseUpstream')
}
>
<ExpandAltOutlined />
<ArrowsAltOutlined />
</Tooltip>
),
onClick: (nodeData: { id: string }) => {
Expand Down Expand Up @@ -327,11 +324,9 @@ const LineageGraph: React.FC<JobLineageProps> = (props) => {
isAdsorb: true,
theme: {
shapeType: 'line',
gap: 20,
lineWidth: 1,
lineColor: '#e8e8e8',
circleRadiu: 5,
circleColor: '#e8e8e8'
gap: 30,
lineWidth: 0.2,
circleRadiu: 5
}
}
}}
Expand Down
Loading

0 comments on commit 6e6daf0

Please sign in to comment.