Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Dec 11, 2023
2 parents a5b1451 + 963cb1d commit 671384f
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.model.ClusterInstance;
import org.dinky.job.JobConfig;
import org.dinky.mybatis.service.ISuperService;

import java.util.List;
Expand Down Expand Up @@ -49,26 +50,10 @@ public interface ClusterInstanceService extends ISuperService<ClusterInstance> {
/**
* build environment address
*
* @param useRemote {@link Boolean} use remote or local
* @param id {@link Integer} cluster id
* @return {@link String} eg: host1:8081
*/
String buildEnvironmentAddress(boolean useRemote, Integer id);

/**
* build remote environment address by cluster id
*
* @param id {@link Integer} cluster id
* @return {@link String} eg: host1:8081
*/
String buildRemoteEnvironmentAddress(Integer id);

/**
* build local environment address
*
* @param config {@link JobConfig} the config of job
* @return {@link String} eg: host1:8081
*/
String buildLocalEnvironmentAddress();
String buildEnvironmentAddress(JobConfig config);

/**
* list enabled cluster instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@
import org.dinky.assertion.Asserts;
import org.dinky.cluster.FlinkCluster;
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.constant.FlinkConstant;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.model.ClusterConfiguration;
import org.dinky.data.model.ClusterInstance;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.model.FlinkClusterConfig;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.mapper.ClusterInstanceMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.ClusterConfigurationService;
import org.dinky.service.ClusterInstanceService;
import org.dinky.utils.IpUtils;
import org.dinky.utils.URLUtils;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -87,30 +88,30 @@ public String getJobManagerAddress(ClusterInstance clusterInstance) {
}

@Override
public String buildEnvironmentAddress(boolean useRemote, Integer id) {
if (useRemote && id != 0) {
public String buildEnvironmentAddress(JobConfig config) {
Integer id = config.getClusterId();
Boolean useRemote = config.isUseRemote();
if (useRemote && id != null && id != 0) {
return buildRemoteEnvironmentAddress(id);
} else {
return buildLocalEnvironmentAddress();
Map<String, String> flinkConfig = config.getConfigJson();
int port;
if (Asserts.isNotNull(flinkConfig) && flinkConfig.containsKey("rest.port")) {
port = Integer.valueOf(flinkConfig.get("rest.port"));
} else {
port = URLUtils.getRandomPort();
}
return buildLocalEnvironmentAddress(port);
}
}

@Override
public String buildRemoteEnvironmentAddress(Integer id) {
private String buildRemoteEnvironmentAddress(Integer id) {
return getJobManagerAddress(getById(id));
}

@Override
public String buildLocalEnvironmentAddress() {
try {
InetAddress inetAddress = InetAddress.getLocalHost();
if (inetAddress != null) {
return inetAddress.getHostAddress();
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return FlinkConstant.LOCAL_HOST;
private String buildLocalEnvironmentAddress(int port) {
String host = IpUtils.getHostIp();
return host + ":" + port;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ public JobConfig buildJobConfig(TaskDTO task) {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
config.setClusterId(jobInstance.getClusterId());
}
} else if (GatewayType.LOCAL.equalsValue(task.getType())) {
config.setClusterId(null);
config.setClusterConfigurationId(null);
} else {
Optional.ofNullable(task.getClusterId()).ifPresent(config::setClusterId);
}
log.info("Init remote cluster");
try {
Optional.ofNullable(config.getClusterId()).ifPresent(i -> {
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), i));
});
config.setAddress(clusterInstanceService.buildEnvironmentAddress(config));
} catch (Exception e) {
log.error("Init remote cluster error", e);
}
Expand Down
6 changes: 6 additions & 0 deletions dinky-common/src/main/java/org/dinky/utils/URLUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -103,4 +104,9 @@ public static String formatAddress(String webURL) {
return "";
}
}

public static int getRandomPort() {
Random random = new Random();
return random.nextInt(65536);
}
}
11 changes: 1 addition & 10 deletions dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,8 @@
* @since 2021/5/25 14:39
*/
public interface FlinkConstant {

/** flink端口 */
Integer FLINK_REST_DEFAULT_PORT = 8081;
/** flink会话默认个数 */
Integer DEFAULT_SESSION_COUNT = 256;
/** flink加载因子 */
Double DEFAULT_FACTOR = 0.75;
/** 本地模式host */
String LOCAL_HOST = "localhost:8081";
String LOCAL_HOST = "localhost";
/** changlog op */
String OP = "op";

String DEFAULT_FLINK_HOME = "/opt/flink";
}
15 changes: 15 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public class ExecutorConfig {
notes = "Flag indicating whether to use batch model")
private boolean useBatchModel;

@ApiModelProperty(
value = "Whether to only build plans",
dataType = "boolean",
example = "true",
notes = "Build plan only")
private boolean isPlan;

@ApiModelProperty(
value = "Checkpoint interval",
dataType = "Integer",
Expand Down Expand Up @@ -263,6 +270,14 @@ public boolean isValidJarFiles() {
return Asserts.isNotNull(this.getJarFiles());
}

public boolean isPlan() {
return isPlan;
}

public void setPlan(boolean plan) {
isPlan = plan;
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ public LocalBatchExecutor(ExecutorConfig executorConfig) {
.map(FileUtil::getAbsolutePath)
.collect(Collectors.joining(",")));
}
if (executorConfig.isValidConfig()) {
if (!executorConfig.isPlan()) {
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
if (configuration.contains(RestOptions.PORT)) {
this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
if (!configuration.contains(RestOptions.PORT)) {
configuration.set(RestOptions.PORT, executorConfig.getPort());
}
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ public LocalStreamExecutor(ExecutorConfig executorConfig) {
.map(FileUtil::getAbsolutePath)
.collect(Collectors.joining(",")));
}
if (executorConfig.isValidConfig()) {
if (!executorConfig.isPlan()) {
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
if (configuration.contains(RestOptions.PORT)) {
this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
if (!configuration.contains(RestOptions.PORT)) {
configuration.set(RestOptions.PORT, executorConfig.getPort());
}
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
Expand Down
1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public void init() {
useRestAPI = SystemConfiguration.getInstances().isUseRestAPI();
sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator();
executorConfig = config.getExecutorSetting();
executorConfig.setPlan(isPlanMode);
executor = ExecutorFactory.buildExecutor(executorConfig);
ExecutorContext.setExecutor(executor);
}
Expand Down
2 changes: 1 addition & 1 deletion script/bin/auto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ FLINK_VERSION=${2:-1.14}
JAR_NAME="dinky-admin"

# Use FLINK_HOME:
CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/*"
CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*"

PID_FILE="dinky.pid"

Expand Down

0 comments on commit 671384f

Please sign in to comment.